YARN-4994. Use MiniYARNCluster with try-with-resources in tests. Contributed by Andras Bokor.

(cherry picked from commit ae401539ea)
This commit is contained in:
Akira Ajisaka 2016-12-22 14:32:24 +09:00
parent 16c4dcad85
commit 522bc98026
7 changed files with 85 additions and 137 deletions

View File

@ -559,11 +559,9 @@ public void testTimelineEventHandling() throws Exception {
TestParams t = new TestParams(false); TestParams t = new TestParams(false);
Configuration conf = new YarnConfiguration(); Configuration conf = new YarnConfiguration();
conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true); conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
MiniYARNCluster yarnCluster = null;
long currentTime = System.currentTimeMillis(); long currentTime = System.currentTimeMillis();
try { try (MiniYARNCluster yarnCluster = new MiniYARNCluster(
yarnCluster = new MiniYARNCluster( TestJobHistoryEventHandler.class.getSimpleName(), 1, 1, 1, 1)) {
TestJobHistoryEventHandler.class.getSimpleName(), 1, 1, 1, 1);
yarnCluster.init(conf); yarnCluster.init(conf);
yarnCluster.start(); yarnCluster.start();
Configuration confJHEH = new YarnConfiguration(conf); Configuration confJHEH = new YarnConfiguration(conf);
@ -716,10 +714,6 @@ public void testTimelineEventHandling() throws Exception {
tEntity.getEvents().get(0).getEventInfo().get("TASK_TYPE")); tEntity.getEvents().get(0).getEventInfo().get("TASK_TYPE"));
Assert.assertEquals(TaskType.MAP.toString(), Assert.assertEquals(TaskType.MAP.toString(),
tEntity.getEvents().get(1).getEventInfo().get("TASK_TYPE")); tEntity.getEvents().get(1).getEventInfo().get("TASK_TYPE"));
} finally {
if (yarnCluster != null) {
yarnCluster.stop();
}
} }
} }

View File

@ -165,13 +165,11 @@ public void testCheckMaxEligible() throws Exception {
@Test(timeout = 30000) @Test(timeout = 30000)
public void testFilterAppsByAggregatedStatus() throws Exception { public void testFilterAppsByAggregatedStatus() throws Exception {
MiniYARNCluster yarnCluster = null; try (MiniYARNCluster yarnCluster =
try { new MiniYARNCluster(TestHadoopArchiveLogs.class.getSimpleName(),
1, 1, 1, 1)) {
Configuration conf = new Configuration(); Configuration conf = new Configuration();
conf.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, true); conf.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, true);
yarnCluster =
new MiniYARNCluster(TestHadoopArchiveLogs.class.getSimpleName(), 1,
1, 1, 1);
yarnCluster.init(conf); yarnCluster.init(conf);
yarnCluster.start(); yarnCluster.start();
conf = yarnCluster.getConfig(); conf = yarnCluster.getConfig();
@ -237,10 +235,6 @@ public void testFilterAppsByAggregatedStatus() throws Exception {
Assert.assertTrue(hal.eligibleApplications.contains(app4)); Assert.assertTrue(hal.eligibleApplications.contains(app4));
Assert.assertTrue(hal.eligibleApplications.contains(app7)); Assert.assertTrue(hal.eligibleApplications.contains(app7));
Assert.assertTrue(hal.eligibleApplications.contains(app8)); Assert.assertTrue(hal.eligibleApplications.contains(app8));
} finally {
if (yarnCluster != null) {
yarnCluster.stop();
}
} }
} }

View File

