YARN-9497. Support grouping by diagnostics for query results of scheduler and app activities. Contributed by Tao Yang.

This commit is contained in:
Weiwei Yang 2019-05-26 09:56:36 -04:00
parent 37900c5639
commit 9f056d905f
20 changed files with 403 additions and 55 deletions

View File

@ -23,6 +23,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.commons.collections.CollectionUtils;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -58,7 +59,7 @@ public class ActivitiesManager extends AbstractService {
// An empty node ID, we use this variable as a placeholder
// in the activity records when recording multiple nodes assignments.
public static final NodeId EMPTY_NODE_ID = NodeId.newInstance("", 0);
public static final String DIAGNOSTICS_DETAILS_SEPARATOR = "\n";
public static final char DIAGNOSTICS_DETAILS_SEPARATOR = '\n';
public static final String EMPTY_DIAGNOSTICS = "";
private ThreadLocal<Map<NodeId, List<NodeAllocation>>>
recordingNodesAllocation;
@ -119,7 +120,8 @@ public class ActivitiesManager extends AbstractService {
}
public AppActivitiesInfo getAppActivitiesInfo(ApplicationId applicationId,
Set<String> requestPriorities, Set<String> allocationRequestIds) {
Set<String> requestPriorities, Set<String> allocationRequestIds,
RMWSConsts.ActivitiesGroupBy groupBy) {
RMApp app = rmContext.getRMApps().get(applicationId);
if (app != null && app.getFinalApplicationStatus()
== FinalApplicationStatus.UNDEFINED) {
@ -138,7 +140,7 @@ public class ActivitiesManager extends AbstractService {
allocations = new ArrayList(curAllocations);
}
}
return new AppActivitiesInfo(allocations, applicationId);
return new AppActivitiesInfo(allocations, applicationId, groupBy);
} else {
return new AppActivitiesInfo(
"fail to get application activities after finished",
@ -146,14 +148,15 @@ public class ActivitiesManager extends AbstractService {
}
}
public ActivitiesInfo getActivitiesInfo(String nodeId) {
public ActivitiesInfo getActivitiesInfo(String nodeId,
RMWSConsts.ActivitiesGroupBy groupBy) {
List<NodeAllocation> allocations;
if (nodeId == null) {
allocations = lastAvailableNodeActivities;
} else {
allocations = completedNodeAllocations.get(NodeId.fromString(nodeId));
}
return new ActivitiesInfo(allocations, nodeId);
return new ActivitiesInfo(allocations, nodeId, groupBy);
}
public void recordNextNodeUpdateActivities(String nodeId) {

View File

@ -0,0 +1,61 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ActivityNodeInfo;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
/**
* Utilities for activities.
*/
public final class ActivitiesUtils {
private ActivitiesUtils(){}
public static List<ActivityNodeInfo> getRequestActivityNodeInfos(
List<ActivityNode> activityNodes,
RMWSConsts.ActivitiesGroupBy groupBy) {
if (activityNodes == null) {
return null;
}
if (groupBy == RMWSConsts.ActivitiesGroupBy.DIAGNOSTIC) {
Map<ActivityState, Map<String, List<String>>> groupingResults =
activityNodes.stream().collect(Collectors
.groupingBy(ActivityNode::getState, Collectors
.groupingBy(ActivityNode::getShortDiagnostic,
Collectors.mapping(e -> e.getNodeId() == null ?
"" :
e.getNodeId().toString(), Collectors.toList()))));
return groupingResults.entrySet().stream().flatMap(
stateMap -> stateMap.getValue().entrySet().stream().map(
diagMap -> new ActivityNodeInfo(stateMap.getKey(),
diagMap.getKey().isEmpty() ? null : diagMap.getKey(),
diagMap.getValue())))
.collect(Collectors.toList());
} else {
return activityNodes.stream().map(
e -> new ActivityNodeInfo(e.getName(), e.getState(),
e.getDiagnostic(), e.getNodeId())).collect(Collectors.toList());
}
}
}

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.records.NodeId;
import java.util.LinkedList;
@ -108,7 +109,7 @@ public class ActivityNode {
return allocationRequestId;
}
public boolean getType() {
public boolean isAppType() {
if (appPriority != null) {
return true;
} else {
@ -116,6 +117,19 @@ public class ActivityNode {
}
}
public boolean isRequestType() {
return requestPriority != null && nodeId == null;
}
public String getShortDiagnostic() {
if (this.diagnostic == null) {
return "";
} else {
return StringUtils.split(this.diagnostic,
ActivitiesManager.DIAGNOSTICS_DETAILS_SEPARATOR)[0];
}
}
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append(this.activityNodeName + " ")

View File

@ -228,9 +228,18 @@ public final class RMWSConsts {
public static final String QUEUE_ACL_TYPE = "queue-acl-type";
public static final String REQUEST_PRIORITIES = "requestPriorities";
public static final String ALLOCATION_REQUEST_IDS = "allocationRequestIds";
public static final String GROUP_BY = "groupBy";
private RMWSConsts() {
// not called
}
/**
* Defines the groupBy types of activities, currently only support
* DIAGNOSTIC with which user can query aggregated activities
* grouped by allocation state and diagnostic.
*/
public enum ActivitiesGroupBy {
DIAGNOSTIC
}
}

View File

@ -204,9 +204,12 @@ public interface RMWebServiceProtocol {
* @param hsr the servlet request
* @param nodeId the node we want to retrieve the activities. It is a
* QueryParam.
* @param groupBy the groupBy type by which the activities should be
* aggregated. It is a QueryParam.
* @return all the activities in the specific node
*/
ActivitiesInfo getActivities(HttpServletRequest hsr, String nodeId);
ActivitiesInfo getActivities(HttpServletRequest hsr, String nodeId,
String groupBy);
/**
* This method retrieves all the activities for a specific app for a specific
@ -222,11 +225,13 @@ public interface RMWebServiceProtocol {
* activities. It is a QueryParam.
* @param allocationRequestIds the allocation request ids we want to retrieve
* the activities. It is a QueryParam.
* @param groupBy the groupBy type by which the activities should be
* aggregated. It is a QueryParam.
* @return all the activities about a specific app for a specific time
*/
AppActivitiesInfo getAppActivities(HttpServletRequest hsr, String appId,
String time, Set<String> requestPriorities,
Set<String> allocationRequestIds);
Set<String> allocationRequestIds, String groupBy);
/**
* This method retrieves all the statistics for a specific app, and it is

View File

@ -56,6 +56,7 @@ import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.Status;
import org.apache.commons.lang3.EnumUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.http.JettyUtils;
@ -632,7 +633,8 @@ public class RMWebServices extends WebServices implements RMWebServiceProtocol {
MediaType.APPLICATION_XML + "; " + JettyUtils.UTF_8 })
@Override
public ActivitiesInfo getActivities(@Context HttpServletRequest hsr,
@QueryParam(RMWSConsts.NODEID) String nodeId) {
@QueryParam(RMWSConsts.NODEID) String nodeId,
@QueryParam(RMWSConsts.GROUP_BY) String groupBy) {
initForReadableEndpoints();
YarnScheduler scheduler = rm.getRMContext().getScheduler();
@ -649,6 +651,13 @@ public class RMWebServices extends WebServices implements RMWebServiceProtocol {
return new ActivitiesInfo(errMessage, nodeId);
}
RMWSConsts.ActivitiesGroupBy activitiesGroupBy;
try {
activitiesGroupBy = parseActivitiesGroupBy(groupBy);
} catch (IllegalArgumentException e) {
return new ActivitiesInfo(e.getMessage(), nodeId);
}
List<FiCaSchedulerNode> nodeList =
abstractYarnScheduler.getNodeTracker().getAllNodes();
@ -689,7 +698,7 @@ public class RMWebServices extends WebServices implements RMWebServiceProtocol {
if (!illegalInput) {
activitiesManager.recordNextNodeUpdateActivities(nodeId);
return activitiesManager.getActivitiesInfo(nodeId);
return activitiesManager.getActivitiesInfo(nodeId, activitiesGroupBy);
}
// Return a activities info with error message
@ -709,7 +718,8 @@ public class RMWebServices extends WebServices implements RMWebServiceProtocol {
@QueryParam(RMWSConsts.MAX_TIME) String time,
@QueryParam(RMWSConsts.REQUEST_PRIORITIES) Set<String> requestPriorities,
@QueryParam(RMWSConsts.ALLOCATION_REQUEST_IDS)
Set<String> allocationRequestIds) {
Set<String> allocationRequestIds,
@QueryParam(RMWSConsts.GROUP_BY) String groupBy) {
initForReadableEndpoints();
YarnScheduler scheduler = rm.getRMContext().getScheduler();
@ -729,6 +739,13 @@ public class RMWebServices extends WebServices implements RMWebServiceProtocol {
return new AppActivitiesInfo(errMessage, null);
}
RMWSConsts.ActivitiesGroupBy activitiesGroupBy;
try {
activitiesGroupBy = parseActivitiesGroupBy(groupBy);
} catch (IllegalArgumentException e) {
return new AppActivitiesInfo(e.getMessage(), appId);
}
double maxTime = 3.0;
if (time != null) {
@ -745,7 +762,7 @@ public class RMWebServices extends WebServices implements RMWebServiceProtocol {
activitiesManager.turnOnAppActivitiesRecording(applicationId, maxTime);
AppActivitiesInfo appActivitiesInfo =
activitiesManager.getAppActivitiesInfo(applicationId,
requestPriorities, allocationRequestIds);
requestPriorities, allocationRequestIds, activitiesGroupBy);
return appActivitiesInfo;
} catch (Exception e) {
@ -758,6 +775,20 @@ public class RMWebServices extends WebServices implements RMWebServiceProtocol {
return null;
}
private RMWSConsts.ActivitiesGroupBy parseActivitiesGroupBy(String groupBy) {
if (groupBy != null) {
if (!EnumUtils.isValidEnum(RMWSConsts.ActivitiesGroupBy.class,
groupBy.toUpperCase())) {
String errMesasge =
"Got invalid groupBy: " + groupBy + ", valid groupBy types: "
+ Arrays.asList(RMWSConsts.ActivitiesGroupBy.values());
throw new IllegalArgumentException(errMesasge);
}
return RMWSConsts.ActivitiesGroupBy.valueOf(groupBy.toUpperCase());
}
return null;
}
@GET
@Path(RMWSConsts.APP_STATISTICS)
@Produces({ MediaType.APPLICATION_JSON + "; " + JettyUtils.UTF_8,

View File

@ -19,6 +19,7 @@
package org.apache.hadoop.yarn.server.resourcemanager.webapp.dao;
import com.google.common.base.Strings;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.yarn.api.records.NodeId;
@ -53,7 +54,8 @@ public class ActivitiesInfo {
this.nodeId = nodeId;
}
public ActivitiesInfo(List<NodeAllocation> nodeAllocations, String nodeId) {
public ActivitiesInfo(List<NodeAllocation> nodeAllocations, String nodeId,
RMWSConsts.ActivitiesGroupBy groupBy) {
this.nodeId = nodeId;
this.allocations = new ArrayList<>();
@ -78,7 +80,7 @@ public class ActivitiesInfo {
for (int i = 0; i < nodeAllocations.size(); i++) {
NodeAllocation nodeAllocation = nodeAllocations.get(i);
NodeAllocationInfo allocationInfo = new NodeAllocationInfo(
nodeAllocation);
nodeAllocation, groupBy);
this.allocations.add(allocationInfo);
}
}

View File

@ -20,14 +20,16 @@ package org.apache.hadoop.yarn.server.resourcemanager.webapp.dao;
import com.google.common.base.Strings;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityState;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts;
import javax.xml.bind.annotation.XmlAccessType;
import javax.xml.bind.annotation.XmlAccessorType;
import javax.xml.bind.annotation.XmlRootElement;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
/*
* DAO object to display node information in allocation tree.
@ -44,6 +46,10 @@ public class ActivityNodeInfo {
private String nodeId;
private String allocationRequestId;
// Used for groups of activities
private String count;
private List<String> nodeIds;
protected List<ActivityNodeInfo> children;
ActivityNodeInfo() {
@ -57,7 +63,16 @@ public class ActivityNodeInfo {
setNodeId(nId);
}
ActivityNodeInfo(ActivityNode node) {
public ActivityNodeInfo(ActivityState groupAllocationState,
String groupDiagnostic, List<String> groupNodeIds) {
this.allocationState = groupAllocationState.name();
this.diagnostic = groupDiagnostic;
this.count = String.valueOf(groupNodeIds.size());
this.nodeIds = groupNodeIds;
}
ActivityNodeInfo(ActivityNode node,
RMWSConsts.ActivitiesGroupBy groupBy) {
this.name = node.getName();
setPriority(node);
setNodeId(node.getNodeId());
@ -65,11 +80,14 @@ public class ActivityNodeInfo {
this.diagnostic = node.getDiagnostic();
this.requestPriority = node.getRequestPriority();
this.allocationRequestId = node.getAllocationRequestId();
this.children = new ArrayList<>();
for (ActivityNode child : node.getChildren()) {
ActivityNodeInfo containerInfo = new ActivityNodeInfo(child);
this.children.add(containerInfo);
// only consider grouping for request type
if (node.isRequestType()) {
this.children = ActivitiesUtils
.getRequestActivityNodeInfos(node.getChildren(), groupBy);
} else {
this.children = node.getChildren().stream()
.map(e -> new ActivityNodeInfo(e, groupBy))
.collect(Collectors.toList());
}
}
@ -80,7 +98,7 @@ public class ActivityNodeInfo {
}
private void setPriority(ActivityNode node) {
if (node.getType()) {
if (node.isAppType()) {
this.appPriority = node.getAppPriority();
} else {
this.requestPriority = node.getRequestPriority();
@ -91,7 +109,23 @@ public class ActivityNodeInfo {
return nodeId;
}
public void setNodeIds(List<String> nodeIds) {
this.nodeIds = nodeIds;
}
public String getAllocationRequestId() {
return allocationRequestId;
}
public String getCount() {
return count;
}
public List<String> getNodeIds() {
return nodeIds;
}
public List<ActivityNodeInfo> getChildren() {
return children;
}
}

View File

@ -19,6 +19,7 @@
package org.apache.hadoop.yarn.server.resourcemanager.webapp.dao;
import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.yarn.api.records.ApplicationId;
@ -59,7 +60,8 @@ public class AppActivitiesInfo {
}
public AppActivitiesInfo(List<AppAllocation> appAllocations,
ApplicationId applicationId) {
ApplicationId applicationId,
RMWSConsts.ActivitiesGroupBy groupBy) {
this.applicationId = applicationId.toString();
this.allocations = new ArrayList<>();
@ -73,7 +75,7 @@ public class AppActivitiesInfo {
for (int i = appAllocations.size() - 1; i > -1; i--) {
AppAllocation appAllocation = appAllocations.get(i);
AppAllocationInfo appAllocationInfo = new AppAllocationInfo(
appAllocation);
appAllocation, groupBy);
this.allocations.add(appAllocationInfo);
}
}

View File

@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.webapp.dao;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.AppAllocation;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts;
import javax.xml.bind.annotation.XmlAccessType;
import javax.xml.bind.annotation.XmlAccessorType;
@ -47,7 +48,8 @@ public class AppAllocationInfo {
AppAllocationInfo() {
}
AppAllocationInfo(AppAllocation allocation) {
AppAllocationInfo(AppAllocation allocation,
RMWSConsts.ActivitiesGroupBy groupBy) {
this.requestAllocation = new ArrayList<>();
this.nodeId = allocation.getNodeId();
this.queueName = allocation.getQueueName();
@ -62,7 +64,7 @@ public class AppAllocationInfo {
for (List<ActivityNode> requestActivityNodes : requestToActivityNodes
.values()) {
AppRequestAllocationInfo requestAllocationInfo =
new AppRequestAllocationInfo(requestActivityNodes);
new AppRequestAllocationInfo(requestActivityNodes, groupBy);
this.requestAllocation.add(requestAllocationInfo);
}
}

View File

@ -19,12 +19,13 @@
package org.apache.hadoop.yarn.server.resourcemanager.webapp.dao;
import com.google.common.collect.Iterables;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityNode;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts;
import javax.xml.bind.annotation.XmlAccessType;
import javax.xml.bind.annotation.XmlAccessorType;
import javax.xml.bind.annotation.XmlRootElement;
import java.util.ArrayList;
import java.util.List;
/**
@ -41,18 +42,14 @@ public class AppRequestAllocationInfo {
AppRequestAllocationInfo() {
}
AppRequestAllocationInfo(List<ActivityNode> activityNodes) {
this.allocationAttempt = new ArrayList<>();
AppRequestAllocationInfo(List<ActivityNode> activityNodes,
RMWSConsts.ActivitiesGroupBy groupBy) {
ActivityNode lastActivityNode = Iterables.getLast(activityNodes);
this.requestPriority = lastActivityNode.getRequestPriority();
this.allocationRequestId = lastActivityNode.getAllocationRequestId();
this.allocationState = lastActivityNode.getState().name();
for (ActivityNode attempt : activityNodes) {
ActivityNodeInfo containerInfo =
new ActivityNodeInfo(attempt.getName(), attempt.getState(),
attempt.getDiagnostic(), attempt.getNodeId());
this.allocationAttempt.add(containerInfo);
}
this.allocationAttempt = ActivitiesUtils
.getRequestActivityNodeInfos(activityNodes, groupBy);
}
public String getRequestPriority() {

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.yarn.server.resourcemanager.webapp.dao;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.NodeAllocation;
@ -42,11 +43,12 @@ public class NodeAllocationInfo {
NodeAllocationInfo() {
}
NodeAllocationInfo(NodeAllocation allocation) {
NodeAllocationInfo(NodeAllocation allocation,
RMWSConsts.ActivitiesGroupBy groupBy) {
this.allocatedContainerId = allocation.getContainerId();
this.finalAllocationState = allocation.getFinalAllocationState().name();
root = new ActivityNodeInfo(allocation.getRoot());
root = new ActivityNodeInfo(allocation.getRoot(), groupBy);
}
}

View File

@ -286,14 +286,14 @@ public class TestActivitiesManager {
ActivityDiagnosticConstant.SKIPPED_ALL_PRIORITIES);
}
AppActivitiesInfo appActivitiesInfo = newActivitiesManager
.getAppActivitiesInfo(app.getApplicationId(), null, null);
.getAppActivitiesInfo(app.getApplicationId(), null, null, null);
Assert.assertEquals(numActivities,
appActivitiesInfo.getAllocations().size());
// sleep until all app activities expired
Thread.sleep(cleanupIntervalMs + appActivitiesTTL);
// there should be no remaining app activities
appActivitiesInfo = newActivitiesManager
.getAppActivitiesInfo(app.getApplicationId(), null, null);
.getAppActivitiesInfo(app.getApplicationId(), null, null, null);
Assert.assertEquals(0,
appActivitiesInfo.getAllocations().size());
}

View File

@ -36,6 +36,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityDiagnosticConstant;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.AllocationState;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
@ -46,6 +47,7 @@ import org.apache.hadoop.yarn.webapp.GuiceServletConfig;
import org.apache.hadoop.yarn.webapp.JerseyTestBase;
import org.codehaus.jettison.json.JSONArray;
import org.codehaus.jettison.json.JSONObject;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@ -61,6 +63,7 @@ import static org.apache.hadoop.yarn.server.resourcemanager.webapp.ActivitiesTes
import static org.apache.hadoop.yarn.server.resourcemanager.webapp.ActivitiesTestUtils.verifyNumberOfAllocations;
import static org.apache.hadoop.yarn.server.resourcemanager.webapp.ActivitiesTestUtils.verifyStateOfAllocations;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
/**
@ -444,4 +447,180 @@ public class TestRMWebServicesSchedulerActivitiesWithMultiNodesEnabled
rm.stop();
}
}
@Test (timeout=30000)
public void testGroupByDiagnostics() throws Exception {
rm.start();
CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
MockNM nm1 = rm.registerNode("127.0.0.1:1234", 4 * 1024);
MockNM nm2 = rm.registerNode("127.0.0.2:1234", 2 * 1024);
MockNM nm3 = rm.registerNode("127.0.0.3:1234", 2 * 1024);
MockNM nm4 = rm.registerNode("127.0.0.4:1234", 2 * 1024);
try {
RMApp app1 = rm.submitApp(3072, "app1", "user1", null, "b");
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm1);
WebResource r = resource().path(RMWSConsts.RM_WEB_SERVICE_PATH)
.path(RMWSConsts.SCHEDULER_ACTIVITIES);
MultivaluedMapImpl params = new MultivaluedMapImpl();
/*
* test non-exist groupBy
*/
params.add(RMWSConsts.GROUP_BY, "NON-EXIST-GROUP-BY");
JSONObject json = ActivitiesTestUtils.requestWebResource(r, params);
Assert.assertTrue(json.getString("diagnostic")
.startsWith("Got invalid groupBy:"));
params.remove(RMWSConsts.GROUP_BY);
/*
* test groupBy: DIAGNOSTIC
*/
params.add(RMWSConsts.GROUP_BY, RMWSConsts.ActivitiesGroupBy.
DIAGNOSTIC.name().toLowerCase());
json = ActivitiesTestUtils.requestWebResource(r, params);
assertEquals("waiting for next allocation", json.getString("diagnostic"));
//Request a container for am2, will reserve a container on nm1
am1.allocate("*", 4096, 1, new ArrayList<>());
cs.handle(new NodeUpdateSchedulerEvent(
rm.getRMContext().getRMNodes().get(nm1.getNodeId())));
json = ActivitiesTestUtils.requestWebResource(r, params);
//Check activities
verifyNumberOfAllocations(json, 1);
JSONObject allocationObj = json.getJSONObject("allocations");
//Check diagnostic for request of app1
Predicate<JSONObject> findReqPred =
(obj) -> obj.optString("name").equals("request_1_-1");
List<JSONObject> reqObjs =
findInAllocations(allocationObj, findReqPred);
assertEquals(1, reqObjs.size());
JSONArray reqChildren = reqObjs.get(0).getJSONArray("children");
assertEquals(2, reqChildren.length());
for (int i = 0; i < reqChildren.length(); i++) {
JSONObject reqChild = reqChildren.getJSONObject(i);
if (reqChild.getString("allocationState")
.equals(AllocationState.SKIPPED.name())) {
assertEquals("3", reqChild.getString("count"));
assertEquals(3, reqChild.getJSONArray("nodeIds").length());
assertTrue(reqChild.optString("diagnostic")
.contains(INSUFFICIENT_RESOURCE_DIAGNOSTIC_PREFIX));
} else if (reqChild.getString("allocationState")
.equals(AllocationState.RESERVED.name())) {
assertEquals("1", reqChild.getString("count"));
assertNotNull(reqChild.getString("nodeIds"));
} else {
Assert.fail("Allocation state should be "
+ AllocationState.SKIPPED.name() + " or "
+ AllocationState.RESERVED.name() + "!");
}
}
} finally {
rm.stop();
}
}
@Test (timeout=30000)
public void testAppGroupByDiagnostics() throws Exception {
rm.start();
CapacityScheduler cs = (CapacityScheduler)rm.getResourceScheduler();
MockNM nm1 = rm.registerNode("127.0.0.1:1234", 4 * 1024);
MockNM nm2 = rm.registerNode("127.0.0.2:1234", 2 * 1024);
MockNM nm3 = rm.registerNode("127.0.0.3:1234", 2 * 1024);
MockNM nm4 = rm.registerNode("127.0.0.4:1234", 2 * 1024);
try {
RMApp app1 = rm.submitApp(3072, "app1", "user1", null, "b");
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm1);
WebResource r = resource().path(RMWSConsts.RM_WEB_SERVICE_PATH)
.path(RMWSConsts.SCHEDULER_APP_ACTIVITIES);
MultivaluedMapImpl params = new MultivaluedMapImpl();
params.add(RMWSConsts.APP_ID, app1.getApplicationId().toString());
/*
* test non-exist groupBy
*/
params.add(RMWSConsts.GROUP_BY, "NON-EXIST-GROUP-BY");
JSONObject json = ActivitiesTestUtils.requestWebResource(r, params);
Assert.assertTrue(json.getString("diagnostic")
.startsWith("Got invalid groupBy:"));
params.remove(RMWSConsts.GROUP_BY);
/*
* test groupBy: DIAGNOSTIC
*/
params.add(RMWSConsts.GROUP_BY, RMWSConsts.ActivitiesGroupBy.
DIAGNOSTIC.name().toLowerCase());
json = ActivitiesTestUtils.requestWebResource(r, params);
assertEquals("waiting for display", json.getString("diagnostic"));
//Request two containers with different priority for am1
am1.allocate(Arrays.asList(ResourceRequest
.newInstance(Priority.newInstance(0), "*",
Resources.createResource(1024), 1), ResourceRequest
.newInstance(Priority.newInstance(1), "*",
Resources.createResource(4096), 1)), null);
//Trigger scheduling, will allocate a container with priority 0
cs.handle(new NodeUpdateSchedulerEvent(
rm.getRMContext().getRMNodes().get(nm1.getNodeId())));
//Trigger scheduling, will reserve a container with priority 1 on nm1
cs.handle(new NodeUpdateSchedulerEvent(
rm.getRMContext().getRMNodes().get(nm1.getNodeId())));
json = ActivitiesTestUtils.requestWebResource(r, params);
//Check app activities
verifyNumberOfAllocations(json, 2);
JSONArray allocationArray = json.getJSONArray("allocations");
//Check first activity is for second allocation with RESERVED state
JSONObject allocationObj = allocationArray.getJSONObject(0);
verifyStateOfAllocations(allocationObj, "allocationState", "RESERVED");
JSONObject requestAllocationObj =
allocationObj.getJSONObject("requestAllocation");
verifyNumberOfAllocationAttempts(requestAllocationObj, 2);
JSONArray allocationAttemptArray =
requestAllocationObj.getJSONArray("allocationAttempt");
for (int i=0; i<allocationAttemptArray.length(); i++) {
JSONObject allocationAttemptObj =
allocationAttemptArray.getJSONObject(i);
if (allocationAttemptObj.getString("allocationState")
.equals(AllocationState.SKIPPED.name())) {
assertEquals("3", allocationAttemptObj.getString("count"));
assertEquals(3,
allocationAttemptObj.getJSONArray("nodeIds").length());
assertTrue(allocationAttemptObj.optString("diagnostic")
.contains(INSUFFICIENT_RESOURCE_DIAGNOSTIC_PREFIX));
} else if (allocationAttemptObj.getString("allocationState")
.equals(AllocationState.RESERVED.name())) {
assertEquals("1", allocationAttemptObj.getString("count"));
assertNotNull(allocationAttemptObj.getString("nodeIds"));
} else {
Assert.fail("Allocation state should be "
+ AllocationState.SKIPPED.name() + " or "
+ AllocationState.RESERVED.name() + "!");
}
}
// check second activity is for first allocation with ALLOCATED state
allocationObj = allocationArray.getJSONObject(1);
verifyStateOfAllocations(allocationObj, "allocationState", "ACCEPTED");
requestAllocationObj = allocationObj.getJSONObject("requestAllocation");
verifyNumberOfAllocationAttempts(requestAllocationObj, 1);
verifyStateOfAllocations(requestAllocationObj, "allocationState",
"ALLOCATED");
JSONObject allocationAttemptObj =
requestAllocationObj.getJSONObject("allocationAttempt");
assertEquals("1", allocationAttemptObj.getString("count"));
assertNotNull(allocationAttemptObj.getString("nodeIds"));
} finally {
rm.stop();
}
}
}

View File

@ -180,7 +180,8 @@ public class DefaultRequestInterceptorREST
}
@Override
public ActivitiesInfo getActivities(HttpServletRequest hsr, String nodeId) {
public ActivitiesInfo getActivities(HttpServletRequest hsr, String nodeId,
String groupBy) {
// nodeId is specified inside hsr
return RouterWebServiceUtil.genericForward(webAppAddress, hsr,
ActivitiesInfo.class, HTTPMethods.GET,
@ -191,7 +192,7 @@ public class DefaultRequestInterceptorREST
@Override
public AppActivitiesInfo getAppActivities(HttpServletRequest hsr,
String appId, String time, Set<String> requestPriorities,
Set<String> allocationRequestIds) {
Set<String> allocationRequestIds, String groupBy) {
// time and appId are specified inside hsr
return RouterWebServiceUtil.genericForward(webAppAddress, hsr,
AppActivitiesInfo.class, HTTPMethods.GET,

View File

@ -1138,14 +1138,15 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
}
@Override
public ActivitiesInfo getActivities(HttpServletRequest hsr, String nodeId) {
public ActivitiesInfo getActivities(HttpServletRequest hsr, String nodeId,
String groupBy) {
throw new NotImplementedException("Code is not implemented");
}
@Override
public AppActivitiesInfo getAppActivities(HttpServletRequest hsr,
String appId, String time, Set<String> requestPriorities,
Set<String> allocationRequestIds) {
Set<String> allocationRequestIds, String groupBy) {
throw new NotImplementedException("Code is not implemented");
}

View File

@ -446,10 +446,12 @@ public class RouterWebServices implements RMWebServiceProtocol {
MediaType.APPLICATION_XML + "; " + JettyUtils.UTF_8 })
@Override
public ActivitiesInfo getActivities(@Context HttpServletRequest hsr,
@QueryParam(RMWSConsts.NODEID) String nodeId) {
@QueryParam(RMWSConsts.NODEID) String nodeId,
@QueryParam(RMWSConsts.GROUP_BY) String groupBy) {
init();
RequestInterceptorChainWrapper pipeline = getInterceptorChain(hsr);
return pipeline.getRootInterceptor().getActivities(hsr, nodeId);
return pipeline.getRootInterceptor()
.getActivities(hsr, nodeId, groupBy);
}
@GET
@ -462,11 +464,12 @@ public class RouterWebServices implements RMWebServiceProtocol {
@QueryParam(RMWSConsts.MAX_TIME) String time,
@QueryParam(RMWSConsts.REQUEST_PRIORITIES) Set<String> requestPriorities,
@QueryParam(RMWSConsts.ALLOCATION_REQUEST_IDS)
Set<String> allocationRequestIds) {
Set<String> allocationRequestIds,
@QueryParam(RMWSConsts.GROUP_BY) String groupBy) {
init();
RequestInterceptorChainWrapper pipeline = getInterceptorChain(hsr);
return pipeline.getRootInterceptor().getAppActivities(hsr, appId, time,
requestPriorities, allocationRequestIds);
requestPriorities, allocationRequestIds, groupBy);
}
@GET

View File

@ -175,13 +175,13 @@ public abstract class BaseRouterWebServicesTest {
protected ActivitiesInfo getActivities(String user)
throws IOException, InterruptedException {
return routerWebService.getActivities(
createHttpServletRequest(user), null);
createHttpServletRequest(user), null, null);
}
protected AppActivitiesInfo getAppActivities(String user)
throws IOException, InterruptedException {
return routerWebService.getAppActivities(
createHttpServletRequest(user), null, null, null, null);
createHttpServletRequest(user), null, null, null, null, null);
}
protected ApplicationStatisticsInfo getAppStatistics(String user)

View File

@ -133,14 +133,15 @@ public class MockRESTRequestInterceptor extends AbstractRESTRequestInterceptor {
}
@Override
public ActivitiesInfo getActivities(HttpServletRequest hsr, String nodeId) {
public ActivitiesInfo getActivities(HttpServletRequest hsr, String nodeId,
String groupBy) {
return new ActivitiesInfo();
}
@Override
public AppActivitiesInfo getAppActivities(HttpServletRequest hsr,
String appId, String time, Set<String> requestPriorities,
Set<String> allocationRequestIds) {
Set<String> allocationRequestIds, String groupBy) {
return new AppActivitiesInfo();
}

View File

@ -161,16 +161,17 @@ public class PassThroughRESTRequestInterceptor
}
@Override
public ActivitiesInfo getActivities(HttpServletRequest hsr, String nodeId) {
return getNextInterceptor().getActivities(hsr, nodeId);
public ActivitiesInfo getActivities(HttpServletRequest hsr, String nodeId,
String groupBy) {
return getNextInterceptor().getActivities(hsr, nodeId, groupBy);
}
@Override
public AppActivitiesInfo getAppActivities(HttpServletRequest hsr,
String appId, String time, Set<String> requestPriorities,
Set<String> allocationRequestIds) {
Set<String> allocationRequestIds, String groupBy) {
return getNextInterceptor().getAppActivities(hsr, appId, time,
requestPriorities, allocationRequestIds);
requestPriorities, allocationRequestIds, groupBy);
}
@Override