YARN-11114. RMWebServices returns only apps matching exactly the submitted queue name. Contributed by Szilard Nemeth

This commit is contained in:
Szilard Nemeth 2022-04-20 19:39:47 +02:00 committed by Benjamin Teke
parent f143e99428
commit 9af3eabdca
3 changed files with 178 additions and 5 deletions

View File

@ -37,7 +37,6 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.apache.commons.cli.UnrecognizedOptionException;
import org.apache.commons.lang3.Range;
import org.slf4j.Logger;
@ -913,7 +912,17 @@ public class ClientRMService extends AbstractService implements
}
if (queues != null && !queues.isEmpty()) {
if (!queues.contains(application.getQueue())) {
Map<String, List<RMApp>> foundApps = queryApplicationsByQueues(apps, queues);
List<RMApp> runningAppsByQueues = foundApps.entrySet().stream()
.filter(e -> queues.contains(e.getKey()))
.map(Map.Entry::getValue)
.flatMap(Collection::stream)
.collect(Collectors.toList());
List<RMApp> runningAppsById = runningAppsByQueues.stream()
.filter(app -> app.getApplicationId().equals(application.getApplicationId()))
.collect(Collectors.toList());
if (runningAppsById.isEmpty() && !queues.contains(application.getQueue())) {
continue;
}
}
@ -992,6 +1001,22 @@ public class ClientRMService extends AbstractService implements
return response;
}
private Map<String, List<RMApp>> queryApplicationsByQueues(
Map<ApplicationId, RMApp> apps, Set<String> queues) {
final Map<String, List<RMApp>> appsToQueues = new HashMap<>();
for (String queue : queues) {
List<ApplicationAttemptId> appsInQueue = scheduler.getAppsInQueue(queue);
if (appsInQueue != null && !appsInQueue.isEmpty()) {
for (ApplicationAttemptId appAttemptId : appsInQueue) {
RMApp rmApp = apps.get(appAttemptId.getApplicationId());
appsToQueues.putIfAbsent(queue, new ArrayList<>());
appsToQueues.get(queue).add(rmApp);
}
}
}
return appsToQueues;
}
private Set<String> getLowerCasedAppTypes(GetApplicationsRequest request) {
Set<String> applicationTypes = new HashSet<>();
if (request.getApplicationTypes() != null && !request.getApplicationTypes()

View File

@ -1402,9 +1402,9 @@ public class TestClientRMService {
request.setQueues(queueSet);
queueSet.add(queues[0]);
assertEquals("Incorrect number of applications in queue", 2,
assertEquals("Incorrect number of applications in queue", 3,
rmService.getApplications(request).getApplicationList().size());
assertEquals("Incorrect number of applications in queue", 2,
assertEquals("Incorrect number of applications in queue", 3,
rmService.getApplications(request).getApplicationList().size());
queueSet.add(queues[1]);

View File

@ -46,6 +46,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppInfo;
import org.apache.hadoop.yarn.webapp.GenericExceptionHandler;
@ -82,6 +83,16 @@ public class TestRMWebServicesApps extends JerseyTestBase {
private static final int CONTAINER_MB = 1024;
private static class WebServletModule extends ServletModule {
private final Class<? extends AbstractYarnScheduler> scheduler;
public WebServletModule() {
this.scheduler = FifoScheduler.class;
}
public WebServletModule(Class<? extends AbstractYarnScheduler> scheduler) {
this.scheduler = scheduler;
}
@Override
protected void configureServlets() {
bind(JAXBContextResolver.class);
@ -90,7 +101,7 @@ public class TestRMWebServicesApps extends JerseyTestBase {
Configuration conf = new Configuration();
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS);
conf.setClass(YarnConfiguration.RM_SCHEDULER, FifoScheduler.class,
conf.setClass(YarnConfiguration.RM_SCHEDULER, scheduler,
ResourceScheduler.class);
rm = new MockRM(conf);
bind(ResourceManager.class).toInstance(rm);
@ -1970,5 +1981,142 @@ public class TestRMWebServicesApps extends JerseyTestBase {
enforceExecutionType);
}
@Test
public void testAppsQueryByQueueShortname() throws Exception {
GuiceServletConfig.setInjector(
Guice.createInjector(new WebServletModule(CapacityScheduler.class)));
rm.start();
MockNM amNodeManager = rm.registerNode("127.0.0.1:1234", 2048);
//YARN-11114 - Finished apps can only be queried with exactly the
// same queue name that the app is submitted to.
//As the queue is 'root.default' and the query is 'default' here,
// this app won't be returned.
RMApp finishedApp1 = MockRMAppSubmitter.submit(rm,
MockRMAppSubmissionData.Builder
.createWithMemory(CONTAINER_MB, rm)
.withQueue("root.default")
.build());
RMApp finishedApp2 = MockRMAppSubmitter.submit(rm,
MockRMAppSubmissionData.Builder
.createWithMemory(CONTAINER_MB, rm)
.withQueue("default")
.build());
RMApp runningApp1 = MockRMAppSubmitter.submit(rm,
MockRMAppSubmissionData.Builder
.createWithMemory(CONTAINER_MB, rm)
.withQueue("default")
.build());
RMApp runningApp2 = MockRMAppSubmitter.submit(rm,
MockRMAppSubmissionData.Builder
.createWithMemory(CONTAINER_MB, rm)
.withQueue("root.default")
.build());
amNodeManager.nodeHeartbeat(true);
finishApp(amNodeManager, finishedApp1);
amNodeManager.nodeHeartbeat(true);
finishApp(amNodeManager, finishedApp2);
WebResource r = resource();
ClientResponse response = r.path("ws").path("v1").path("cluster")
.path("apps")
.queryParam("queue", "default")
.accept(MediaType.APPLICATION_JSON).get(ClientResponse.class);
assertEquals(MediaType.APPLICATION_JSON_TYPE + "; " + JettyUtils.UTF_8,
response.getType().toString());
JSONObject json = response.getEntity(JSONObject.class);
assertEquals("incorrect number of elements", 1, json.length());
JSONObject apps = json.getJSONObject("apps");
assertEquals("incorrect number of elements", 1, apps.length());
JSONArray array = apps.getJSONArray("app");
Set<String> appIds = getApplicationIds(array);
assertTrue("Running app 1 should be in the result list!",
appIds.contains(runningApp1.getApplicationId().toString()));
assertTrue("Running app 2 should be in the result list!",
appIds.contains(runningApp2.getApplicationId().toString()));
assertFalse("Finished app 1 should not be in the result list " +
"as it was submitted to 'root.default' but the query is for 'default'",
appIds.contains(finishedApp1.getApplicationId().toString()));
assertTrue("Finished app 2 should be in the result list " +
"as it was submitted to 'default' and the query is exactly for 'default'",
appIds.contains(finishedApp2.getApplicationId().toString()));
assertEquals("incorrect number of elements", 3, array.length());
rm.stop();
}
@Test
public void testAppsQueryByQueueFullname() throws Exception {
GuiceServletConfig.setInjector(
Guice.createInjector(new WebServletModule(CapacityScheduler.class)));
rm.start();
MockNM amNodeManager = rm.registerNode("127.0.0.1:1234", 2048);
RMApp finishedApp1 = MockRMAppSubmitter.submit(rm,
MockRMAppSubmissionData.Builder
.createWithMemory(CONTAINER_MB, rm)
.withQueue("root.default")
.build());
//YARN-11114 - Finished apps can only be queried with exactly the
// same queue name that the app is submitted to.
//As the queue is 'default' and the query is 'root.default' here,
// this app won't be returned,
RMApp finishedApp2 = MockRMAppSubmitter.submit(rm,
MockRMAppSubmissionData.Builder
.createWithMemory(CONTAINER_MB, rm)
.withQueue("default")
.build());
RMApp runningApp1 = MockRMAppSubmitter.submit(rm,
MockRMAppSubmissionData.Builder
.createWithMemory(CONTAINER_MB, rm)
.withQueue("default")
.build());
RMApp runningApp2 = MockRMAppSubmitter.submit(rm,
MockRMAppSubmissionData.Builder
.createWithMemory(CONTAINER_MB, rm)
.withQueue("root.default")
.build());
amNodeManager.nodeHeartbeat(true);
finishApp(amNodeManager, finishedApp1);
amNodeManager.nodeHeartbeat(true);
finishApp(amNodeManager, finishedApp2);
WebResource r = resource();
ClientResponse response = r.path("ws").path("v1").path("cluster")
.path("apps")
.queryParam("queue", "root.default")
.accept(MediaType.APPLICATION_JSON).get(ClientResponse.class);
assertEquals(MediaType.APPLICATION_JSON_TYPE + "; " + JettyUtils.UTF_8,
response.getType().toString());
JSONObject json = response.getEntity(JSONObject.class);
assertEquals("incorrect number of elements", 1, json.length());
JSONObject apps = json.getJSONObject("apps");
assertEquals("incorrect number of elements", 1, apps.length());
JSONArray array = apps.getJSONArray("app");
Set<String> appIds = getApplicationIds(array);
assertTrue("Running app 1 should be in the result list!",
appIds.contains(runningApp1.getApplicationId().toString()));
assertTrue("Running app 2 should be in the result list!",
appIds.contains(runningApp2.getApplicationId().toString()));
assertTrue("Finished app 1 should be in the result list, " +
"as it was submitted to 'root.default' and the query is exactly for 'root.default'!",
appIds.contains(finishedApp1.getApplicationId().toString()));
assertFalse("Finished app 2 should not be in the result list, " +
"as it was submitted to 'default' but the query is for 'root.default'!",
appIds.contains(finishedApp2.getApplicationId().toString()));
assertEquals("incorrect number of elements", 3, array.length());
rm.stop();
}
}