From 970b0b0c02bb8fbe8ff227c78e2332df623d1aea Mon Sep 17 00:00:00 2001 From: Weiwei Yang Date: Thu, 13 Jun 2019 10:44:47 +0800 Subject: [PATCH] YARN-9578. Add limit/actions/summarize options for app activities REST API. Contributed by Tao Yang. --- .../activities/ActivitiesManager.java | 56 ++- .../scheduler/activities/AppAllocation.java | 11 +- .../resourcemanager/webapp/RMWSConsts.java | 13 +- .../webapp/RMWebServiceProtocol.java | 7 +- .../resourcemanager/webapp/RMWebServices.java | 73 +++- .../webapp/dao/AppAllocationInfo.java | 3 +- .../activities/TestActivitiesManager.java | 112 ++++- .../webapp/ActivitiesTestUtils.java | 15 + .../TestRMWebServicesSchedulerActivities.java | 407 ++++++++++++------ ...edulerActivitiesWithMultiNodesEnabled.java | 44 +- .../webapp/DefaultRequestInterceptorREST.java | 3 +- .../webapp/FederationInterceptorREST.java | 3 +- .../router/webapp/RouterWebServices.java | 11 +- .../webapp/BaseRouterWebServicesTest.java | 4 +- .../webapp/MockRESTRequestInterceptor.java | 3 +- .../PassThroughRESTRequestInterceptor.java | 6 +- 16 files changed, 570 insertions(+), 201 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivitiesManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivitiesManager.java index 2c314727c9d..4149ac1565d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivitiesManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivitiesManager.java @@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities; import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.Lists; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; @@ -121,7 +122,8 @@ private void setupConfForCleanup(Configuration conf) { public AppActivitiesInfo getAppActivitiesInfo(ApplicationId applicationId, Set requestPriorities, Set allocationRequestIds, - RMWSConsts.ActivitiesGroupBy groupBy) { + RMWSConsts.ActivitiesGroupBy groupBy, int limit, boolean summarize, + double maxTimeInSeconds) { RMApp app = rmContext.getRMApps().get(applicationId); if (app != null && app.getFinalApplicationStatus() == FinalApplicationStatus.UNDEFINED) { @@ -140,6 +142,17 @@ public AppActivitiesInfo getAppActivitiesInfo(ApplicationId applicationId, allocations = new ArrayList(curAllocations); } } + if (summarize && allocations != null) { + AppAllocation summaryAppAllocation = + getSummarizedAppAllocation(allocations, maxTimeInSeconds); + if (summaryAppAllocation != null) { + allocations = Lists.newArrayList(summaryAppAllocation); + } + } + if (allocations != null && limit > 0 && limit < allocations.size()) { + allocations = + allocations.subList(allocations.size() - limit, allocations.size()); + } return new AppActivitiesInfo(allocations, applicationId, groupBy); } else { return new AppActivitiesInfo( @@ -148,6 +161,47 @@ public AppActivitiesInfo getAppActivitiesInfo(ApplicationId applicationId, } } + /** + * Get summarized app allocation from multiple allocations as follows: + * 1. Collect latest allocation attempts on nodes to construct an allocation + * summary on nodes from multiple app allocations which are recorded a few + * seconds before the last allocation. + * 2. Copy other fields from the last allocation. + */ + private AppAllocation getSummarizedAppAllocation( + List allocations, double maxTimeInSeconds) { + if (allocations == null || allocations.isEmpty()) { + return null; + } + long startTime = allocations.get(allocations.size() - 1).getTime() + - (long) (maxTimeInSeconds * 1000); + Map nodeActivities = new HashMap<>(); + for (int i = allocations.size() - 1; i >= 0; i--) { + AppAllocation appAllocation = allocations.get(i); + if (startTime > appAllocation.getTime()) { + break; + } + List activityNodes = appAllocation.getAllocationAttempts(); + for (ActivityNode an : activityNodes) { + if (an.getNodeId() != null) { + nodeActivities.putIfAbsent( + an.getRequestPriority() + "_" + an.getAllocationRequestId() + "_" + + an.getNodeId(), an); + } + } + } + AppAllocation lastAppAllocation = allocations.get(allocations.size() - 1); + AppAllocation summarizedAppAllocation = + new AppAllocation(lastAppAllocation.getPriority(), null, + lastAppAllocation.getQueueName()); + summarizedAppAllocation + .updateAppContainerStateAndTime(null, lastAppAllocation.getAppState(), + lastAppAllocation.getTime(), lastAppAllocation.getDiagnostic()); + summarizedAppAllocation + .setAllocationAttempts(new ArrayList<>(nodeActivities.values())); + return summarizedAppAllocation; + } + public ActivitiesInfo getActivitiesInfo(String nodeId, RMWSConsts.ActivitiesGroupBy groupBy) { List allocations; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/AppAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/AppAllocation.java index 69d6ccf218b..e226b50fb77 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/AppAllocation.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/AppAllocation.java @@ -84,11 +84,8 @@ public ActivityState getAppState() { return appState; } - public String getPriority() { - if (priority == null) { - return null; - } - return priority.toString(); + public Priority getPriority() { + return priority; } public String getContainerId() { @@ -128,4 +125,8 @@ public AppAllocation filterAllocationAttempts(Set requestPriorities, .collect(Collectors.toList()); return appAllocation; } + + public void setAllocationAttempts(List allocationAttempts) { + this.allocationAttempts = allocationAttempts; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWSConsts.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWSConsts.java index b7a60087e64..f2d2b822cd2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWSConsts.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWSConsts.java @@ -71,7 +71,7 @@ public final class RMWSConsts { /** Path for {@code RMWebServiceProtocol#getAppActivities}. */ public static final String SCHEDULER_APP_ACTIVITIES = - "/scheduler/app-activities"; + "/scheduler/app-activities/{appid}"; /** Path for {@code RMWebServiceProtocol#getAppStatistics}. */ public static final String APP_STATISTICS = "/appstatistics"; @@ -237,6 +237,8 @@ public final class RMWSConsts { public static final String GROUP_BY = "groupBy"; public static final String SIGNAL = "signal"; public static final String COMMAND = "command"; + public static final String ACTIONS = "actions"; + public static final String SUMMARIZE = "summarize"; private RMWSConsts() { // not called @@ -250,4 +252,13 @@ private RMWSConsts() { public enum ActivitiesGroupBy { DIAGNOSTIC } + + /** + * Defines the required action of app activities: + * REFRESH means to turn on activities recording for the required app, + * GET means the required app activities should be involved in response. + */ + public enum AppActivitiesRequiredAction { + REFRESH, GET + } } \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServiceProtocol.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServiceProtocol.java index 3aa2593c1c2..a5bd93bbbed 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServiceProtocol.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServiceProtocol.java @@ -227,11 +227,16 @@ ActivitiesInfo getActivities(HttpServletRequest hsr, String nodeId, * the activities. It is a QueryParam. * @param groupBy the groupBy type by which the activities should be * aggregated. It is a QueryParam. + * @param limit set a limit of the result. It is a QueryParam. + * @param actions the required actions of app activities. It is a QueryParam. + * @param summarize whether app activities in multiple scheduling processes + * need to be summarized. It is a QueryParam. * @return all the activities about a specific app for a specific time */ AppActivitiesInfo getAppActivities(HttpServletRequest hsr, String appId, String time, Set requestPriorities, - Set allocationRequestIds, String groupBy); + Set allocationRequestIds, String groupBy, String limit, + Set actions, boolean summarize); /** * This method retrieves all the statistics for a specific app, and it is diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java index 3f010350cb5..762569fa6b5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java @@ -236,6 +236,7 @@ public class RMWebServices extends WebServices implements RMWebServiceProtocol { public static final String DEFAULT_START_TIME = "0"; public static final String DEFAULT_END_TIME = "-1"; public static final String DEFAULT_INCLUDE_RESOURCE = "false"; + public static final String DEFAULT_SUMMARIZE = "false"; @VisibleForTesting boolean isCentralizedNodeLabelConfiguration = true; @@ -717,12 +718,16 @@ public ActivitiesInfo getActivities(@Context HttpServletRequest hsr, MediaType.APPLICATION_XML + "; " + JettyUtils.UTF_8 }) @Override public AppActivitiesInfo getAppActivities(@Context HttpServletRequest hsr, - @QueryParam(RMWSConsts.APP_ID) String appId, + @PathParam(RMWSConsts.APPID) String appId, @QueryParam(RMWSConsts.MAX_TIME) String time, @QueryParam(RMWSConsts.REQUEST_PRIORITIES) Set requestPriorities, @QueryParam(RMWSConsts.ALLOCATION_REQUEST_IDS) Set allocationRequestIds, - @QueryParam(RMWSConsts.GROUP_BY) String groupBy) { + @QueryParam(RMWSConsts.GROUP_BY) String groupBy, + @QueryParam(RMWSConsts.LIMIT) String limit, + @QueryParam(RMWSConsts.ACTIONS) Set actions, + @QueryParam(RMWSConsts.SUMMARIZE) @DefaultValue(DEFAULT_SUMMARIZE) + boolean summarize) { initForReadableEndpoints(); YarnScheduler scheduler = rm.getRMContext().getScheduler(); @@ -749,6 +754,26 @@ public AppActivitiesInfo getAppActivities(@Context HttpServletRequest hsr, return new AppActivitiesInfo(e.getMessage(), appId); } + Set requiredActions; + try { + requiredActions = parseAppActivitiesRequiredActions(actions); + } catch (IllegalArgumentException e) { + return new AppActivitiesInfo(e.getMessage(), appId); + } + + int limitNum = -1; + if (limit != null) { + try { + limitNum = Integer.parseInt(limit); + if (limitNum <= 0) { + return new AppActivitiesInfo( + "limit must be greater than 0!", appId); + } + } catch (NumberFormatException e) { + return new AppActivitiesInfo("limit must be integer!", appId); + } + } + double maxTime = 3.0; if (time != null) { @@ -762,12 +787,21 @@ public AppActivitiesInfo getAppActivities(@Context HttpServletRequest hsr, ApplicationId applicationId; try { applicationId = ApplicationId.fromString(appId); - activitiesManager.turnOnAppActivitiesRecording(applicationId, maxTime); - AppActivitiesInfo appActivitiesInfo = - activitiesManager.getAppActivitiesInfo(applicationId, - requestPriorities, allocationRequestIds, activitiesGroupBy); - - return appActivitiesInfo; + if (requiredActions + .contains(RMWSConsts.AppActivitiesRequiredAction.REFRESH)) { + activitiesManager + .turnOnAppActivitiesRecording(applicationId, maxTime); + } + if (requiredActions + .contains(RMWSConsts.AppActivitiesRequiredAction.GET)) { + AppActivitiesInfo appActivitiesInfo = activitiesManager + .getAppActivitiesInfo(applicationId, requestPriorities, + allocationRequestIds, activitiesGroupBy, limitNum, + summarize, maxTime); + return appActivitiesInfo; + } + return new AppActivitiesInfo("Successfully notified actions: " + + StringUtils.join(',', actions), appId); } catch (Exception e) { String errMessage = "Cannot find application with given appId"; LOG.error(errMessage, e); @@ -778,6 +812,29 @@ public AppActivitiesInfo getAppActivities(@Context HttpServletRequest hsr, return null; } + private Set + parseAppActivitiesRequiredActions(Set actions) { + Set requiredActions = + new HashSet<>(); + if (actions == null || actions.isEmpty()) { + requiredActions.add(RMWSConsts.AppActivitiesRequiredAction.REFRESH); + requiredActions.add(RMWSConsts.AppActivitiesRequiredAction.GET); + } else { + for (String action : actions) { + if (!EnumUtils.isValidEnum(RMWSConsts.AppActivitiesRequiredAction.class, + action.toUpperCase())) { + String errMesasge = + "Got invalid action: " + action + ", valid actions: " + Arrays + .asList(RMWSConsts.AppActivitiesRequiredAction.values()); + throw new IllegalArgumentException(errMesasge); + } + requiredActions.add(RMWSConsts.AppActivitiesRequiredAction + .valueOf(action.toUpperCase())); + } + } + return requiredActions; + } + private RMWSConsts.ActivitiesGroupBy parseActivitiesGroupBy(String groupBy) { if (groupBy != null) { if (!EnumUtils.isValidEnum(RMWSConsts.ActivitiesGroupBy.class, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/AppAllocationInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/AppAllocationInfo.java index 6b0d86ba92b..6ae1f9a819b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/AppAllocationInfo.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/AppAllocationInfo.java @@ -54,7 +54,8 @@ public class AppAllocationInfo { this.requestAllocation = new ArrayList<>(); this.nodeId = allocation.getNodeId(); this.queueName = allocation.getQueueName(); - this.appPriority = allocation.getPriority(); + this.appPriority = allocation.getPriority() == null ? + null : allocation.getPriority().toString(); this.timestamp = allocation.getTime(); this.dateTime = new Date(allocation.getTime()).toString(); this.allocationState = allocation.getAppState().name(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/TestActivitiesManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/TestActivitiesManager.java index 495c7e248b0..2bf6b23ed70 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/TestActivitiesManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/TestActivitiesManager.java @@ -29,6 +29,7 @@ import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; @@ -46,6 +47,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; +import org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts; import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppActivitiesInfo; import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey; import org.apache.hadoop.yarn.util.SystemClock; @@ -286,18 +288,124 @@ public void testAppActivitiesTTL() throws Exception { ActivityDiagnosticConstant.SKIPPED_ALL_PRIORITIES); } AppActivitiesInfo appActivitiesInfo = newActivitiesManager - .getAppActivitiesInfo(app.getApplicationId(), null, null, null); + .getAppActivitiesInfo(app.getApplicationId(), null, null, null, -1, + false, 3); Assert.assertEquals(numActivities, appActivitiesInfo.getAllocations().size()); // sleep until all app activities expired Thread.sleep(cleanupIntervalMs + appActivitiesTTL); // there should be no remaining app activities appActivitiesInfo = newActivitiesManager - .getAppActivitiesInfo(app.getApplicationId(), null, null, null); + .getAppActivitiesInfo(app.getApplicationId(), null, null, null, -1, + false, 3); Assert.assertEquals(0, appActivitiesInfo.getAllocations().size()); } + @Test (timeout = 30000) + public void testAppActivitiesPerformance() { + // start recording activities for first app + SchedulerApplicationAttempt app = apps.get(0); + FiCaSchedulerNode node = (FiCaSchedulerNode) nodes.get(0); + activitiesManager.turnOnAppActivitiesRecording(app.getApplicationId(), 100); + int numActivities = 100; + int numNodes = 10000; + int testingTimes = 10; + for (int ano = 0; ano < numActivities; ano++) { + ActivitiesLogger.APP.startAppAllocationRecording(activitiesManager, node, + SystemClock.getInstance().getTime(), app); + for (int i = 0; i < numNodes; i++) { + NodeId nodeId = NodeId.newInstance("host" + i, 0); + activitiesManager + .addSchedulingActivityForApp(app.getApplicationId(), null, "0", + ActivityState.SKIPPED, + ActivityDiagnosticConstant.FAIL_TO_ALLOCATE, "container", + nodeId, "0"); + } + ActivitiesLogger.APP + .finishAllocatedAppAllocationRecording(activitiesManager, + app.getApplicationId(), null, ActivityState.SKIPPED, + ActivityDiagnosticConstant.SKIPPED_ALL_PRIORITIES); + } + + // It often take a longer time for the first query, ignore this distraction + activitiesManager + .getAppActivitiesInfo(app.getApplicationId(), null, null, null, -1, + true, 100); + + // Test getting normal app activities + Supplier normalSupplier = () -> { + AppActivitiesInfo appActivitiesInfo = activitiesManager + .getAppActivitiesInfo(app.getApplicationId(), null, null, null, -1, + false, 100); + Assert.assertEquals(numActivities, + appActivitiesInfo.getAllocations().size()); + Assert.assertEquals(1, + appActivitiesInfo.getAllocations().get(0).getRequestAllocation() + .size()); + Assert.assertEquals(numNodes, + appActivitiesInfo.getAllocations().get(0).getRequestAllocation() + .get(0).getAllocationAttempt().size()); + return null; + }; + testManyTimes("Getting normal app activities", normalSupplier, + testingTimes); + + // Test getting aggregated app activities + Supplier aggregatedSupplier = () -> { + AppActivitiesInfo appActivitiesInfo = activitiesManager + .getAppActivitiesInfo(app.getApplicationId(), null, null, + RMWSConsts.ActivitiesGroupBy.DIAGNOSTIC, -1, false, 100); + Assert.assertEquals(numActivities, + appActivitiesInfo.getAllocations().size()); + Assert.assertEquals(1, + appActivitiesInfo.getAllocations().get(0).getRequestAllocation() + .size()); + Assert.assertEquals(1, + appActivitiesInfo.getAllocations().get(0).getRequestAllocation() + .get(0).getAllocationAttempt().size()); + Assert.assertEquals(numNodes, + appActivitiesInfo.getAllocations().get(0).getRequestAllocation() + .get(0).getAllocationAttempt().get(0).getNodeIds().size()); + return null; + }; + testManyTimes("Getting aggregated app activities", aggregatedSupplier, + testingTimes); + + // Test getting summarized app activities + Supplier summarizedSupplier = () -> { + AppActivitiesInfo appActivitiesInfo = activitiesManager + .getAppActivitiesInfo(app.getApplicationId(), null, null, + RMWSConsts.ActivitiesGroupBy.DIAGNOSTIC, -1, true, 100); + Assert.assertEquals(1, appActivitiesInfo.getAllocations().size()); + Assert.assertEquals(1, + appActivitiesInfo.getAllocations().get(0).getRequestAllocation() + .size()); + Assert.assertEquals(1, + appActivitiesInfo.getAllocations().get(0).getRequestAllocation() + .get(0).getAllocationAttempt().size()); + Assert.assertEquals(numNodes, + appActivitiesInfo.getAllocations().get(0).getRequestAllocation() + .get(0).getAllocationAttempt().get(0).getNodeIds().size()); + return null; + }; + testManyTimes("Getting summarized app activities", summarizedSupplier, + testingTimes); + } + + private void testManyTimes(String testingName, + Supplier supplier, int testingTimes) { + long totalTime = 0; + for (int i = 0; i < testingTimes; i++) { + long startTime = System.currentTimeMillis(); + supplier.get(); + totalTime += System.currentTimeMillis() - startTime; + } + System.out.println("#" + testingName + ", testing times : " + testingTimes + + ", total cost time : " + totalTime + " ms, average cost time : " + + (float) totalTime / testingTimes + " ms."); + } + /** * Testing activities manager which can record all history information about * node allocations. diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/ActivitiesTestUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/ActivitiesTestUtils.java index da898627f94..666e5fe9a5d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/ActivitiesTestUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/ActivitiesTestUtils.java @@ -41,6 +41,8 @@ import java.util.HashSet; import java.util.List; import java.util.function.Predicate; +import java.util.regex.Matcher; +import java.util.regex.Pattern; import static org.junit.Assert.assertEquals; @@ -209,4 +211,17 @@ public static JSONObject requestWebResource(WebResource webResource, response.getType().toString()); return response.getEntity(JSONObject.class); } + + /** + * Convert format using {name} (HTTP base) into %s (Java based). + * @param format Initial format using {}. + * @param args Arguments for the format. + * @return New format using %s. + */ + public static String format(String format, Object... args) { + Pattern p = Pattern.compile("\\{.*?}"); + Matcher m = p.matcher(format); + String newFormat = m.replaceAll("%s"); + return String.format(newFormat, args); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesSchedulerActivities.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesSchedulerActivities.java index 1e08f05e134..8bdecb769d0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesSchedulerActivities.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesSchedulerActivities.java @@ -437,25 +437,17 @@ public void testAppActivityJSON() throws Exception { RMApp app1 = rm.submitApp(10, "app1", "user1", null, "b1"); //Get JSON - WebResource r = resource(); + WebResource r = resource().path(RMWSConsts.RM_WEB_SERVICE_PATH) + .path(ActivitiesTestUtils.format(RMWSConsts.SCHEDULER_APP_ACTIVITIES, + app1.getApplicationId().toString())); MultivaluedMapImpl params = new MultivaluedMapImpl(); - params.add("appId", app1.getApplicationId().toString()); - ClientResponse response = r.path("ws").path("v1").path("cluster").path( - "scheduler/app-activities").queryParams(params).accept( - MediaType.APPLICATION_JSON).get(ClientResponse.class); - assertEquals(MediaType.APPLICATION_JSON_TYPE + "; " + JettyUtils.UTF_8, - response.getType().toString()); - JSONObject json = response.getEntity(JSONObject.class); + ActivitiesTestUtils.requestWebResource(r, params); + nm.nodeHeartbeat(true); Thread.sleep(5000); //Get JSON - response = r.path("ws").path("v1").path("cluster").path( - "scheduler/app-activities").queryParams(params).accept( - MediaType.APPLICATION_JSON).get(ClientResponse.class); - assertEquals(MediaType.APPLICATION_JSON_TYPE + "; " + JettyUtils.UTF_8, - response.getType().toString()); - json = response.getEntity(JSONObject.class); + JSONObject json = ActivitiesTestUtils.requestWebResource(r, params); //Check app activities verifyNumberOfAllocations(json, 1); @@ -502,25 +494,17 @@ public void testAppAssignMultipleContainersPerNodeHeartbeat() 10)), null); //Get JSON - WebResource r = resource(); + WebResource r = resource().path(RMWSConsts.RM_WEB_SERVICE_PATH) + .path(ActivitiesTestUtils.format(RMWSConsts.SCHEDULER_APP_ACTIVITIES, + app1.getApplicationId().toString())); MultivaluedMapImpl params = new MultivaluedMapImpl(); - params.add("appId", app1.getApplicationId().toString()); - ClientResponse response = r.path("ws").path("v1").path("cluster").path( - "scheduler/app-activities").queryParams(params).accept( - MediaType.APPLICATION_JSON).get(ClientResponse.class); - assertEquals(MediaType.APPLICATION_JSON_TYPE + "; " + JettyUtils.UTF_8, - response.getType().toString()); - JSONObject json = response.getEntity(JSONObject.class); + ActivitiesTestUtils.requestWebResource(r, params); + nm.nodeHeartbeat(true); Thread.sleep(5000); //Get JSON - response = r.path("ws").path("v1").path("cluster").path( - "scheduler/app-activities").queryParams(params).accept( - MediaType.APPLICATION_JSON).get(ClientResponse.class); - assertEquals(MediaType.APPLICATION_JSON_TYPE + "; " + JettyUtils.UTF_8, - response.getType().toString()); - json = response.getEntity(JSONObject.class); + JSONObject json = ActivitiesTestUtils.requestWebResource(r, params); verifyNumberOfAllocations(json, 10); @@ -555,26 +539,17 @@ public void testAppAssignWithoutAvailableResource() throws Exception { 10)), null); //Get JSON - WebResource r = resource(); + WebResource r = resource().path(RMWSConsts.RM_WEB_SERVICE_PATH) + .path(ActivitiesTestUtils.format(RMWSConsts.SCHEDULER_APP_ACTIVITIES, + app1.getApplicationId().toString())); MultivaluedMapImpl params = new MultivaluedMapImpl(); - params.add("appId", app1.getApplicationId().toString()); - ClientResponse response = r.path("ws").path("v1").path("cluster").path( - "scheduler/app-activities").queryParams(params).accept( - MediaType.APPLICATION_JSON).get(ClientResponse.class); - assertEquals(MediaType.APPLICATION_JSON_TYPE + "; " + JettyUtils.UTF_8, - response.getType().toString()); - JSONObject json = response.getEntity(JSONObject.class); + ActivitiesTestUtils.requestWebResource(r, params); + nm.nodeHeartbeat(true); Thread.sleep(5000); //Get JSON - response = r.path("ws").path("v1").path("cluster").path( - "scheduler/app-activities").queryParams(params).accept( - MediaType.APPLICATION_JSON).get(ClientResponse.class); - assertEquals(MediaType.APPLICATION_JSON_TYPE + "; " + JettyUtils.UTF_8, - response.getType().toString()); - json = response.getEntity(JSONObject.class); - + JSONObject json = ActivitiesTestUtils.requestWebResource(r, params); verifyNumberOfAllocations(json, 0); } finally { rm.stop(); @@ -590,24 +565,14 @@ public void testAppNoNM() throws Exception { RMApp app1 = rm.submitApp(1024, "app1", "user1", null, "b1"); //Get JSON - WebResource r = resource(); + WebResource r = resource().path(RMWSConsts.RM_WEB_SERVICE_PATH) + .path(ActivitiesTestUtils.format(RMWSConsts.SCHEDULER_APP_ACTIVITIES, + app1.getApplicationId().toString())); MultivaluedMapImpl params = new MultivaluedMapImpl(); - params.add("appId", app1.getApplicationId().toString()); - ClientResponse response = r.path("ws").path("v1").path("cluster").path( - "scheduler/app-activities").queryParams(params).accept( - MediaType.APPLICATION_JSON).get(ClientResponse.class); - assertEquals(MediaType.APPLICATION_JSON_TYPE + "; " + JettyUtils.UTF_8, - response.getType().toString()); - JSONObject json = response.getEntity(JSONObject.class); + ActivitiesTestUtils.requestWebResource(r, params); //Get JSON - response = r.path("ws").path("v1").path("cluster").path( - "scheduler/app-activities").queryParams(params).accept( - MediaType.APPLICATION_JSON).get(ClientResponse.class); - assertEquals(MediaType.APPLICATION_JSON_TYPE + "; " + JettyUtils.UTF_8, - response.getType().toString()); - json = response.getEntity(JSONObject.class); - + JSONObject json = ActivitiesTestUtils.requestWebResource(r, params); verifyNumberOfAllocations(json, 0); } finally { rm.stop(); @@ -639,49 +604,23 @@ public void testAppReserveNewContainer() throws Exception { 10)), null); // Reserve new container - WebResource r = resource(); + WebResource r = resource().path(RMWSConsts.RM_WEB_SERVICE_PATH) + .path(ActivitiesTestUtils.format(RMWSConsts.SCHEDULER_APP_ACTIVITIES, + app1.getApplicationId().toString())); MultivaluedMapImpl params = new MultivaluedMapImpl(); - params.add("appId", app1.getApplicationId().toString()); - ClientResponse response = r.path("ws").path("v1").path("cluster").path( - "scheduler/app-activities").queryParams(params).accept( - MediaType.APPLICATION_JSON).get(ClientResponse.class); - assertEquals(MediaType.APPLICATION_JSON_TYPE + "; " + JettyUtils.UTF_8, - response.getType().toString()); - JSONObject json = response.getEntity(JSONObject.class); + ActivitiesTestUtils.requestWebResource(r, params); nm2.nodeHeartbeat(true); Thread.sleep(1000); - response = r.path("ws").path("v1").path("cluster").path( - "scheduler/app-activities").queryParams(params).accept( - MediaType.APPLICATION_JSON).get(ClientResponse.class); - assertEquals(MediaType.APPLICATION_JSON_TYPE + "; " + JettyUtils.UTF_8, - response.getType().toString()); - json = response.getEntity(JSONObject.class); - + JSONObject json = ActivitiesTestUtils.requestWebResource(r, params); verifyNumberOfAllocations(json, 1); // Do a node heartbeat again without releasing container from app2 - r = resource(); - params = new MultivaluedMapImpl(); - params.add("appId", app1.getApplicationId().toString()); - response = r.path("ws").path("v1").path("cluster").path( - "scheduler/app-activities").queryParams(params).accept( - MediaType.APPLICATION_JSON).get(ClientResponse.class); - assertEquals(MediaType.APPLICATION_JSON_TYPE + "; " + JettyUtils.UTF_8, - response.getType().toString()); - json = response.getEntity(JSONObject.class); - nm2.nodeHeartbeat(true); Thread.sleep(1000); - response = r.path("ws").path("v1").path("cluster").path( - "scheduler/app-activities").queryParams(params).accept( - MediaType.APPLICATION_JSON).get(ClientResponse.class); - assertEquals(MediaType.APPLICATION_JSON_TYPE + "; " + JettyUtils.UTF_8, - response.getType().toString()); - json = response.getEntity(JSONObject.class); - + json = ActivitiesTestUtils.requestWebResource(r, params); verifyNumberOfAllocations(json, 2); // Finish application 2 @@ -693,26 +632,10 @@ public void testAppReserveNewContainer() throws Exception { RMContainerEventType.FINISHED); // Do a node heartbeat again - r = resource(); - params = new MultivaluedMapImpl(); - params.add("appId", app1.getApplicationId().toString()); - response = r.path("ws").path("v1").path("cluster").path( - "scheduler/app-activities").queryParams(params).accept( - MediaType.APPLICATION_JSON).get(ClientResponse.class); - assertEquals(MediaType.APPLICATION_JSON_TYPE + "; " + JettyUtils.UTF_8, - response.getType().toString()); - json = response.getEntity(JSONObject.class); - nm2.nodeHeartbeat(true); Thread.sleep(1000); - response = r.path("ws").path("v1").path("cluster").path( - "scheduler/app-activities").queryParams(params).accept( - MediaType.APPLICATION_JSON).get(ClientResponse.class); - assertEquals(MediaType.APPLICATION_JSON_TYPE + "; " + JettyUtils.UTF_8, - response.getType().toString()); - json = response.getEntity(JSONObject.class); - + json = ActivitiesTestUtils.requestWebResource(r, params); verifyNumberOfAllocations(json, 3); } finally { rm.stop(); @@ -847,15 +770,11 @@ public void testAppInsufficientResourceDiagnostic() RMApp app1 = rm.submitApp(512, "app1", "user1", null, "b1"); MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm1); - WebResource r = resource(); + WebResource r = resource().path(RMWSConsts.RM_WEB_SERVICE_PATH) + .path(ActivitiesTestUtils.format(RMWSConsts.SCHEDULER_APP_ACTIVITIES, + app1.getApplicationId().toString())); MultivaluedMapImpl params = new MultivaluedMapImpl(); - params.add("appId", app1.getApplicationId().toString()); - ClientResponse response = r.path("ws").path("v1").path("cluster") - .path("scheduler/app-activities").queryParams(params) - .accept(MediaType.APPLICATION_JSON).get(ClientResponse.class); - assertEquals(MediaType.APPLICATION_JSON_TYPE + "; " + JettyUtils.UTF_8, - response.getType().toString()); - JSONObject json = response.getEntity(JSONObject.class); + JSONObject json = ActivitiesTestUtils.requestWebResource(r, params); assertEquals("waiting for display", json.getString("diagnostic")); @@ -867,14 +786,7 @@ public void testAppInsufficientResourceDiagnostic() cs.handle(new NodeUpdateSchedulerEvent( rm.getRMContext().getRMNodes().get(nm1.getNodeId()))); - response = - r.path("ws").path("v1").path("cluster") - .path("scheduler/app-activities").queryParams(params) - .accept(MediaType.APPLICATION_JSON).get(ClientResponse.class); - assertEquals(MediaType.APPLICATION_JSON_TYPE + "; " + JettyUtils.UTF_8, - response.getType().toString()); - json = response.getEntity(JSONObject.class); - + json = ActivitiesTestUtils.requestWebResource(r, params); verifyNumberOfAllocations(json, 1); JSONObject allocationObj = json.getJSONObject("allocations"); JSONObject requestAllocationObj = @@ -904,15 +816,11 @@ public void testAppPlacementConstraintDiagnostic() RMApp app1 = rm.submitApp(512, "app1", "user1", null, "b1"); MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm1); - WebResource r = resource(); + WebResource r = resource().path(RMWSConsts.RM_WEB_SERVICE_PATH) + .path(ActivitiesTestUtils.format(RMWSConsts.SCHEDULER_APP_ACTIVITIES, + app1.getApplicationId().toString())); MultivaluedMapImpl params = new MultivaluedMapImpl(); - params.add("appId", app1.getApplicationId().toString()); - ClientResponse response = r.path("ws").path("v1").path("cluster") - .path("scheduler/app-activities").queryParams(params) - .accept(MediaType.APPLICATION_JSON).get(ClientResponse.class); - assertEquals(MediaType.APPLICATION_JSON_TYPE + "; " + JettyUtils.UTF_8, - response.getType().toString()); - JSONObject json = response.getEntity(JSONObject.class); + JSONObject json = ActivitiesTestUtils.requestWebResource(r, params); assertEquals("waiting for display", json.getString("diagnostic")); @@ -930,14 +838,7 @@ public void testAppPlacementConstraintDiagnostic() cs.handle(new NodeUpdateSchedulerEvent( rm.getRMContext().getRMNodes().get(nm1.getNodeId()))); - response = - r.path("ws").path("v1").path("cluster") - .path("scheduler/app-activities").queryParams(params) - .accept(MediaType.APPLICATION_JSON).get(ClientResponse.class); - assertEquals(MediaType.APPLICATION_JSON_TYPE + "; " + JettyUtils.UTF_8, - response.getType().toString()); - json = response.getEntity(JSONObject.class); - + json = ActivitiesTestUtils.requestWebResource(r, params); verifyNumberOfAllocations(json, 1); JSONObject allocationObj = json.getJSONObject("allocations"); JSONObject requestAllocationObj = @@ -967,9 +868,9 @@ public void testAppFilterByRequestPrioritiesAndAllocationRequestIds() MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm1); WebResource r = resource().path(RMWSConsts.RM_WEB_SERVICE_PATH) - .path(RMWSConsts.SCHEDULER_APP_ACTIVITIES); + .path(ActivitiesTestUtils.format(RMWSConsts.SCHEDULER_APP_ACTIVITIES, + app1.getApplicationId().toString())); MultivaluedMapImpl params = new MultivaluedMapImpl(); - params.add("appId", app1.getApplicationId().toString()); JSONObject json = ActivitiesTestUtils.requestWebResource(r, params); assertEquals("waiting for display", json.getString("diagnostic")); @@ -1064,4 +965,228 @@ public void testAppFilterByRequestPrioritiesAndAllocationRequestIds() rm.stop(); } } + + @Test(timeout = 30000) + public void testAppLimit() throws Exception { + rm.start(); + CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler(); + MockNM nm1 = rm.registerNode("127.0.0.1:1234", 4 * 1024); + MockNM nm2 = rm.registerNode("127.0.0.2:1234", 8 * 1024); + try { + RMApp app1 = rm.submitApp(512, "app1", "user1", null, "b1"); + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm1); + + WebResource r = resource().path(RMWSConsts.RM_WEB_SERVICE_PATH) + .path(ActivitiesTestUtils.format(RMWSConsts.SCHEDULER_APP_ACTIVITIES, + app1.getApplicationId().toString())); + MultivaluedMapImpl params = new MultivaluedMapImpl(); + JSONObject json = ActivitiesTestUtils.requestWebResource(r, params); + assertEquals("waiting for display", + json.getString("diagnostic")); + + // am1 asks for 1 * 5GB container + am1.allocate("*", 5120, 1, new ArrayList<>()); + // trigger scheduling triple, there will be 3 app activities in cache + cs.handle(new NodeUpdateSchedulerEvent( + rm.getRMContext().getRMNodes().get(nm1.getNodeId()))); + cs.handle(new NodeUpdateSchedulerEvent( + rm.getRMContext().getRMNodes().get(nm1.getNodeId()))); + cs.handle(new NodeUpdateSchedulerEvent( + rm.getRMContext().getRMNodes().get(nm1.getNodeId()))); + + // query all app activities without limit + json = ActivitiesTestUtils.requestWebResource(r, params); + verifyNumberOfAllocations(json, 3); + + // query all app activities with limit > 3 + params.putSingle(RMWSConsts.LIMIT, "10"); + json = ActivitiesTestUtils.requestWebResource(r, params); + verifyNumberOfAllocations(json, 3); + + // query app activities with limit = 2 + params.putSingle(RMWSConsts.LIMIT, "2"); + json = ActivitiesTestUtils.requestWebResource(r, params); + verifyNumberOfAllocations(json, 2); + + // query app activities with limit = 1 + params.putSingle(RMWSConsts.LIMIT, "1"); + json = ActivitiesTestUtils.requestWebResource(r, params); + verifyNumberOfAllocations(json, 1); + + // query all app activities with invalid limit + params.putSingle(RMWSConsts.LIMIT, "STRING"); + json = ActivitiesTestUtils.requestWebResource(r, params); + assertEquals("limit must be integer!", json.getString("diagnostic")); + + // query all app activities with limit = 0 + params.putSingle(RMWSConsts.LIMIT, "0"); + json = ActivitiesTestUtils.requestWebResource(r, params); + assertEquals("limit must be greater than 0!", + json.getString("diagnostic")); + + // query all app activities with limit < 0 + params.putSingle(RMWSConsts.LIMIT, "-3"); + json = ActivitiesTestUtils.requestWebResource(r, params); + assertEquals("limit must be greater than 0!", + json.getString("diagnostic")); + } finally { + rm.stop(); + } + } + + @Test(timeout = 30000) + public void testAppActions() throws Exception { + rm.start(); + CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler(); + MockNM nm1 = rm.registerNode("127.0.0.1:1234", 8 * 1024); + try { + RMApp app1 = rm.submitApp(512, "app1", "user1", null, "b1"); + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm1); + // am1 asks for 10 * 512MB container + am1.allocate("*", 512, 10, new ArrayList<>()); + + WebResource r = resource().path(RMWSConsts.RM_WEB_SERVICE_PATH) + .path(ActivitiesTestUtils.format(RMWSConsts.SCHEDULER_APP_ACTIVITIES, + app1.getApplicationId().toString())); + MultivaluedMapImpl params = new MultivaluedMapImpl(); + params.add("maxTime", 1); //only last for 1 second + + // testing invalid action + params.add(RMWSConsts.ACTIONS, "get"); + params.add(RMWSConsts.ACTIONS, "invalid-action"); + JSONObject json = ActivitiesTestUtils.requestWebResource(r, params); + assertTrue(json.getString("diagnostic").startsWith("Got invalid action")); + + /* + * testing get action + */ + params.putSingle(RMWSConsts.ACTIONS, "get"); + json = ActivitiesTestUtils.requestWebResource(r, params); + assertEquals("waiting for display", json.getString("diagnostic")); + + // trigger scheduling + cs.handle(new NodeUpdateSchedulerEvent( + rm.getRMContext().getRMNodes().get(nm1.getNodeId()))); + + // app activities won't be recorded + params.putSingle(RMWSConsts.ACTIONS, "get"); + json = ActivitiesTestUtils.requestWebResource(r, params); + assertEquals("waiting for display", json.getString("diagnostic")); + + // trigger scheduling + cs.handle(new NodeUpdateSchedulerEvent( + rm.getRMContext().getRMNodes().get(nm1.getNodeId()))); + + /* + * testing update action + */ + params.putSingle(RMWSConsts.ACTIONS, "refresh"); + json = ActivitiesTestUtils.requestWebResource(r, params); + assertEquals("Successfully notified actions: refresh", + json.getString("diagnostic")); + + // trigger scheduling + cs.handle(new NodeUpdateSchedulerEvent( + rm.getRMContext().getRMNodes().get(nm1.getNodeId()))); + Thread.sleep(1000); + + // app activities should be recorded + params.putSingle(RMWSConsts.ACTIONS, "get"); + json = ActivitiesTestUtils.requestWebResource(r, params); + verifyNumberOfAllocations(json, 1); + + // trigger scheduling + cs.handle(new NodeUpdateSchedulerEvent( + rm.getRMContext().getRMNodes().get(nm1.getNodeId()))); + Thread.sleep(1000); + + /* + * testing update and get actions + */ + params.remove(RMWSConsts.ACTIONS); + params.add(RMWSConsts.ACTIONS, "refresh"); + params.add(RMWSConsts.ACTIONS, "get"); + json = ActivitiesTestUtils.requestWebResource(r, params); + verifyNumberOfAllocations(json, 1); + + // trigger scheduling + cs.handle(new NodeUpdateSchedulerEvent( + rm.getRMContext().getRMNodes().get(nm1.getNodeId()))); + Thread.sleep(1000); + + // more app activities should be recorded + json = ActivitiesTestUtils.requestWebResource(r, params); + verifyNumberOfAllocations(json, 2); + + // trigger scheduling + cs.handle(new NodeUpdateSchedulerEvent( + rm.getRMContext().getRMNodes().get(nm1.getNodeId()))); + Thread.sleep(1000); + + // more app activities should be recorded + json = ActivitiesTestUtils.requestWebResource(r, params); + verifyNumberOfAllocations(json, 3); + } finally { + rm.stop(); + } + } + + @Test(timeout=30000) + public void testAppSummary() throws Exception { + rm.start(); + CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler(); + + MockNM nm1 = rm.registerNode("127.0.0.1:1234", 8 * 1024); + MockNM nm2 = rm.registerNode("127.0.0.2:1234", 4 * 1024); + MockNM nm3 = rm.registerNode("127.0.0.3:1234", 4 * 1024); + + try { + RMApp app1 = rm.submitApp(5120, "app1", "user1", null, "b1"); + + WebResource r = resource().path(RMWSConsts.RM_WEB_SERVICE_PATH) + .path(ActivitiesTestUtils.format(RMWSConsts.SCHEDULER_APP_ACTIVITIES, + app1.getApplicationId().toString())); + MultivaluedMapImpl params = new MultivaluedMapImpl(); + JSONObject json = ActivitiesTestUtils.requestWebResource(r, params); + assertEquals("waiting for display", + json.getString("diagnostic")); + + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm1); + // am1 asks for 1 * 5GB container + am1.allocate(Arrays.asList(ResourceRequest + .newInstance(Priority.newInstance(0), "*", + Resources.createResource(5 * 1024), 1)), null); + // trigger scheduling + cs.handle(new NodeUpdateSchedulerEvent( + rm.getRMContext().getRMNodes().get(nm2.getNodeId()))); + cs.handle(new NodeUpdateSchedulerEvent( + rm.getRMContext().getRMNodes().get(nm3.getNodeId()))); + cs.handle(new NodeUpdateSchedulerEvent( + rm.getRMContext().getRMNodes().get(nm1.getNodeId()))); + + params.add(RMWSConsts.SUMMARIZE, "true"); + params.add(RMWSConsts.GROUP_BY, RMWSConsts.ActivitiesGroupBy.DIAGNOSTIC); + json = ActivitiesTestUtils.requestWebResource(r, params); + + // verify that response contains an allocation summary for all nodes + verifyNumberOfAllocations(json, 1); + JSONObject allocation = json.getJSONObject("allocations"); + JSONObject reqestAllocation = + allocation.getJSONObject("requestAllocation"); + JSONArray attempts = reqestAllocation.getJSONArray("allocationAttempt"); + assertEquals(2, attempts.length()); + for (int i = 0; i < attempts.length(); i++) { + JSONObject attempt = attempts.getJSONObject(i); + if (attempt.getString("allocationState").equals("SKIPPED")) { + JSONArray nodeIds = attempt.optJSONArray("nodeIds"); + assertEquals(2, nodeIds.length()); + } else if (attempt.getString("allocationState").equals("RESERVED")) { + assertEquals(nm1.getNodeId().toString(), + attempt.getString("nodeIds")); + } + } + } finally { + rm.stop(); + } + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesSchedulerActivitiesWithMultiNodesEnabled.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesSchedulerActivitiesWithMultiNodesEnabled.java index 8383a0d28c6..8998221238a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesSchedulerActivitiesWithMultiNodesEnabled.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesSchedulerActivitiesWithMultiNodesEnabled.java @@ -249,15 +249,11 @@ public void testAppAssignContainer() throws Exception { 1)), null); //Trigger recording for this app - WebResource r = resource(); + WebResource r = resource().path(RMWSConsts.RM_WEB_SERVICE_PATH) + .path(ActivitiesTestUtils.format(RMWSConsts.SCHEDULER_APP_ACTIVITIES, + app1.getApplicationId().toString())); MultivaluedMapImpl params = new MultivaluedMapImpl(); - params.add(RMWSConsts.APP_ID, app1.getApplicationId().toString()); - ClientResponse response = r.path("ws").path("v1").path("cluster") - .path("scheduler/app-activities").queryParams(params) - .accept(MediaType.APPLICATION_JSON).get(ClientResponse.class); - assertEquals(MediaType.APPLICATION_JSON_TYPE + "; " + JettyUtils.UTF_8, - response.getType().toString()); - JSONObject json = response.getEntity(JSONObject.class); + JSONObject json = ActivitiesTestUtils.requestWebResource(r, params); assertEquals("waiting for display", json.getString("diagnostic")); //Trigger scheduling for this app @@ -267,12 +263,7 @@ public void testAppAssignContainer() throws Exception { //Check app activities, it should contain one allocation and // final allocation state is ALLOCATED - response = r.path("ws").path("v1").path("cluster") - .path("scheduler/app-activities").queryParams(params) - .accept(MediaType.APPLICATION_JSON).get(ClientResponse.class); - assertEquals(MediaType.APPLICATION_JSON_TYPE + "; " + JettyUtils.UTF_8, - response.getType().toString()); - json = response.getEntity(JSONObject.class); + json = ActivitiesTestUtils.requestWebResource(r, params); verifyNumberOfAllocations(json, 1); @@ -382,16 +373,11 @@ public void testAppInsufficientResourceDiagnostic() throws Exception { RMApp app1 = rm.submitApp(3072, "app1", "user1", null, "b"); MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm1); - WebResource r = resource(); + WebResource r = resource().path(RMWSConsts.RM_WEB_SERVICE_PATH) + .path(ActivitiesTestUtils.format(RMWSConsts.SCHEDULER_APP_ACTIVITIES, + app1.getApplicationId().toString())); MultivaluedMapImpl params = new MultivaluedMapImpl(); - params.add(RMWSConsts.APP_ID, app1.getApplicationId().toString()); - - ClientResponse response = r.path("ws").path("v1").path("cluster").path( - "scheduler/app-activities").queryParams(params).accept( - MediaType.APPLICATION_JSON).get(ClientResponse.class); - assertEquals(MediaType.APPLICATION_JSON_TYPE + "; " + JettyUtils.UTF_8, - response.getType().toString()); - JSONObject json = response.getEntity(JSONObject.class); + JSONObject json = ActivitiesTestUtils.requestWebResource(r, params); assertEquals("waiting for display", json.getString("diagnostic")); //Request two containers with different priority for am1 @@ -409,14 +395,8 @@ public void testAppInsufficientResourceDiagnostic() throws Exception { cs.handle(new NodeUpdateSchedulerEvent( rm.getRMContext().getRMNodes().get(nm1.getNodeId()))); - response = r.path("ws").path("v1").path("cluster").path( - "scheduler/app-activities").queryParams(params).accept( - MediaType.APPLICATION_JSON).get(ClientResponse.class); - assertEquals(MediaType.APPLICATION_JSON_TYPE + "; " + JettyUtils.UTF_8, - response.getType().toString()); - json = response.getEntity(JSONObject.class); - //Check app activities + json = ActivitiesTestUtils.requestWebResource(r, params); verifyNumberOfAllocations(json, 2); JSONArray allocationArray = json.getJSONArray("allocations"); //Check first activity is for second allocation with RESERVED state @@ -539,9 +519,9 @@ public void testAppGroupByDiagnostics() throws Exception { MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm1); WebResource r = resource().path(RMWSConsts.RM_WEB_SERVICE_PATH) - .path(RMWSConsts.SCHEDULER_APP_ACTIVITIES); + .path(ActivitiesTestUtils.format(RMWSConsts.SCHEDULER_APP_ACTIVITIES, + app1.getApplicationId().toString())); MultivaluedMapImpl params = new MultivaluedMapImpl(); - params.add(RMWSConsts.APP_ID, app1.getApplicationId().toString()); /* * test non-exist groupBy diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/DefaultRequestInterceptorREST.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/DefaultRequestInterceptorREST.java index 7e6f3062521..bf0dee6c872 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/DefaultRequestInterceptorREST.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/DefaultRequestInterceptorREST.java @@ -192,7 +192,8 @@ public ActivitiesInfo getActivities(HttpServletRequest hsr, String nodeId, @Override public AppActivitiesInfo getAppActivities(HttpServletRequest hsr, String appId, String time, Set requestPriorities, - Set allocationRequestIds, String groupBy) { + Set allocationRequestIds, String groupBy, String limit, + Set actions, boolean summarize) { // time and appId are specified inside hsr return RouterWebServiceUtil.genericForward(webAppAddress, hsr, AppActivitiesInfo.class, HTTPMethods.GET, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/FederationInterceptorREST.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/FederationInterceptorREST.java index 1c8b7a85f29..1ed5f5929d9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/FederationInterceptorREST.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/FederationInterceptorREST.java @@ -1146,7 +1146,8 @@ public ActivitiesInfo getActivities(HttpServletRequest hsr, String nodeId, @Override public AppActivitiesInfo getAppActivities(HttpServletRequest hsr, String appId, String time, Set requestPriorities, - Set allocationRequestIds, String groupBy) { + Set allocationRequestIds, String groupBy, String limit, + Set actions, boolean summarize) { throw new NotImplementedException("Code is not implemented"); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/RouterWebServices.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/RouterWebServices.java index 9327c6f688d..93275476555 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/RouterWebServices.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/RouterWebServices.java @@ -95,6 +95,8 @@ import com.google.inject.Inject; import com.google.inject.Singleton; +import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWebServices.DEFAULT_SUMMARIZE; + /** * RouterWebServices is a service that runs on each router that can be used to * intercept and inspect {@link RMWebServiceProtocol} messages from client to @@ -465,11 +467,16 @@ public AppActivitiesInfo getAppActivities(@Context HttpServletRequest hsr, @QueryParam(RMWSConsts.REQUEST_PRIORITIES) Set requestPriorities, @QueryParam(RMWSConsts.ALLOCATION_REQUEST_IDS) Set allocationRequestIds, - @QueryParam(RMWSConsts.GROUP_BY) String groupBy) { + @QueryParam(RMWSConsts.GROUP_BY) String groupBy, + @QueryParam(RMWSConsts.LIMIT) String limit, + @QueryParam(RMWSConsts.ACTIONS) Set actions, + @QueryParam(RMWSConsts.SUMMARIZE) @DefaultValue(DEFAULT_SUMMARIZE) + boolean summarize) { init(); RequestInterceptorChainWrapper pipeline = getInterceptorChain(hsr); return pipeline.getRootInterceptor().getAppActivities(hsr, appId, time, - requestPriorities, allocationRequestIds, groupBy); + requestPriorities, allocationRequestIds, groupBy, limit, actions, + summarize); } @GET diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/BaseRouterWebServicesTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/BaseRouterWebServicesTest.java index 535c579a85d..78aab5a961b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/BaseRouterWebServicesTest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/BaseRouterWebServicesTest.java @@ -180,8 +180,8 @@ protected ActivitiesInfo getActivities(String user) protected AppActivitiesInfo getAppActivities(String user) throws IOException, InterruptedException { - return routerWebService.getAppActivities( - createHttpServletRequest(user), null, null, null, null, null); + return routerWebService.getAppActivities(createHttpServletRequest(user), + null, null, null, null, null, null, null, false); } protected ApplicationStatisticsInfo getAppStatistics(String user) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/MockRESTRequestInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/MockRESTRequestInterceptor.java index f93b397e386..50200ed2b71 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/MockRESTRequestInterceptor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/MockRESTRequestInterceptor.java @@ -141,7 +141,8 @@ public ActivitiesInfo getActivities(HttpServletRequest hsr, String nodeId, @Override public AppActivitiesInfo getAppActivities(HttpServletRequest hsr, String appId, String time, Set requestPriorities, - Set allocationRequestIds, String groupBy) { + Set allocationRequestIds, String groupBy, String limit, + Set actions, boolean summarize) { return new AppActivitiesInfo(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/PassThroughRESTRequestInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/PassThroughRESTRequestInterceptor.java index 126610cc475..eb7222f9f3d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/PassThroughRESTRequestInterceptor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/PassThroughRESTRequestInterceptor.java @@ -169,9 +169,11 @@ public ActivitiesInfo getActivities(HttpServletRequest hsr, String nodeId, @Override public AppActivitiesInfo getAppActivities(HttpServletRequest hsr, String appId, String time, Set requestPriorities, - Set allocationRequestIds, String groupBy) { + Set allocationRequestIds, String groupBy, String limit, + Set actions, boolean summarize) { return getNextInterceptor().getAppActivities(hsr, appId, time, - requestPriorities, allocationRequestIds, groupBy); + requestPriorities, allocationRequestIds, groupBy, limit, + actions, summarize); } @Override