YARN-11388: Prevent resource leaks in TestClientRMService. (#5187)

Signed-off-by: Shilun Fan <slfan1989@apache.org>
(cherry picked from commit 6b67373d10)
This commit is contained in:
Chris Nauroth 2022-12-28 11:00:27 -08:00
parent 290dc7817c
commit 1f270d8a5e
1 changed files with 158 additions and 188 deletions

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.yarn.server.resourcemanager; package org.apache.hadoop.yarn.server.resourcemanager;
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
@ -59,9 +60,11 @@ import org.apache.commons.io.FileUtils;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.ipc.Server; import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.yarn.MockApps; import org.apache.hadoop.yarn.MockApps;
import org.apache.hadoop.yarn.api.ApplicationClientProtocol; import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.ApplicationsRequestScope; import org.apache.hadoop.yarn.api.protocolrecords.ApplicationsRequestScope;
@ -208,6 +211,11 @@ public class TestClientRMService {
private final static String APPLICATION_TAG_SC_PREPROCESSOR ="mytag:foo"; private final static String APPLICATION_TAG_SC_PREPROCESSOR ="mytag:foo";
private File resourceTypesFile = null; private File resourceTypesFile = null;
private Configuration conf;
private ResourceManager resourceManager;
private YarnRPC rpc;
private ApplicationClientProtocol client;
@Test @Test
public void testGetDecommissioningClusterNodes() throws Exception { public void testGetDecommissioningClusterNodes() throws Exception {
MockRM rm = new MockRM() { MockRM rm = new MockRM() {
@ -218,6 +226,7 @@ public class TestClientRMService {
this.getRMContext().getRMDelegationTokenSecretManager()); this.getRMContext().getRMDelegationTokenSecretManager());
}; };
}; };
resourceManager = rm;
rm.start(); rm.start();
int nodeMemory = 1024; int nodeMemory = 1024;
@ -230,13 +239,12 @@ public class TestClientRMService {
rm.waitForState(nm1.getNodeId(), NodeState.DECOMMISSIONING); rm.waitForState(nm1.getNodeId(), NodeState.DECOMMISSIONING);
// Create a client. // Create a client.
Configuration conf = new Configuration(); conf = new Configuration();
YarnRPC rpc = YarnRPC.create(conf); rpc = YarnRPC.create(conf);
InetSocketAddress rmAddress = rm.getClientRMService().getBindAddress(); InetSocketAddress rmAddress = rm.getClientRMService().getBindAddress();
LOG.info("Connecting to ResourceManager at " + rmAddress); LOG.info("Connecting to ResourceManager at " + rmAddress);
ApplicationClientProtocol client = client = (ApplicationClientProtocol) rpc.getProxy(
(ApplicationClientProtocol) rpc ApplicationClientProtocol.class, rmAddress, conf);
.getProxy(ApplicationClientProtocol.class, rmAddress, conf);
// Make call // Make call
List<NodeReport> nodeReports = client.getClusterNodes( List<NodeReport> nodeReports = client.getClusterNodes(
@ -247,9 +255,6 @@ public class TestClientRMService {
NodeReport nr = nodeReports.iterator().next(); NodeReport nr = nodeReports.iterator().next();
Assert.assertEquals(decommissioningTimeout, nr.getDecommissioningTimeout()); Assert.assertEquals(decommissioningTimeout, nr.getDecommissioningTimeout());
Assert.assertNull(nr.getNodeUpdateType()); Assert.assertNull(nr.getNodeUpdateType());
rpc.stopProxy(client, conf);
rm.close();
} }
@Test @Test
@ -261,6 +266,7 @@ public class TestClientRMService {
this.getRMContext().getRMDelegationTokenSecretManager()); this.getRMContext().getRMDelegationTokenSecretManager());
}; };
}; };
resourceManager = rm;
rm.start(); rm.start();
RMNodeLabelsManager labelsMgr = rm.getRMContext().getNodeLabelManager(); RMNodeLabelsManager labelsMgr = rm.getRMContext().getNodeLabelManager();
labelsMgr.addToCluserNodeLabelsWithDefaultExclusivity(ImmutableSet.of("x", "y")); labelsMgr.addToCluserNodeLabelsWithDefaultExclusivity(ImmutableSet.of("x", "y"));
@ -281,13 +287,12 @@ public class TestClientRMService {
rm.sendNodeLost(lostNode); rm.sendNodeLost(lostNode);
// Create a client. // Create a client.
Configuration conf = new Configuration(); conf = new Configuration();
YarnRPC rpc = YarnRPC.create(conf); rpc = YarnRPC.create(conf);
InetSocketAddress rmAddress = rm.getClientRMService().getBindAddress(); InetSocketAddress rmAddress = rm.getClientRMService().getBindAddress();
LOG.info("Connecting to ResourceManager at " + rmAddress); LOG.info("Connecting to ResourceManager at " + rmAddress);
ApplicationClientProtocol client = client = (ApplicationClientProtocol) rpc.getProxy(
(ApplicationClientProtocol) rpc ApplicationClientProtocol.class, rmAddress, conf);
.getProxy(ApplicationClientProtocol.class, rmAddress, conf);
// Make call // Make call
GetClusterNodesRequest request = GetClusterNodesRequest request =
@ -346,9 +351,6 @@ public class TestClientRMService {
Assert.assertNull(report.getDecommissioningTimeout()); Assert.assertNull(report.getDecommissioningTimeout());
Assert.assertNull(report.getNodeUpdateType()); Assert.assertNull(report.getNodeUpdateType());
} }
rpc.stopProxy(client, conf);
rm.close();
} }
@Test @Test
@ -358,7 +360,6 @@ public class TestClientRMService {
new ConcurrentHashMap<ApplicationId, RMApp>()); new ConcurrentHashMap<ApplicationId, RMApp>());
ClientRMService rmService = new ClientRMService(rmContext, null, null, ClientRMService rmService = new ClientRMService(rmContext, null, null,
null, null, null); null, null, null);
RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
GetApplicationReportRequest request = recordFactory GetApplicationReportRequest request = recordFactory
.newRecordInstance(GetApplicationReportRequest.class); .newRecordInstance(GetApplicationReportRequest.class);
request.setApplicationId(ApplicationId.newInstance(0, 0)); request.setApplicationId(ApplicationId.newInstance(0, 0));
@ -389,7 +390,6 @@ public class TestClientRMService {
ClientRMService rmService = new ClientRMService(rmContext, scheduler, ClientRMService rmService = new ClientRMService(rmContext, scheduler,
null, mockAclsManager, null, null); null, mockAclsManager, null, null);
try { try {
RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
GetApplicationReportRequest request = recordFactory GetApplicationReportRequest request = recordFactory
.newRecordInstance(GetApplicationReportRequest.class); .newRecordInstance(GetApplicationReportRequest.class);
request.setApplicationId(appId1); request.setApplicationId(appId1);
@ -448,7 +448,6 @@ public class TestClientRMService {
public void testGetApplicationAttemptReport() throws YarnException, public void testGetApplicationAttemptReport() throws YarnException,
IOException { IOException {
ClientRMService rmService = createRMService(); ClientRMService rmService = createRMService();
RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
GetApplicationAttemptReportRequest request = recordFactory GetApplicationAttemptReportRequest request = recordFactory
.newRecordInstance(GetApplicationAttemptReportRequest.class); .newRecordInstance(GetApplicationAttemptReportRequest.class);
ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance( ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance(
@ -490,7 +489,6 @@ public class TestClientRMService {
@Test @Test
public void testGetApplicationAttempts() throws YarnException, IOException { public void testGetApplicationAttempts() throws YarnException, IOException {
ClientRMService rmService = createRMService(); ClientRMService rmService = createRMService();
RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
GetApplicationAttemptsRequest request = recordFactory GetApplicationAttemptsRequest request = recordFactory
.newRecordInstance(GetApplicationAttemptsRequest.class); .newRecordInstance(GetApplicationAttemptsRequest.class);
ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance( ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance(
@ -512,7 +510,6 @@ public class TestClientRMService {
@Test @Test
public void testGetContainerReport() throws YarnException, IOException { public void testGetContainerReport() throws YarnException, IOException {
ClientRMService rmService = createRMService(); ClientRMService rmService = createRMService();
RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
GetContainerReportRequest request = recordFactory GetContainerReportRequest request = recordFactory
.newRecordInstance(GetContainerReportRequest.class); .newRecordInstance(GetContainerReportRequest.class);
ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance( ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance(
@ -533,7 +530,6 @@ public class TestClientRMService {
@Test @Test
public void testGetContainers() throws YarnException, IOException { public void testGetContainers() throws YarnException, IOException {
ClientRMService rmService = createRMService(); ClientRMService rmService = createRMService();
RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
GetContainersRequest request = recordFactory GetContainersRequest request = recordFactory
.newRecordInstance(GetContainersRequest.class); .newRecordInstance(GetContainersRequest.class);
ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance( ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance(
@ -598,12 +594,13 @@ public class TestClientRMService {
@Test @Test
public void testApplicationTagsValidation() throws IOException { public void testApplicationTagsValidation() throws IOException {
YarnConfiguration conf = new YarnConfiguration(); conf = new YarnConfiguration();
int maxtags = 3, appMaxTagLength = 5; int maxtags = 3, appMaxTagLength = 5;
conf.setInt(YarnConfiguration.RM_APPLICATION_MAX_TAGS, maxtags); conf.setInt(YarnConfiguration.RM_APPLICATION_MAX_TAGS, maxtags);
conf.setInt(YarnConfiguration.RM_APPLICATION_MAX_TAG_LENGTH, conf.setInt(YarnConfiguration.RM_APPLICATION_MAX_TAG_LENGTH,
appMaxTagLength); appMaxTagLength);
MockRM rm = new MockRM(conf); MockRM rm = new MockRM(conf);
resourceManager = rm;
rm.init(conf); rm.init(conf);
rm.start(); rm.start();
@ -625,7 +622,6 @@ public class TestClientRMService {
tags = Arrays.asList("tãg1", "tag2#"); tags = Arrays.asList("tãg1", "tag2#");
validateApplicationTag(rmService, tags, validateApplicationTag(rmService, tags,
"A tag can only have ASCII characters! Invalid tag - tãg1"); "A tag can only have ASCII characters! Invalid tag - tãg1");
rm.close();
} }
private void validateApplicationTag(ClientRMService rmService, private void validateApplicationTag(ClientRMService rmService,
@ -643,9 +639,10 @@ public class TestClientRMService {
@Test @Test
public void testForceKillApplication() throws Exception { public void testForceKillApplication() throws Exception {
YarnConfiguration conf = new YarnConfiguration(); conf = new YarnConfiguration();
conf.setBoolean(MockRM.ENABLE_WEBAPP, true); conf.setBoolean(MockRM.ENABLE_WEBAPP, true);
MockRM rm = new MockRM(conf); MockRM rm = new MockRM(conf);
resourceManager = rm;
rm.init(conf); rm.init(conf);
rm.start(); rm.start();
@ -1687,8 +1684,7 @@ public class TestClientRMService {
RMContainerImpl containerimpl = spy(new RMContainerImpl(container, RMContainerImpl containerimpl = spy(new RMContainerImpl(container,
SchedulerRequestKey.extractFrom(container), attemptId, null, "", SchedulerRequestKey.extractFrom(container), attemptId, null, "",
rmContext)); rmContext));
Map<ApplicationAttemptId, RMAppAttempt> attempts = Map<ApplicationAttemptId, RMAppAttempt> attempts = new HashMap<>();
new HashMap<ApplicationAttemptId, RMAppAttempt>();
attempts.put(attemptId, rmAppAttemptImpl); attempts.put(attemptId, rmAppAttemptImpl);
when(app.getCurrentAppAttempt()).thenReturn(rmAppAttemptImpl); when(app.getCurrentAppAttempt()).thenReturn(rmAppAttemptImpl);
when(app.getAppAttempts()).thenReturn(attempts); when(app.getAppAttempts()).thenReturn(attempts);
@ -1750,6 +1746,7 @@ public class TestClientRMService {
ResourceScheduler.class); ResourceScheduler.class);
conf.setBoolean(YarnConfiguration.RM_RESERVATION_SYSTEM_ENABLE, true); conf.setBoolean(YarnConfiguration.RM_RESERVATION_SYSTEM_ENABLE, true);
MockRM rm = new MockRM(conf); MockRM rm = new MockRM(conf);
resourceManager = rm;
rm.start(); rm.start();
try { try {
rm.registerNode("127.0.0.1:1", 102400, 100); rm.registerNode("127.0.0.1:1", 102400, 100);
@ -1790,8 +1787,8 @@ public class TestClientRMService {
@Test @Test
public void testCreateReservation() { public void testCreateReservation() {
ResourceManager rm = setupResourceManager(); resourceManager = setupResourceManager();
ClientRMService clientService = rm.getClientRMService(); ClientRMService clientService = resourceManager.getClientRMService();
Clock clock = new UTCClock(); Clock clock = new UTCClock();
long arrival = clock.getTime(); long arrival = clock.getTime();
long duration = 60000; long duration = 60000;
@ -1821,14 +1818,12 @@ public class TestClientRMService {
} catch (Exception e) { } catch (Exception e) {
Assert.assertTrue(e instanceof YarnException); Assert.assertTrue(e instanceof YarnException);
} }
rm.stop();
} }
@Test @Test
public void testUpdateReservation() { public void testUpdateReservation() {
ResourceManager rm = setupResourceManager(); resourceManager = setupResourceManager();
ClientRMService clientService = rm.getClientRMService(); ClientRMService clientService = resourceManager.getClientRMService();
Clock clock = new UTCClock(); Clock clock = new UTCClock();
long arrival = clock.getTime(); long arrival = clock.getTime();
long duration = 60000; long duration = 60000;
@ -1857,14 +1852,12 @@ public class TestClientRMService {
} }
Assert.assertNotNull(uResponse); Assert.assertNotNull(uResponse);
System.out.println("Update reservation response: " + uResponse); System.out.println("Update reservation response: " + uResponse);
rm.stop();
} }
@Test @Test
public void testListReservationsByReservationId() { public void testListReservationsByReservationId() {
ResourceManager rm = setupResourceManager(); resourceManager = setupResourceManager();
ClientRMService clientService = rm.getClientRMService(); ClientRMService clientService = resourceManager.getClientRMService();
Clock clock = new UTCClock(); Clock clock = new UTCClock();
long arrival = clock.getTime(); long arrival = clock.getTime();
long duration = 60000; long duration = 60000;
@ -1888,14 +1881,12 @@ public class TestClientRMService {
.getReservationId().getId(), reservationID.getId()); .getReservationId().getId(), reservationID.getId());
Assert.assertEquals(response.getReservationAllocationState().get(0) Assert.assertEquals(response.getReservationAllocationState().get(0)
.getResourceAllocationRequests().size(), 0); .getResourceAllocationRequests().size(), 0);
rm.stop();
} }
@Test @Test
public void testListReservationsByTimeInterval() { public void testListReservationsByTimeInterval() {
ResourceManager rm = setupResourceManager(); resourceManager = setupResourceManager();
ClientRMService clientService = rm.getClientRMService(); ClientRMService clientService = resourceManager.getClientRMService();
Clock clock = new UTCClock(); Clock clock = new UTCClock();
long arrival = clock.getTime(); long arrival = clock.getTime();
long duration = 60000; long duration = 60000;
@ -1947,14 +1938,12 @@ public class TestClientRMService {
reservationRequests.getInterpreter().toString()); reservationRequests.getInterpreter().toString());
Assert.assertTrue(reservationRequests.getReservationResources().get(0) Assert.assertTrue(reservationRequests.getReservationResources().get(0)
.getDuration() == duration); .getDuration() == duration);
rm.stop();
} }
@Test @Test
public void testListReservationsByInvalidTimeInterval() { public void testListReservationsByInvalidTimeInterval() {
ResourceManager rm = setupResourceManager(); resourceManager = setupResourceManager();
ClientRMService clientService = rm.getClientRMService(); ClientRMService clientService = resourceManager.getClientRMService();
Clock clock = new UTCClock(); Clock clock = new UTCClock();
long arrival = clock.getTime(); long arrival = clock.getTime();
long duration = 60000; long duration = 60000;
@ -1991,14 +1980,12 @@ public class TestClientRMService {
Assert.assertEquals(1, response.getReservationAllocationState().size()); Assert.assertEquals(1, response.getReservationAllocationState().size());
Assert.assertEquals(response.getReservationAllocationState().get(0) Assert.assertEquals(response.getReservationAllocationState().get(0)
.getReservationId().getId(), sRequest.getReservationId().getId()); .getReservationId().getId(), sRequest.getReservationId().getId());
rm.stop();
} }
@Test @Test
public void testListReservationsByTimeIntervalContainingNoReservations() { public void testListReservationsByTimeIntervalContainingNoReservations() {
ResourceManager rm = setupResourceManager(); resourceManager = setupResourceManager();
ClientRMService clientService = rm.getClientRMService(); ClientRMService clientService = resourceManager.getClientRMService();
Clock clock = new UTCClock(); Clock clock = new UTCClock();
long arrival = clock.getTime(); long arrival = clock.getTime();
long duration = 60000; long duration = 60000;
@ -2073,14 +2060,12 @@ public class TestClientRMService {
// Ensure all reservations are filtered out. // Ensure all reservations are filtered out.
Assert.assertNotNull(response); Assert.assertNotNull(response);
assertThat(response.getReservationAllocationState()).isEmpty(); assertThat(response.getReservationAllocationState()).isEmpty();
rm.stop();
} }
@Test @Test
public void testReservationDelete() { public void testReservationDelete() {
ResourceManager rm = setupResourceManager(); resourceManager = setupResourceManager();
ClientRMService clientService = rm.getClientRMService(); ClientRMService clientService = resourceManager.getClientRMService();
Clock clock = new UTCClock(); Clock clock = new UTCClock();
long arrival = clock.getTime(); long arrival = clock.getTime();
long duration = 60000; long duration = 60000;
@ -2114,8 +2099,6 @@ public class TestClientRMService {
} }
Assert.assertNotNull(response); Assert.assertNotNull(response);
Assert.assertEquals(0, response.getReservationAllocationState().size()); Assert.assertEquals(0, response.getReservationAllocationState().size());
rm.stop();
} }
@Test @Test
@ -2128,6 +2111,7 @@ public class TestClientRMService {
.getRMDelegationTokenSecretManager()); .getRMDelegationTokenSecretManager());
}; };
}; };
resourceManager = rm;
rm.start(); rm.start();
NodeLabel labelX = NodeLabel.newInstance("x", false); NodeLabel labelX = NodeLabel.newInstance("x", false);
NodeLabel labelY = NodeLabel.newInstance("y"); NodeLabel labelY = NodeLabel.newInstance("y");
@ -2142,12 +2126,12 @@ public class TestClientRMService {
labelsMgr.replaceLabelsOnNode(map); labelsMgr.replaceLabelsOnNode(map);
// Create a client. // Create a client.
Configuration conf = new Configuration(); conf = new Configuration();
YarnRPC rpc = YarnRPC.create(conf); rpc = YarnRPC.create(conf);
InetSocketAddress rmAddress = rm.getClientRMService().getBindAddress(); InetSocketAddress rmAddress = rm.getClientRMService().getBindAddress();
LOG.info("Connecting to ResourceManager at " + rmAddress); LOG.info("Connecting to ResourceManager at " + rmAddress);
ApplicationClientProtocol client = (ApplicationClientProtocol) rpc client = (ApplicationClientProtocol) rpc.getProxy(
.getProxy(ApplicationClientProtocol.class, rmAddress, conf); ApplicationClientProtocol.class, rmAddress, conf);
// Get node labels collection // Get node labels collection
GetClusterNodeLabelsResponse response = client GetClusterNodeLabelsResponse response = client
@ -2168,9 +2152,6 @@ public class TestClientRMService {
// Below label "x" is not present in the response as exclusivity is true // Below label "x" is not present in the response as exclusivity is true
Assert.assertFalse(nodeToLabels.get(node1).containsAll( Assert.assertFalse(nodeToLabels.get(node1).containsAll(
Arrays.asList(NodeLabel.newInstance("x")))); Arrays.asList(NodeLabel.newInstance("x"))));
rpc.stopProxy(client, conf);
rm.stop();
} }
@Test @Test
@ -2183,6 +2164,7 @@ public class TestClientRMService {
.getRMDelegationTokenSecretManager()); .getRMDelegationTokenSecretManager());
}; };
}; };
resourceManager = rm;
rm.start(); rm.start();
NodeLabel labelX = NodeLabel.newInstance("x", false); NodeLabel labelX = NodeLabel.newInstance("x", false);
@ -2205,12 +2187,12 @@ public class TestClientRMService {
labelsMgr.replaceLabelsOnNode(map); labelsMgr.replaceLabelsOnNode(map);
// Create a client. // Create a client.
Configuration conf = new Configuration(); conf = new Configuration();
YarnRPC rpc = YarnRPC.create(conf); rpc = YarnRPC.create(conf);
InetSocketAddress rmAddress = rm.getClientRMService().getBindAddress(); InetSocketAddress rmAddress = rm.getClientRMService().getBindAddress();
LOG.info("Connecting to ResourceManager at " + rmAddress); LOG.info("Connecting to ResourceManager at " + rmAddress);
ApplicationClientProtocol client = (ApplicationClientProtocol) rpc client = (ApplicationClientProtocol) rpc.getProxy(
.getProxy(ApplicationClientProtocol.class, rmAddress, conf); ApplicationClientProtocol.class, rmAddress, conf);
// Get node labels collection // Get node labels collection
GetClusterNodeLabelsResponse response = client GetClusterNodeLabelsResponse response = client
@ -2244,9 +2226,6 @@ public class TestClientRMService {
Assert.assertTrue(labelsToNodes.get(labelZ.getName()).containsAll( Assert.assertTrue(labelsToNodes.get(labelZ.getName()).containsAll(
Arrays.asList(node1B, node3B))); Arrays.asList(node1B, node3B)));
assertThat(labelsToNodes.get(labelY.getName())).isNull(); assertThat(labelsToNodes.get(labelY.getName())).isNull();
rpc.stopProxy(client, conf);
rm.close();
} }
@Test(timeout = 120000) @Test(timeout = 120000)
@ -2259,6 +2238,7 @@ public class TestClientRMService {
this.getRMContext().getRMDelegationTokenSecretManager()); this.getRMContext().getRMDelegationTokenSecretManager());
} }
}; };
resourceManager = rm;
rm.start(); rm.start();
NodeAttributesManager mgr = rm.getRMContext().getNodeAttributesManager(); NodeAttributesManager mgr = rm.getRMContext().getNodeAttributesManager();
@ -2278,12 +2258,12 @@ public class TestClientRMService {
nodes.put(host2.getHost(), ImmutableSet.of(docker)); nodes.put(host2.getHost(), ImmutableSet.of(docker));
mgr.addNodeAttributes(nodes); mgr.addNodeAttributes(nodes);
// Create a client. // Create a client.
Configuration conf = new Configuration(); conf = new Configuration();
YarnRPC rpc = YarnRPC.create(conf); rpc = YarnRPC.create(conf);
InetSocketAddress rmAddress = rm.getClientRMService().getBindAddress(); InetSocketAddress rmAddress = rm.getClientRMService().getBindAddress();
LOG.info("Connecting to ResourceManager at " + rmAddress); LOG.info("Connecting to ResourceManager at " + rmAddress);
ApplicationClientProtocol client = (ApplicationClientProtocol) rpc client = (ApplicationClientProtocol) rpc.getProxy(
.getProxy(ApplicationClientProtocol.class, rmAddress, conf); ApplicationClientProtocol.class, rmAddress, conf);
GetClusterNodeAttributesRequest request = GetClusterNodeAttributesRequest request =
GetClusterNodeAttributesRequest.newInstance(); GetClusterNodeAttributesRequest.newInstance();
@ -2295,8 +2275,6 @@ public class TestClientRMService {
Assert.assertTrue(attributes.contains(NodeAttributeInfo.newInstance(os))); Assert.assertTrue(attributes.contains(NodeAttributeInfo.newInstance(os)));
Assert Assert
.assertTrue(attributes.contains(NodeAttributeInfo.newInstance(docker))); .assertTrue(attributes.contains(NodeAttributeInfo.newInstance(docker)));
rpc.stopProxy(client, conf);
rm.close();
} }
@Test(timeout = 120000) @Test(timeout = 120000)
@ -2309,6 +2287,7 @@ public class TestClientRMService {
this.getRMContext().getRMDelegationTokenSecretManager()); this.getRMContext().getRMDelegationTokenSecretManager());
} }
}; };
resourceManager = rm;
rm.start(); rm.start();
NodeAttributesManager mgr = rm.getRMContext().getNodeAttributesManager(); NodeAttributesManager mgr = rm.getRMContext().getNodeAttributesManager();
@ -2331,12 +2310,12 @@ public class TestClientRMService {
nodes.put(node2, ImmutableSet.of(docker, dist)); nodes.put(node2, ImmutableSet.of(docker, dist));
mgr.addNodeAttributes(nodes); mgr.addNodeAttributes(nodes);
// Create a client. // Create a client.
Configuration conf = new Configuration(); conf = new Configuration();
YarnRPC rpc = YarnRPC.create(conf); rpc = YarnRPC.create(conf);
InetSocketAddress rmAddress = rm.getClientRMService().getBindAddress(); InetSocketAddress rmAddress = rm.getClientRMService().getBindAddress();
LOG.info("Connecting to ResourceManager at " + rmAddress); LOG.info("Connecting to ResourceManager at " + rmAddress);
ApplicationClientProtocol client = (ApplicationClientProtocol) rpc client = (ApplicationClientProtocol) rpc.getProxy(
.getProxy(ApplicationClientProtocol.class, rmAddress, conf); ApplicationClientProtocol.class, rmAddress, conf);
GetAttributesToNodesRequest request = GetAttributesToNodesRequest request =
GetAttributesToNodesRequest.newInstance(); GetAttributesToNodesRequest.newInstance();
@ -2377,8 +2356,6 @@ public class TestClientRMService {
attrs3.get(os.getAttributeKey()))); attrs3.get(os.getAttributeKey())));
Assert.assertTrue(findHostnameAndValInMapping(node2, "docker0", Assert.assertTrue(findHostnameAndValInMapping(node2, "docker0",
attrs3.get(docker.getAttributeKey()))); attrs3.get(docker.getAttributeKey())));
rpc.stopProxy(client, conf);
rm.close();
} }
private boolean findHostnameAndValInMapping(String hostname, String attrVal, private boolean findHostnameAndValInMapping(String hostname, String attrVal,
@ -2401,6 +2378,7 @@ public class TestClientRMService {
this.getRMContext().getRMDelegationTokenSecretManager()); this.getRMContext().getRMDelegationTokenSecretManager());
} }
}; };
resourceManager = rm;
rm.start(); rm.start();
NodeAttributesManager mgr = rm.getRMContext().getNodeAttributesManager(); NodeAttributesManager mgr = rm.getRMContext().getNodeAttributesManager();
@ -2423,12 +2401,12 @@ public class TestClientRMService {
nodes.put(node2, ImmutableSet.of(docker, dist)); nodes.put(node2, ImmutableSet.of(docker, dist));
mgr.addNodeAttributes(nodes); mgr.addNodeAttributes(nodes);
// Create a client. // Create a client.
Configuration conf = new Configuration(); conf = new Configuration();
YarnRPC rpc = YarnRPC.create(conf); rpc = YarnRPC.create(conf);
InetSocketAddress rmAddress = rm.getClientRMService().getBindAddress(); InetSocketAddress rmAddress = rm.getClientRMService().getBindAddress();
LOG.info("Connecting to ResourceManager at " + rmAddress); LOG.info("Connecting to ResourceManager at " + rmAddress);
ApplicationClientProtocol client = (ApplicationClientProtocol) rpc client = (ApplicationClientProtocol) rpc.getProxy(
.getProxy(ApplicationClientProtocol.class, rmAddress, conf); ApplicationClientProtocol.class, rmAddress, conf);
// Specify null for hostnames. // Specify null for hostnames.
GetNodesToAttributesRequest request1 = GetNodesToAttributesRequest request1 =
@ -2471,8 +2449,6 @@ public class TestClientRMService {
client.getNodesToAttributes(request4); client.getNodesToAttributes(request4);
hostToAttrs = response4.getNodeToAttributes(); hostToAttrs = response4.getNodeToAttributes();
Assert.assertEquals(0, hostToAttrs.size()); Assert.assertEquals(0, hostToAttrs.size());
rpc.stopProxy(client, conf);
rm.close();
} }
@Test(timeout = 120000) @Test(timeout = 120000)
@ -2480,13 +2456,14 @@ public class TestClientRMService {
throws Exception { throws Exception {
int maxPriority = 10; int maxPriority = 10;
int appPriority = 5; int appPriority = 5;
YarnConfiguration conf = new YarnConfiguration(); conf = new YarnConfiguration();
Assume.assumeFalse("FairScheduler does not support Application Priorities", Assume.assumeFalse("FairScheduler does not support Application Priorities",
conf.get(YarnConfiguration.RM_SCHEDULER) conf.get(YarnConfiguration.RM_SCHEDULER)
.equals(FairScheduler.class.getName())); .equals(FairScheduler.class.getName()));
conf.setInt(YarnConfiguration.MAX_CLUSTER_LEVEL_APPLICATION_PRIORITY, conf.setInt(YarnConfiguration.MAX_CLUSTER_LEVEL_APPLICATION_PRIORITY,
maxPriority); maxPriority);
MockRM rm = new MockRM(conf); MockRM rm = new MockRM(conf);
resourceManager = rm;
rm.init(conf); rm.init(conf);
rm.start(); rm.start();
MockRMAppSubmissionData data = MockRMAppSubmissionData.Builder MockRMAppSubmissionData data = MockRMAppSubmissionData.Builder
@ -2498,20 +2475,20 @@ public class TestClientRMService {
testApplicationPriorityUpdation(rmService, app1, appPriority, appPriority); testApplicationPriorityUpdation(rmService, app1, appPriority, appPriority);
rm.killApp(app1.getApplicationId()); rm.killApp(app1.getApplicationId());
rm.waitForState(app1.getApplicationId(), RMAppState.KILLED); rm.waitForState(app1.getApplicationId(), RMAppState.KILLED);
rm.stop();
} }
@Test(timeout = 120000) @Test(timeout = 120000)
public void testUpdateApplicationPriorityRequest() throws Exception { public void testUpdateApplicationPriorityRequest() throws Exception {
int maxPriority = 10; int maxPriority = 10;
int appPriority = 5; int appPriority = 5;
YarnConfiguration conf = new YarnConfiguration(); conf = new YarnConfiguration();
Assume.assumeFalse("FairScheduler does not support Application Priorities", Assume.assumeFalse("FairScheduler does not support Application Priorities",
conf.get(YarnConfiguration.RM_SCHEDULER) conf.get(YarnConfiguration.RM_SCHEDULER)
.equals(FairScheduler.class.getName())); .equals(FairScheduler.class.getName()));
conf.setInt(YarnConfiguration.MAX_CLUSTER_LEVEL_APPLICATION_PRIORITY, conf.setInt(YarnConfiguration.MAX_CLUSTER_LEVEL_APPLICATION_PRIORITY,
maxPriority); maxPriority);
MockRM rm = new MockRM(conf); MockRM rm = new MockRM(conf);
resourceManager = rm;
rm.init(conf); rm.init(conf);
rm.start(); rm.start();
rm.registerNode("host1:1234", 1024); rm.registerNode("host1:1234", 1024);
@ -2555,8 +2532,6 @@ public class TestClientRMService {
Assert.assertEquals("Incorrect priority has been set to application", Assert.assertEquals("Incorrect priority has been set to application",
appPriority, rmService.updateApplicationPriority(updateRequest) appPriority, rmService.updateApplicationPriority(updateRequest)
.getApplicationPriority().getPriority()); .getApplicationPriority().getPriority());
rm.stop();
} }
private void testApplicationPriorityUpdation(ClientRMService rmService, private void testApplicationPriorityUpdation(ClientRMService rmService,
@ -2576,24 +2551,23 @@ public class TestClientRMService {
updateApplicationPriority.getApplicationPriority().getPriority()); updateApplicationPriority.getApplicationPriority().getPriority());
} }
private void createExcludeFile(String filename) throws IOException { private File createExcludeFile(File testDir) throws IOException {
File file = new File(filename); File excludeFile = new File(testDir, "excludeFile");
if (file.exists()) { try (FileOutputStream out = new FileOutputStream(excludeFile)) {
file.delete(); out.write("decommisssionedHost".getBytes(UTF_8));
} }
return excludeFile;
FileOutputStream out = new FileOutputStream(file);
out.write("decommisssionedHost".getBytes());
out.close();
} }
@Test @Test
public void testRMStartWithDecommissionedNode() throws Exception { public void testRMStartWithDecommissionedNode() throws Exception {
String excludeFile = "excludeFile"; File testDir = GenericTestUtils.getRandomizedTestDir();
createExcludeFile(excludeFile); assertTrue("Failed to create test directory: " + testDir.getAbsolutePath(), testDir.mkdirs());
YarnConfiguration conf = new YarnConfiguration(); try {
File excludeFile = createExcludeFile(testDir);
conf = new YarnConfiguration();
conf.set(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH, conf.set(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH,
excludeFile); excludeFile.getAbsolutePath());
MockRM rm = new MockRM(conf) { MockRM rm = new MockRM(conf) {
protected ClientRMService createClientRMService() { protected ClientRMService createClientRMService() {
return new ClientRMService(this.rmContext, scheduler, return new ClientRMService(this.rmContext, scheduler,
@ -2601,30 +2575,29 @@ public class TestClientRMService {
this.getRMContext().getRMDelegationTokenSecretManager()); this.getRMContext().getRMDelegationTokenSecretManager());
}; };
}; };
resourceManager = rm;
rm.start(); rm.start();
YarnRPC rpc = YarnRPC.create(conf); rpc = YarnRPC.create(conf);
InetSocketAddress rmAddress = rm.getClientRMService().getBindAddress(); InetSocketAddress rmAddress = rm.getClientRMService().getBindAddress();
LOG.info("Connecting to ResourceManager at " + rmAddress); LOG.info("Connecting to ResourceManager at " + rmAddress);
ApplicationClientProtocol client = client = (ApplicationClientProtocol) rpc.getProxy(
(ApplicationClientProtocol) rpc ApplicationClientProtocol.class, rmAddress, conf);
.getProxy(ApplicationClientProtocol.class, rmAddress, conf);
// Make call // Make call
GetClusterNodesRequest request = GetClusterNodesRequest request =
GetClusterNodesRequest.newInstance(EnumSet.allOf(NodeState.class)); GetClusterNodesRequest.newInstance(EnumSet.allOf(NodeState.class));
List<NodeReport> nodeReports = client.getClusterNodes(request).getNodeReports(); List<NodeReport> nodeReports = client.getClusterNodes(request).getNodeReports();
Assert.assertEquals(1, nodeReports.size()); assertEquals(1, nodeReports.size());
} finally {
rm.stop(); FileUtil.fullyDelete(testDir);
rpc.stopProxy(client, conf); }
new File(excludeFile).delete();
} }
@Test @Test
public void testGetResourceTypesInfoWhenResourceProfileDisabled() public void testGetResourceTypesInfoWhenResourceProfileDisabled()
throws Exception { throws Exception {
YarnConfiguration conf = new YarnConfiguration(); conf = new YarnConfiguration();
MockRM rm = new MockRM(conf) { MockRM rm = new MockRM(conf) {
protected ClientRMService createClientRMService() { protected ClientRMService createClientRMService() {
return new ClientRMService(this.rmContext, scheduler, return new ClientRMService(this.rmContext, scheduler,
@ -2632,14 +2605,14 @@ public class TestClientRMService {
this.getRMContext().getRMDelegationTokenSecretManager()); this.getRMContext().getRMDelegationTokenSecretManager());
} }
}; };
resourceManager = rm;
rm.start(); rm.start();
YarnRPC rpc = YarnRPC.create(conf); rpc = YarnRPC.create(conf);
InetSocketAddress rmAddress = rm.getClientRMService().getBindAddress(); InetSocketAddress rmAddress = rm.getClientRMService().getBindAddress();
LOG.info("Connecting to ResourceManager at " + rmAddress); LOG.info("Connecting to ResourceManager at " + rmAddress);
ApplicationClientProtocol client = client = (ApplicationClientProtocol) rpc.getProxy(
(ApplicationClientProtocol) rpc ApplicationClientProtocol.class, rmAddress, conf);
.getProxy(ApplicationClientProtocol.class, rmAddress, conf);
// Make call // Make call
GetAllResourceTypeInfoRequest request = GetAllResourceTypeInfoRequest request =
@ -2659,9 +2632,6 @@ public class TestClientRMService {
response.getResourceTypeInfo().get(1).getName()); response.getResourceTypeInfo().get(1).getName());
Assert.assertEquals(ResourceInformation.VCORES.getUnits(), Assert.assertEquals(ResourceInformation.VCORES.getUnits(),
response.getResourceTypeInfo().get(1).getDefaultUnit()); response.getResourceTypeInfo().get(1).getDefaultUnit());
rm.stop();
rpc.stopProxy(client, conf);
} }
@Test @Test
@ -2755,6 +2725,7 @@ public class TestClientRMService {
this.getRMContext().getRMDelegationTokenSecretManager()); this.getRMContext().getRMDelegationTokenSecretManager());
}; };
}; };
resourceManager = rm;
rm.start(); rm.start();
Resource resource = BuilderUtils.newResource(1024, 1); Resource resource = BuilderUtils.newResource(1024, 1);
@ -2769,13 +2740,12 @@ public class TestClientRMService {
node.nodeHeartbeat(true); node.nodeHeartbeat(true);
// Create a client. // Create a client.
Configuration conf = new Configuration(); conf = new Configuration();
YarnRPC rpc = YarnRPC.create(conf); rpc = YarnRPC.create(conf);
InetSocketAddress rmAddress = rm.getClientRMService().getBindAddress(); InetSocketAddress rmAddress = rm.getClientRMService().getBindAddress();
LOG.info("Connecting to ResourceManager at " + rmAddress); LOG.info("Connecting to ResourceManager at " + rmAddress);
ApplicationClientProtocol client = client = (ApplicationClientProtocol) rpc.getProxy(
(ApplicationClientProtocol) rpc ApplicationClientProtocol.class, rmAddress, conf);
.getProxy(ApplicationClientProtocol.class, rmAddress, conf);
// Make call // Make call
GetClusterNodesRequest request = GetClusterNodesRequest request =
@ -2807,9 +2777,6 @@ public class TestClientRMService {
getResourceInformation("memory-mb").getUnits()); getResourceInformation("memory-mb").getUnits());
Assert.assertEquals(976562, nodeReports.get(0).getCapability(). Assert.assertEquals(976562, nodeReports.get(0).getCapability().
getResourceInformation("memory-mb").getValue()); getResourceInformation("memory-mb").getValue());
rpc.stopProxy(client, conf);
rm.close();
} }
@Test @Test
@ -2821,6 +2788,7 @@ public class TestClientRMService {
this.getRMContext().getRMDelegationTokenSecretManager()); this.getRMContext().getRMDelegationTokenSecretManager());
}; };
}; };
resourceManager = rm;
rm.start(); rm.start();
ClusterMetrics clusterMetrics = ClusterMetrics.getMetrics(); ClusterMetrics clusterMetrics = ClusterMetrics.getMetrics();
@ -2833,13 +2801,12 @@ public class TestClientRMService {
repeat(7, clusterMetrics::incrNumShutdownNMs); repeat(7, clusterMetrics::incrNumShutdownNMs);
// Create a client. // Create a client.
Configuration conf = new Configuration(); conf = new Configuration();
YarnRPC rpc = YarnRPC.create(conf); rpc = YarnRPC.create(conf);
InetSocketAddress rmAddress = rm.getClientRMService().getBindAddress(); InetSocketAddress rmAddress = rm.getClientRMService().getBindAddress();
LOG.info("Connecting to ResourceManager at " + rmAddress); LOG.info("Connecting to ResourceManager at " + rmAddress);
ApplicationClientProtocol client = client = (ApplicationClientProtocol) rpc.getProxy(
(ApplicationClientProtocol) rpc ApplicationClientProtocol.class, rmAddress, conf);
.getProxy(ApplicationClientProtocol.class, rmAddress, conf);
YarnClusterMetrics ymetrics = client.getClusterMetrics( YarnClusterMetrics ymetrics = client.getClusterMetrics(
GetClusterMetricsRequest.newInstance()).getClusterMetrics(); GetClusterMetricsRequest.newInstance()).getClusterMetrics();
@ -2852,18 +2819,21 @@ public class TestClientRMService {
Assert.assertEquals(5, ymetrics.getNumUnhealthyNodeManagers()); Assert.assertEquals(5, ymetrics.getNumUnhealthyNodeManagers());
Assert.assertEquals(6, ymetrics.getNumRebootedNodeManagers()); Assert.assertEquals(6, ymetrics.getNumRebootedNodeManagers());
Assert.assertEquals(7, ymetrics.getNumShutdownNodeManagers()); Assert.assertEquals(7, ymetrics.getNumShutdownNodeManagers());
rpc.stopProxy(client, conf);
rm.close();
} }
@After @After
public void tearDown(){ public void tearDown() throws Exception {
if (resourceTypesFile != null && resourceTypesFile.exists()) { if (resourceTypesFile != null && resourceTypesFile.exists()) {
resourceTypesFile.delete(); resourceTypesFile.delete();
} }
ClusterMetrics.destroy(); ClusterMetrics.destroy();
DefaultMetricsSystem.shutdown(); DefaultMetricsSystem.shutdown();
if (conf != null && client != null && rpc != null) {
rpc.stopProxy(client, conf);
}
if (resourceManager != null) {
resourceManager.close();
}
} }
private static void repeat(int n, Runnable r) { private static void repeat(int n, Runnable r) {