YARN-9489. Support filtering by request-priorities and allocation-request-ids for query results of app activities. Contributed by Tao Yang.

This commit is contained in:
Weiwei Yang 2019-05-09 21:54:09 +08:00
parent 25951255ce
commit 90add05caa
13 changed files with 196 additions and 13 deletions

View File

@ -21,6 +21,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities;
import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.commons.collections.CollectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.service.AbstractService;
@ -43,6 +44,7 @@ import java.util.List;
import java.util.Set;
import java.util.*;
import java.util.ArrayList;
import java.util.stream.Collectors;
/**
* A class to store node or application allocations.
@ -89,7 +91,8 @@ public class ActivitiesManager extends AbstractService {
this.rmContext = rmContext;
}
public AppActivitiesInfo getAppActivitiesInfo(ApplicationId applicationId) {
public AppActivitiesInfo getAppActivitiesInfo(ApplicationId applicationId,
Set<String> requestPriorities, Set<String> allocationRequestIds) {
RMApp app = rmContext.getRMApps().get(applicationId);
if (app != null && app.getFinalApplicationStatus()
== FinalApplicationStatus.UNDEFINED) {
@ -97,7 +100,16 @@ public class ActivitiesManager extends AbstractService {
completedAppAllocations.get(applicationId);
List<AppAllocation> allocations = null;
if (curAllocations != null) {
allocations = new ArrayList(curAllocations);
if (CollectionUtils.isNotEmpty(requestPriorities) || CollectionUtils
.isNotEmpty(allocationRequestIds)) {
allocations = curAllocations.stream().map(e -> e
.filterAllocationAttempts(requestPriorities,
allocationRequestIds))
.filter(e -> !e.getAllocationAttempts().isEmpty())
.collect(Collectors.toList());
} else {
allocations = new ArrayList(curAllocations);
}
}
return new AppActivitiesInfo(allocations, applicationId);
} else {

View File

@ -18,12 +18,16 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities;
import org.apache.commons.collections.CollectionUtils;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.function.Predicate;
import java.util.stream.Collectors;
/*
* It contains allocation information for one application within a period of
@ -105,4 +109,23 @@ public class AppAllocation {
public List<ActivityNode> getAllocationAttempts() {
return allocationAttempts;
}
public AppAllocation filterAllocationAttempts(Set<String> requestPriorities,
Set<String> allocationRequestIds) {
AppAllocation appAllocation =
new AppAllocation(this.priority, this.nodeId, this.queueName);
appAllocation.appState = this.appState;
appAllocation.containerId = this.containerId;
appAllocation.timestamp = this.timestamp;
appAllocation.diagnostic = this.diagnostic;
Predicate<ActivityNode> predicate = (e) ->
(CollectionUtils.isEmpty(requestPriorities) || requestPriorities
.contains(e.getRequestPriority())) && (
CollectionUtils.isEmpty(allocationRequestIds)
|| allocationRequestIds.contains(e.getAllocationRequestId()));
appAllocation.allocationAttempts =
this.allocationAttempts.stream().filter(predicate)
.collect(Collectors.toList());
return appAllocation;
}
}

View File

@ -226,6 +226,8 @@ public final class RMWSConsts {
public static final String DESELECTS = "deSelects";
public static final String CONTAINERS = "containers";
public static final String QUEUE_ACL_TYPE = "queue-acl-type";
public static final String REQUEST_PRIORITIES = "requestPriorities";
public static final String ALLOCATION_REQUEST_IDS = "allocationRequestIds";
private RMWSConsts() {
// not called

View File

@ -218,10 +218,15 @@ public interface RMWebServiceProtocol {
* QueryParam.
* @param time for how long we want to retrieve the activities. It is a
* QueryParam.
* @param requestPriorities the request priorities we want to retrieve the
* activities. It is a QueryParam.
* @param allocationRequestIds the allocation request ids we want to retrieve
* the activities. It is a QueryParam.
* @return all the activities about a specific app for a specific time
*/
AppActivitiesInfo getAppActivities(HttpServletRequest hsr, String appId,
String time);
String time, Set<String> requestPriorities,
Set<String> allocationRequestIds);
/**
* This method retrieves all the statistics for a specific app, and it is

View File

@ -706,7 +706,10 @@ public class RMWebServices extends WebServices implements RMWebServiceProtocol {
@Override
public AppActivitiesInfo getAppActivities(@Context HttpServletRequest hsr,
@QueryParam(RMWSConsts.APP_ID) String appId,
@QueryParam(RMWSConsts.MAX_TIME) String time) {
@QueryParam(RMWSConsts.MAX_TIME) String time,
@QueryParam(RMWSConsts.REQUEST_PRIORITIES) Set<String> requestPriorities,
@QueryParam(RMWSConsts.ALLOCATION_REQUEST_IDS)
Set<String> allocationRequestIds) {
initForReadableEndpoints();
YarnScheduler scheduler = rm.getRMContext().getScheduler();
@ -741,7 +744,8 @@ public class RMWebServices extends WebServices implements RMWebServiceProtocol {
applicationId = ApplicationId.fromString(appId);
activitiesManager.turnOnAppActivitiesRecording(applicationId, maxTime);
AppActivitiesInfo appActivitiesInfo =
activitiesManager.getAppActivitiesInfo(applicationId);
activitiesManager.getAppActivitiesInfo(applicationId,
requestPriorities, allocationRequestIds);
return appActivitiesInfo;
} catch (Exception e) {

View File

@ -18,6 +18,9 @@
package org.apache.hadoop.yarn.server.resourcemanager.webapp;
import com.sun.jersey.api.client.ClientResponse;
import com.sun.jersey.api.client.WebResource;
import org.apache.hadoop.http.JettyUtils;
import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
import org.apache.hadoop.yarn.api.records.Priority;
@ -31,6 +34,8 @@ import org.codehaus.jettison.json.JSONArray;
import org.codehaus.jettison.json.JSONException;
import org.codehaus.jettison.json.JSONObject;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.MultivaluedMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
@ -192,4 +197,16 @@ public final class ActivitiesTestUtils {
}
}
}
public static JSONObject requestWebResource(WebResource webResource,
MultivaluedMap<String, String> params) {
if (params != null) {
webResource = webResource.queryParams(params);
}
ClientResponse response = webResource.accept(MediaType.APPLICATION_JSON)
.get(ClientResponse.class);
assertEquals(MediaType.APPLICATION_JSON_TYPE + "; " + JettyUtils.UTF_8,
response.getType().toString());
return response.getEntity(JSONObject.class);
}
}

View File

@ -953,4 +953,115 @@ public class TestRMWebServicesSchedulerActivities
rm.stop();
}
}
@Test (timeout=30000)
public void testAppFilterByRequestPrioritiesAndAllocationRequestIds()
throws Exception {
rm.start();
CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
MockNM nm1 = rm.registerNode("127.0.0.1:1234", 8 * 1024);
try {
RMApp app1 = rm.submitApp(512, "app1", "user1", null, "b1");
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm1);
WebResource r = resource().path(RMWSConsts.RM_WEB_SERVICE_PATH)
.path(RMWSConsts.SCHEDULER_APP_ACTIVITIES);
MultivaluedMapImpl params = new MultivaluedMapImpl();
params.add("appId", app1.getApplicationId().toString());
JSONObject json = ActivitiesTestUtils.requestWebResource(r, params);
assertEquals("waiting for display",
json.getString("diagnostic"));
// am1 asks for 1 * 1GB container with requestPriority=-1
// and allocationRequestId=1
am1.allocate(Arrays.asList(
ResourceRequest.newBuilder().priority(Priority.UNDEFINED)
.allocationRequestId(1).resourceName("*")
.capability(Resources.createResource(1 * 1024)).numContainers(1)
.build()), null);
// trigger scheduling
cs.handle(new NodeUpdateSchedulerEvent(
rm.getRMContext().getRMNodes().get(nm1.getNodeId())));
// am1 asks for 1 * 1GB container with requestPriority=-1
// and allocationRequestId=2
am1.allocate(Arrays.asList(
ResourceRequest.newBuilder().priority(Priority.UNDEFINED)
.allocationRequestId(2).resourceName("*")
.capability(Resources.createResource(1 * 1024)).numContainers(1)
.build()), null);
// trigger scheduling
cs.handle(new NodeUpdateSchedulerEvent(
rm.getRMContext().getRMNodes().get(nm1.getNodeId())));
// am1 asks for 1 * 1GB container with requestPriority=0
// and allocationRequestId=1
am1.allocate(Arrays.asList(
ResourceRequest.newBuilder().priority(Priority.newInstance(0))
.allocationRequestId(1).resourceName("*")
.capability(Resources.createResource(1 * 1024)).numContainers(1)
.build()), null);
// trigger scheduling
cs.handle(new NodeUpdateSchedulerEvent(
rm.getRMContext().getRMNodes().get(nm1.getNodeId())));
// am1 asks for 1 * 1GB container with requestPriority=0
// and allocationRequestId=3
am1.allocate(Arrays.asList(
ResourceRequest.newBuilder().priority(Priority.newInstance(0))
.allocationRequestId(3).resourceName("*")
.capability(Resources.createResource(1 * 1024)).numContainers(1)
.build()), null);
// trigger scheduling
cs.handle(new NodeUpdateSchedulerEvent(
rm.getRMContext().getRMNodes().get(nm1.getNodeId())));
// query app activities with requestPriorities={0,1}
MultivaluedMapImpl filterParams1 = new MultivaluedMapImpl(params);
filterParams1.add(RMWSConsts.REQUEST_PRIORITIES, "-1");
filterParams1.add(RMWSConsts.REQUEST_PRIORITIES, "0");
json = ActivitiesTestUtils.requestWebResource(r, filterParams1);
verifyNumberOfAllocations(json, 4);
// query app activities with requestPriorities=0
MultivaluedMapImpl filterParams2 = new MultivaluedMapImpl(params);
filterParams2.add(RMWSConsts.REQUEST_PRIORITIES, "-1");
json = ActivitiesTestUtils.requestWebResource(r, filterParams2);
verifyNumberOfAllocations(json, 2);
JSONArray allocations = json.getJSONArray("allocations");
for (int i=0; i<allocations.length(); i++) {
assertEquals("-1",
allocations.getJSONObject(i).getJSONObject("requestAllocation")
.optString("requestPriority"));
}
// query app activities with allocationRequestId=1
MultivaluedMapImpl filterParams3 = new MultivaluedMapImpl(params);
filterParams3.add(RMWSConsts.ALLOCATION_REQUEST_IDS, "1");
json = ActivitiesTestUtils.requestWebResource(r, filterParams3);
verifyNumberOfAllocations(json, 2);
allocations = json.getJSONArray("allocations");
for (int i = 0; i < allocations.length(); i++) {
assertEquals("1",
allocations.getJSONObject(i).getJSONObject("requestAllocation")
.optString("allocationRequestId"));
}
// query app activities with requestPriorities=0 and allocationRequestId=1
MultivaluedMapImpl filterParams4 = new MultivaluedMapImpl(params);
filterParams4.add(RMWSConsts.REQUEST_PRIORITIES, "0");
filterParams4.add(RMWSConsts.ALLOCATION_REQUEST_IDS, "1");
json = ActivitiesTestUtils.requestWebResource(r, filterParams4);
verifyNumberOfAllocations(json, 1);
JSONObject allocation = json.getJSONObject("allocations");
assertEquals("0", allocation.getJSONObject("requestAllocation")
.optString("requestPriority"));
assertEquals("1", allocation.getJSONObject("requestAllocation")
.optString("allocationRequestId"));
} finally {
rm.stop();
}
}
}

View File

@ -190,7 +190,8 @@ public class DefaultRequestInterceptorREST
@Override
public AppActivitiesInfo getAppActivities(HttpServletRequest hsr,
String appId, String time) {
String appId, String time, Set<String> requestPriorities,
Set<String> allocationRequestIds) {
// time and appId are specified inside hsr
return RouterWebServiceUtil.genericForward(webAppAddress, hsr,
AppActivitiesInfo.class, HTTPMethods.GET,

View File

@ -1144,7 +1144,8 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
@Override
public AppActivitiesInfo getAppActivities(HttpServletRequest hsr,
String appId, String time) {
String appId, String time, Set<String> requestPriorities,
Set<String> allocationRequestIds) {
throw new NotImplementedException("Code is not implemented");
}

View File

@ -459,10 +459,14 @@ public class RouterWebServices implements RMWebServiceProtocol {
@Override
public AppActivitiesInfo getAppActivities(@Context HttpServletRequest hsr,
@QueryParam(RMWSConsts.APP_ID) String appId,
@QueryParam(RMWSConsts.MAX_TIME) String time) {
@QueryParam(RMWSConsts.MAX_TIME) String time,
@QueryParam(RMWSConsts.REQUEST_PRIORITIES) Set<String> requestPriorities,
@QueryParam(RMWSConsts.ALLOCATION_REQUEST_IDS)
Set<String> allocationRequestIds) {
init();
RequestInterceptorChainWrapper pipeline = getInterceptorChain(hsr);
return pipeline.getRootInterceptor().getAppActivities(hsr, appId, time);
return pipeline.getRootInterceptor().getAppActivities(hsr, appId, time,
requestPriorities, allocationRequestIds);
}
@GET

View File

@ -181,7 +181,7 @@ public abstract class BaseRouterWebServicesTest {
protected AppActivitiesInfo getAppActivities(String user)
throws IOException, InterruptedException {
return routerWebService.getAppActivities(
createHttpServletRequest(user), null, null);
createHttpServletRequest(user), null, null, null, null);
}
protected ApplicationStatisticsInfo getAppStatistics(String user)

View File

@ -139,7 +139,8 @@ public class MockRESTRequestInterceptor extends AbstractRESTRequestInterceptor {
@Override
public AppActivitiesInfo getAppActivities(HttpServletRequest hsr,
String appId, String time) {
String appId, String time, Set<String> requestPriorities,
Set<String> allocationRequestIds) {
return new AppActivitiesInfo();
}

View File

@ -167,8 +167,10 @@ public class PassThroughRESTRequestInterceptor
@Override
public AppActivitiesInfo getAppActivities(HttpServletRequest hsr,
String appId, String time) {
return getNextInterceptor().getAppActivities(hsr, appId, time);
String appId, String time, Set<String> requestPriorities,
Set<String> allocationRequestIds) {
return getNextInterceptor().getAppActivities(hsr, appId, time,
requestPriorities, allocationRequestIds);
}
@Override