@ -52,16 +52,14 @@ public class TestHadoopArchiveLogsRunner {
@Test(timeout = 50000) @Test(timeout = 50000)
public void testHadoopArchiveLogs() throws Exception { public void testHadoopArchiveLogs() throws Exception {
MiniYARNCluster yarnCluster = null;
MiniDFSCluster dfsCluster = null; MiniDFSCluster dfsCluster = null;
FileSystem fs = null; FileSystem fs = null;
try { try (MiniYARNCluster yarnCluster =
new MiniYARNCluster(TestHadoopArchiveLogsRunner.class.getSimpleName(),
1, 2, 1, 1)) {
Configuration conf = new YarnConfiguration(); Configuration conf = new YarnConfiguration();
conf.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, true); conf.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, true);
conf.setBoolean(YarnConfiguration.YARN_MINICLUSTER_FIXED_PORTS, true); conf.setBoolean(YarnConfiguration.YARN_MINICLUSTER_FIXED_PORTS, true);
yarnCluster =
new MiniYARNCluster(TestHadoopArchiveLogsRunner.class.getSimpleName(),
1, 2, 1, 1);
yarnCluster.init(conf); yarnCluster.init(conf);
yarnCluster.start(); yarnCluster.start();
conf = yarnCluster.getConfig(); conf = yarnCluster.getConfig();
@ -133,9 +131,6 @@ public int compare(FileStatus o1, FileStatus o2) {
harLogs[2].getOwner()); harLogs[2].getOwner());
Assert.assertEquals(0, fs.listStatus(workingDir).length); Assert.assertEquals(0, fs.listStatus(workingDir).length);
} finally { } finally {
if (yarnCluster != null) {
yarnCluster.stop();
}
if (fs != null) { if (fs != null) {
fs.close(); fs.close();
} }

View File

@ -35,8 +35,6 @@ public class TestHedgingRequestRMFailoverProxyProvider {
@Test @Test
public void testHedgingRequestProxyProvider() throws Exception { public void testHedgingRequestProxyProvider() throws Exception {
final MiniYARNCluster cluster =
new MiniYARNCluster("testHedgingRequestProxyProvider", 5, 0, 1, 1);
Configuration conf = new YarnConfiguration(); Configuration conf = new YarnConfiguration();
conf.setBoolean(YarnConfiguration.RM_HA_ENABLED, true); conf.setBoolean(YarnConfiguration.RM_HA_ENABLED, true);
@ -49,41 +47,44 @@ public void testHedgingRequestProxyProvider() throws Exception {
conf.setLong(YarnConfiguration.RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_MS, conf.setLong(YarnConfiguration.RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_MS,
2000); 2000);
HATestUtil.setRpcAddressForRM("rm1", 10000, conf); try (MiniYARNCluster cluster =
HATestUtil.setRpcAddressForRM("rm2", 20000, conf); new MiniYARNCluster("testHedgingRequestProxyProvider", 5, 0, 1, 1)) {
HATestUtil.setRpcAddressForRM("rm3", 30000, conf);
HATestUtil.setRpcAddressForRM("rm4", 40000, conf);
HATestUtil.setRpcAddressForRM("rm5", 50000, conf);
conf.setBoolean(YarnConfiguration.YARN_MINICLUSTER_FIXED_PORTS, true);
cluster.init(conf); HATestUtil.setRpcAddressForRM("rm1", 10000, conf);
cluster.start(); HATestUtil.setRpcAddressForRM("rm2", 20000, conf);
HATestUtil.setRpcAddressForRM("rm3", 30000, conf);
HATestUtil.setRpcAddressForRM("rm4", 40000, conf);
HATestUtil.setRpcAddressForRM("rm5", 50000, conf);
conf.setBoolean(YarnConfiguration.YARN_MINICLUSTER_FIXED_PORTS, true);
final YarnClient client = YarnClient.createYarnClient(); cluster.init(conf);
client.init(conf); cluster.start();
client.start();
// Transition rm5 to active; final YarnClient client = YarnClient.createYarnClient();
long start = System.currentTimeMillis(); client.init(conf);
makeRMActive(cluster, 4); client.start();
validateActiveRM(client); // Transition rm5 to active;
long start = System.currentTimeMillis();
makeRMActive(cluster, 4);
long end = System.currentTimeMillis(); validateActiveRM(client);
System.out.println("Client call succeeded at " + end);
// should return the response fast
Assert.assertTrue(end - start <= 10000);
// transition rm5 to standby long end = System.currentTimeMillis();
cluster.getResourceManager(4).getRMContext().getRMAdminService() System.out.println("Client call succeeded at " + end);
.transitionToStandby(new HAServiceProtocol.StateChangeRequestInfo( // should return the response fast
HAServiceProtocol.RequestSource.REQUEST_BY_USER)); Assert.assertTrue(end - start <= 10000);
makeRMActive(cluster, 2); // transition rm5 to standby
cluster.getResourceManager(4).getRMContext().getRMAdminService()
.transitionToStandby(new HAServiceProtocol.StateChangeRequestInfo(
HAServiceProtocol.RequestSource.REQUEST_BY_USER));
validateActiveRM(client); makeRMActive(cluster, 2);
cluster.stop(); validateActiveRM(client);
}
} }
private void validateActiveRM(YarnClient client) throws IOException { private void validateActiveRM(YarnClient client) throws IOException {

View File

@ -60,11 +60,11 @@ public class TestAMRMProxy extends BaseAMRMProxyE2ETest {
*/ */
@Test(timeout = 120000) @Test(timeout = 120000)
public void testAMRMProxyE2E() throws Exception { public void testAMRMProxyE2E() throws Exception {
MiniYARNCluster cluster = new MiniYARNCluster("testAMRMProxyE2E", 1, 1, 1);
YarnClient rmClient = null;
ApplicationMasterProtocol client; ApplicationMasterProtocol client;
try { try (MiniYARNCluster cluster = new MiniYARNCluster("testAMRMProxyE2E",
1, 1, 1);
YarnClient rmClient = YarnClient.createYarnClient()) {
Configuration conf = new YarnConfiguration(); Configuration conf = new YarnConfiguration();
conf.setBoolean(YarnConfiguration.AMRM_PROXY_ENABLED, true); conf.setBoolean(YarnConfiguration.AMRM_PROXY_ENABLED, true);
cluster.init(conf); cluster.init(conf);
@ -75,7 +75,6 @@ public void testAMRMProxyE2E() throws Exception {
yarnConf.set(YarnConfiguration.RM_SCHEDULER_ADDRESS, yarnConf.set(YarnConfiguration.RM_SCHEDULER_ADDRESS,
YarnConfiguration.DEFAULT_AMRM_PROXY_ADDRESS); YarnConfiguration.DEFAULT_AMRM_PROXY_ADDRESS);
rmClient = YarnClient.createYarnClient();
rmClient.init(yarnConf); rmClient.init(yarnConf);
rmClient.start(); rmClient.start();
@ -135,11 +134,6 @@ public void testAMRMProxyE2E() throws Exception {
Thread.sleep(500); Thread.sleep(500);
Assert.assertNotEquals(RMAppState.FINISHED, rmApp.getState()); Assert.assertNotEquals(RMAppState.FINISHED, rmApp.getState());
} finally {
if (rmClient != null) {
rmClient.stop();
}
cluster.stop();
} }
} }
@ -150,12 +144,11 @@ public void testAMRMProxyE2E() throws Exception {
*/ */
@Test(timeout = 120000) @Test(timeout = 120000)
public void testE2ETokenRenewal() throws Exception { public void testE2ETokenRenewal() throws Exception {
MiniYARNCluster cluster =
new MiniYARNCluster("testE2ETokenRenewal", 1, 1, 1);
YarnClient rmClient = null;
ApplicationMasterProtocol client; ApplicationMasterProtocol client;
try { try (MiniYARNCluster cluster =
new MiniYARNCluster("testE2ETokenRenewal", 1, 1, 1);
YarnClient rmClient = YarnClient.createYarnClient()) {
Configuration conf = new YarnConfiguration(); Configuration conf = new YarnConfiguration();
conf.setBoolean(YarnConfiguration.AMRM_PROXY_ENABLED, true); conf.setBoolean(YarnConfiguration.AMRM_PROXY_ENABLED, true);
conf.setInt(YarnConfiguration.RM_NM_EXPIRY_INTERVAL_MS, 1500); conf.setInt(YarnConfiguration.RM_NM_EXPIRY_INTERVAL_MS, 1500);
@ -170,7 +163,6 @@ public void testE2ETokenRenewal() throws Exception {
final Configuration yarnConf = cluster.getConfig(); final Configuration yarnConf = cluster.getConfig();
yarnConf.set(YarnConfiguration.RM_SCHEDULER_ADDRESS, yarnConf.set(YarnConfiguration.RM_SCHEDULER_ADDRESS,
YarnConfiguration.DEFAULT_AMRM_PROXY_ADDRESS); YarnConfiguration.DEFAULT_AMRM_PROXY_ADDRESS);
rmClient = YarnClient.createYarnClient();
rmClient.init(yarnConf); rmClient.init(yarnConf);
rmClient.start(); rmClient.start();
@ -216,11 +208,6 @@ public void testE2ETokenRenewal() throws Exception {
client.finishApplicationMaster(FinishApplicationMasterRequest client.finishApplicationMaster(FinishApplicationMasterRequest
.newInstance(FinalApplicationStatus.SUCCEEDED, "success", null)); .newInstance(FinalApplicationStatus.SUCCEEDED, "success", null));
} finally {
if (rmClient != null) {
rmClient.stop();
}
cluster.stop();
} }
} }
@ -230,11 +217,11 @@ public void testE2ETokenRenewal() throws Exception {
*/ */
@Test(timeout = 120000) @Test(timeout = 120000)
public void testE2ETokenSwap() throws Exception { public void testE2ETokenSwap() throws Exception {
MiniYARNCluster cluster = new MiniYARNCluster("testE2ETokenSwap", 1, 1, 1);
YarnClient rmClient = null;
ApplicationMasterProtocol client; ApplicationMasterProtocol client;
try { try (MiniYARNCluster cluster = new MiniYARNCluster("testE2ETokenSwap",
1, 1, 1);
YarnClient rmClient = YarnClient.createYarnClient()) {
Configuration conf = new YarnConfiguration(); Configuration conf = new YarnConfiguration();
conf.setBoolean(YarnConfiguration.AMRM_PROXY_ENABLED, true); conf.setBoolean(YarnConfiguration.AMRM_PROXY_ENABLED, true);
cluster.init(conf); cluster.init(conf);
@ -242,7 +229,6 @@ public void testE2ETokenSwap() throws Exception {
// the client will connect to the RM with the token provided by AMRMProxy // the client will connect to the RM with the token provided by AMRMProxy
final Configuration yarnConf = cluster.getConfig(); final Configuration yarnConf = cluster.getConfig();
rmClient = YarnClient.createYarnClient();
rmClient.init(yarnConf); rmClient.init(yarnConf);
rmClient.start(); rmClient.start();
@ -260,11 +246,6 @@ public void testE2ETokenSwap() throws Exception {
e.getMessage().startsWith("Invalid AMRMToken from appattempt_")); e.getMessage().startsWith("Invalid AMRMToken from appattempt_"));
} }
} finally {
if (rmClient != null) {
rmClient.stop();
}
cluster.stop();
} }
} }
} }

View File

@ -1721,15 +1721,13 @@ public void testGetQueueInfoPreemptionDisabled() throws Exception {
+ "ProportionalCapacityPreemptionPolicy"); + "ProportionalCapacityPreemptionPolicy");
conf.setBoolean(YarnConfiguration.RM_SCHEDULER_ENABLE_MONITORS, true); conf.setBoolean(YarnConfiguration.RM_SCHEDULER_ENABLE_MONITORS, true);
conf.setBoolean(PREFIX + "root.a.a1.disable_preemption", true); conf.setBoolean(PREFIX + "root.a.a1.disable_preemption", true);
MiniYARNCluster cluster =
new MiniYARNCluster("testReservationAPIs", 2, 1, 1);
YarnClient yarnClient = null; try (MiniYARNCluster cluster =
try { new MiniYARNCluster("testReservationAPIs", 2, 1, 1);
YarnClient yarnClient = YarnClient.createYarnClient()) {
cluster.init(conf); cluster.init(conf);
cluster.start(); cluster.start();
final Configuration yarnConf = cluster.getConfig(); final Configuration yarnConf = cluster.getConfig();
yarnClient = YarnClient.createYarnClient();
yarnClient.init(yarnConf); yarnClient.init(yarnConf);
yarnClient.start(); yarnClient.start();
@ -1742,13 +1740,6 @@ public void testGetQueueInfoPreemptionDisabled() throws Exception {
assertEquals(0, result); assertEquals(0, result);
Assert.assertTrue(sysOutStream.toString() Assert.assertTrue(sysOutStream.toString()
.contains("Preemption : disabled")); .contains("Preemption : disabled"));
} finally {
// clean-up
if (yarnClient != null) {
yarnClient.stop();
}
cluster.stop();
cluster.close();
} }
} }

View File

@ -24,6 +24,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.HATestUtil; import org.apache.hadoop.yarn.server.resourcemanager.HATestUtil;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
import java.io.IOException;
public class TestMiniYarnCluster { public class TestMiniYarnCluster {
@ -41,10 +42,11 @@ public void testTimelineServiceStartInMiniCluster() throws Exception {
*/ */
conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, false); conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, false);
enableAHS = false; enableAHS = false;
MiniYARNCluster cluster = null; try (MiniYARNCluster cluster =
try { new MiniYARNCluster(TestMiniYarnCluster.class.getSimpleName(),
cluster = new MiniYARNCluster(TestMiniYarnCluster.class.getSimpleName(), numNodeManagers, numLocalDirs, numLogDirs, numLogDirs,
numNodeManagers, numLocalDirs, numLogDirs, numLogDirs, enableAHS); enableAHS)) {
cluster.init(conf); cluster.init(conf);
cluster.start(); cluster.start();
@ -52,11 +54,6 @@ public void testTimelineServiceStartInMiniCluster() throws Exception {
Assert.assertNull("Timeline Service should not have been started", Assert.assertNull("Timeline Service should not have been started",
cluster.getApplicationHistoryServer()); cluster.getApplicationHistoryServer());
} }
finally {
if(cluster != null) {
cluster.stop();
}
}
/* /*
* Timeline service should start if TIMELINE_SERVICE_ENABLED == true * Timeline service should start if TIMELINE_SERVICE_ENABLED == true
@ -64,10 +61,10 @@ public void testTimelineServiceStartInMiniCluster() throws Exception {
*/ */
conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true); conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
enableAHS = false; enableAHS = false;
cluster = null; try (MiniYARNCluster cluster =
try { new MiniYARNCluster(TestMiniYarnCluster.class.getSimpleName(),
cluster = new MiniYARNCluster(TestMiniYarnCluster.class.getSimpleName(), numNodeManagers, numLocalDirs, numLogDirs, numLogDirs,
numNodeManagers, numLocalDirs, numLogDirs, numLogDirs, enableAHS); enableAHS)) {
cluster.init(conf); cluster.init(conf);
// Verify that the timeline-service starts on ephemeral ports by default // Verify that the timeline-service starts on ephemeral ports by default
@ -89,21 +86,16 @@ public void testTimelineServiceStartInMiniCluster() throws Exception {
Assert.assertNotNull("Timeline Service should have been started", Assert.assertNotNull("Timeline Service should have been started",
cluster.getApplicationHistoryServer()); cluster.getApplicationHistoryServer());
} }
finally {
if(cluster != null) {
cluster.stop();
}
}
/* /*
* Timeline service should start if TIMELINE_SERVICE_ENABLED == false * Timeline service should start if TIMELINE_SERVICE_ENABLED == false
* and enableAHS == true * and enableAHS == true
*/ */
conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, false); conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, false);
enableAHS = true; enableAHS = true;
cluster = null; try (MiniYARNCluster cluster =
try { new MiniYARNCluster(TestMiniYarnCluster.class.getSimpleName(),
cluster = new MiniYARNCluster(TestMiniYarnCluster.class.getSimpleName(), numNodeManagers, numLocalDirs, numLogDirs, numLogDirs,
numNodeManagers, numLocalDirs, numLogDirs, numLogDirs, enableAHS); enableAHS)) {
cluster.init(conf); cluster.init(conf);
cluster.start(); cluster.start();
@ -117,15 +109,10 @@ public void testTimelineServiceStartInMiniCluster() throws Exception {
Assert.assertNotNull("Timeline Service should have been started", Assert.assertNotNull("Timeline Service should have been started",
cluster.getApplicationHistoryServer()); cluster.getApplicationHistoryServer());
} }
finally {
if(cluster != null) {
cluster.stop();
}
}
} }
@Test @Test
public void testMultiRMConf() { public void testMultiRMConf() throws IOException {
String RM1_NODE_ID = "rm1", RM2_NODE_ID = "rm2"; String RM1_NODE_ID = "rm1", RM2_NODE_ID = "rm2";
int RM1_PORT_BASE = 10000, RM2_PORT_BASE = 20000; int RM1_PORT_BASE = 10000, RM2_PORT_BASE = 20000;
Configuration conf = new YarnConfiguration(); Configuration conf = new YarnConfiguration();
@ -139,23 +126,28 @@ public void testMultiRMConf() {
conf.setBoolean(YarnConfiguration.YARN_MINICLUSTER_FIXED_PORTS, true); conf.setBoolean(YarnConfiguration.YARN_MINICLUSTER_FIXED_PORTS, true);
conf.setBoolean(YarnConfiguration.YARN_MINICLUSTER_USE_RPC, true); conf.setBoolean(YarnConfiguration.YARN_MINICLUSTER_USE_RPC, true);
MiniYARNCluster cluster = try (MiniYARNCluster cluster =
new MiniYARNCluster(TestMiniYarnCluster.class.getName(), new MiniYARNCluster(TestMiniYarnCluster.class.getName(),
2, 0, 1, 1); 2, 0, 1, 1)) {
cluster.init(conf); cluster.init(conf);
Configuration conf1 = cluster.getResourceManager(0).getConfig(), Configuration conf1 = cluster.getResourceManager(0).getConfig(),
conf2 = cluster.getResourceManager(1).getConfig(); conf2 = cluster.getResourceManager(1).getConfig();
Assert.assertFalse(conf1 == conf2); Assert.assertFalse(conf1 == conf2);
Assert.assertEquals("0.0.0.0:18032", Assert.assertEquals("0.0.0.0:18032",
conf1.get(HAUtil.addSuffix(YarnConfiguration.RM_ADDRESS, RM1_NODE_ID))); conf1.get(HAUtil.addSuffix(YarnConfiguration.RM_ADDRESS,
Assert.assertEquals("0.0.0.0:28032", RM1_NODE_ID)));
conf1.get(HAUtil.addSuffix(YarnConfiguration.RM_ADDRESS, RM2_NODE_ID))); Assert.assertEquals("0.0.0.0:28032",
Assert.assertEquals("rm1", conf1.get(YarnConfiguration.RM_HA_ID)); conf1.get(HAUtil.addSuffix(YarnConfiguration.RM_ADDRESS,
RM2_NODE_ID)));
Assert.assertEquals("rm1", conf1.get(YarnConfiguration.RM_HA_ID));
Assert.assertEquals("0.0.0.0:18032", Assert.assertEquals("0.0.0.0:18032",
conf2.get(HAUtil.addSuffix(YarnConfiguration.RM_ADDRESS, RM1_NODE_ID))); conf2.get(HAUtil.addSuffix(YarnConfiguration.RM_ADDRESS,
Assert.assertEquals("0.0.0.0:28032", RM1_NODE_ID)));
conf2.get(HAUtil.addSuffix(YarnConfiguration.RM_ADDRESS, RM2_NODE_ID))); Assert.assertEquals("0.0.0.0:28032",
Assert.assertEquals("rm2", conf2.get(YarnConfiguration.RM_HA_ID)); conf2.get(HAUtil.addSuffix(YarnConfiguration.RM_ADDRESS,
RM2_NODE_ID)));
Assert.assertEquals("rm2", conf2.get(YarnConfiguration.RM_HA_ID));
}
} }
} }