YARN-10319. Record Last N Scheduler Activities from ActivitiesManager

Reviewed by Tao Yang and Adam Antal.
This commit is contained in:
Prabhu Joseph 2020-06-16 21:52:32 +05:30 committed by Prabhu Joseph
parent bfcd775381
commit 247eb0979b
15 changed files with 617 additions and 170 deletions

View File

@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
@ -44,8 +45,10 @@ import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppActivitiesInf
import org.apache.hadoop.yarn.util.SystemClock;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.List;
import java.util.Set;
import java.util.*;
@ -75,7 +78,7 @@ public class ActivitiesManager extends AbstractService {
appsAllocation;
@VisibleForTesting
ConcurrentMap<ApplicationId, Queue<AppAllocation>> completedAppAllocations;
private boolean recordNextAvailableNode = false;
private AtomicInteger recordCount = new AtomicInteger(0);
private List<NodeAllocation> lastAvailableNodeActivities = null;
private Thread cleanUpThread;
private long activitiesCleanupIntervalMs;
@ -86,6 +89,8 @@ public class ActivitiesManager extends AbstractService {
private final RMContext rmContext;
private volatile boolean stopped;
private ThreadLocal<DiagnosticsCollectorManager> diagnosticCollectorManager;
private volatile ConcurrentLinkedDeque<Pair<NodeId, List<NodeAllocation>>>
lastNActivities;
public ActivitiesManager(RMContext rmContext) {
super(ActivitiesManager.class.getName());
@ -102,6 +107,7 @@ public class ActivitiesManager extends AbstractService {
if (rmContext.getYarnConfiguration() != null) {
setupConfForCleanup(rmContext.getYarnConfiguration());
}
lastNActivities = new ConcurrentLinkedDeque<>();
}
private void setupConfForCleanup(Configuration conf) {
@ -215,9 +221,30 @@ public class ActivitiesManager extends AbstractService {
return new ActivitiesInfo(allocations, nodeId, groupBy);
}
/**
 * Requests recording of the next {@code activitiesCount} node-update
 * activities, waits until the pending record counter has dropped to zero
 * and returns everything collected in {@code lastNActivities}.
 *
 * <p>The collected entries are drained with {@code poll()} instead of being
 * iterated and then thrown away by replacing the deque. An entry that is
 * appended while the result is being built is therefore either returned by
 * this call or kept for the next one; it is never silently discarded.
 *
 * @param activitiesCount number of recordings to wait for; when it is not
 *          positive the method does not wait at all
 * @param groupBy aggregation type handed to every {@link ActivitiesInfo}
 *          that is created
 * @return one {@link ActivitiesInfo} per collected entry, oldest first
 * @throws InterruptedException if the calling thread is interrupted while
 *           waiting for the recordings to complete
 */
public List<ActivitiesInfo> recordAndGetBulkActivitiesInfo(
    int activitiesCount, RMWSConsts.ActivitiesGroupBy groupBy)
    throws InterruptedException {
  recordCount.set(activitiesCount);
  // The counter is decremented elsewhere in this class as node updates are
  // recorded; poll it until every requested recording has been accounted for.
  while (recordCount.get() > 0) {
    Thread.sleep(1);
  }

  // Read the volatile field once and drain that instance. Removing entries
  // one by one (rather than iterating and swapping in a new deque) closes
  // the window in which a concurrently added entry could be lost.
  final ConcurrentLinkedDeque<Pair<NodeId, List<NodeAllocation>>> recorded =
      lastNActivities;
  List<ActivitiesInfo> outList = new ArrayList<>();
  Pair<NodeId, List<NodeAllocation>> pair;
  while ((pair = recorded.poll()) != null) {
    outList.add(new ActivitiesInfo(pair.getRight(),
        pair.getLeft().toString(), groupBy));
  }
  return outList;
}
public void recordNextNodeUpdateActivities(String nodeId) {
if (nodeId == null) {
recordNextAvailableNode = true;
recordCount.compareAndSet(0, 1);
} else {
activeRecordedNodes.add(NodeId.fromString(nodeId));
}
@ -348,7 +375,7 @@ public class ActivitiesManager extends AbstractService {
}
void startNodeUpdateRecording(NodeId nodeID) {
if (recordNextAvailableNode) {
if (recordCount.get() > 0) {
recordNextNodeUpdateActivities(nodeID.toString());
}
// Removing from activeRecordedNodes immediately is to ensure that
@ -470,14 +497,17 @@ public class ActivitiesManager extends AbstractService {
allocation.setTimestamp(timestamp);
allocation.setPartition(partition);
}
if (recordNextAvailableNode) {
recordNextAvailableNode = false;
if (recordCount.get() > 0) {
recordCount.getAndDecrement();
}
}
if (shouldRecordThisNode(nodeID)) {
recordingNodesAllocation.get().remove(nodeID);
completedNodeAllocations.put(nodeID, value);
if (recordCount.get() >= 0) {
lastNActivities.add(Pair.of(nodeID, value));
}
}
}
// disable diagnostic collector

View File

@ -57,7 +57,8 @@ public class JAXBContextResolver implements ContextResolver<JAXBContext> {
FairSchedulerQueueInfoList.class, AppTimeoutsInfo.class,
AppTimeoutInfo.class, ResourceInformationsInfo.class,
ActivitiesInfo.class, AppActivitiesInfo.class,
QueueAclsInfo.class, QueueAclInfo.class};
QueueAclsInfo.class, QueueAclInfo.class,
BulkActivitiesInfo.class};
// these dao classes need root unwrapping
final Class[] rootUnwrappedTypes =
{ NewApplication.class, ApplicationSubmissionContextInfo.class,

View File

@ -81,6 +81,10 @@ public final class RMWSConsts {
/** Path for {@code RMWebServiceProtocol#getActivities}. */
public static final String SCHEDULER_ACTIVITIES = "/scheduler/activities";
/** Path for {@code RMWebServiceProtocol#getBulkActivities}. */
public static final String SCHEDULER_BULK_ACTIVITIES =
"/scheduler/bulk-activities";
/** Path for {@code RMWebServiceProtocol#getAppActivities}. */
public static final String SCHEDULER_APP_ACTIVITIES =
"/scheduler/app-activities/{appid}";
@ -252,6 +256,7 @@ public final class RMWSConsts {
public static final String ACTIONS = "actions";
public static final String SUMMARIZE = "summarize";
public static final String NAME = "name";
public static final String ACTIVITIES_COUNT = "activitiesCount";
private RMWSConsts() {
// not called

View File

@ -60,6 +60,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationSubmi
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationUpdateRequestInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ResourceInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ResourceOptionInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.BulkActivitiesInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.SchedulerTypeInfo;
/**
@ -212,6 +213,19 @@ public interface RMWebServiceProtocol {
ActivitiesInfo getActivities(HttpServletRequest hsr, String nodeId,
String groupBy);
/**
 * This method retrieves the last n activities inside the scheduler and it
 * is reachable by using {@link RMWSConsts#SCHEDULER_BULK_ACTIVITIES}.
 *
 * @param hsr the servlet request
 * @param groupBy the groupBy type by which the activities should be
 *          aggregated. It is a QueryParam.
 * @param activitiesCount number of activities to return. It is a
 *          QueryParam.
 * @return last n activities
 * @throws InterruptedException if the calling thread is interrupted while
 *           the implementation is waiting for the activities
 */
BulkActivitiesInfo getBulkActivities(HttpServletRequest hsr,
String groupBy, int activitiesCount) throws InterruptedException;
/**
* This method retrieves all the activities for a specific app for a specific
* period of time, and it is reachable by using

View File

@ -197,6 +197,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationUpdat
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ResourceInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ResourceOptionInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.SchedulerInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.BulkActivitiesInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.SchedulerTypeInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.StatisticsItemInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ConfigVersionInfo;
@ -242,6 +243,9 @@ public class RMWebServices extends WebServices implements RMWebServiceProtocol {
public static final String DEFAULT_END_TIME = "-1";
public static final String DEFAULT_INCLUDE_RESOURCE = "false";
public static final String DEFAULT_SUMMARIZE = "false";
public static final String DEFAULT_ACTIVITIES_COUNT = "10";
public static final int MAX_ACTIVITIES_COUNT = 500;
private static final String ERROR_MSG = "Not Capacity Scheduler";
@VisibleForTesting
boolean isCentralizedNodeLabelConfiguration = true;
@ -697,76 +701,133 @@ public class RMWebServices extends WebServices implements RMWebServiceProtocol {
public ActivitiesInfo getActivities(@Context HttpServletRequest hsr,
@QueryParam(RMWSConsts.NODEID) String nodeId,
@QueryParam(RMWSConsts.GROUP_BY) String groupBy) {
initForReadableEndpoints();
YarnScheduler scheduler = rm.getRMContext().getScheduler();
if (scheduler instanceof AbstractYarnScheduler) {
String errMessage = "";
AbstractYarnScheduler abstractYarnScheduler =
(AbstractYarnScheduler) scheduler;
ActivitiesManager activitiesManager =
abstractYarnScheduler.getActivitiesManager();
if (null == activitiesManager) {
errMessage = "Not Capacity Scheduler";
return new ActivitiesInfo(errMessage, nodeId);
}
RMWSConsts.ActivitiesGroupBy activitiesGroupBy;
try {
activitiesGroupBy = parseActivitiesGroupBy(groupBy);
} catch (IllegalArgumentException e) {
return new ActivitiesInfo(e.getMessage(), nodeId);
}
List<FiCaSchedulerNode> nodeList =
abstractYarnScheduler.getNodeTracker().getAllNodes();
boolean illegalInput = false;
if (nodeList.size() == 0) {
illegalInput = true;
errMessage = "No node manager running in the cluster";
} else {
if (nodeId != null) {
String hostName = nodeId;
String portName = "";
if (nodeId.contains(":")) {
int index = nodeId.indexOf(":");
hostName = nodeId.substring(0, index);
portName = nodeId.substring(index + 1);
}
boolean correctNodeId = false;
for (FiCaSchedulerNode node : nodeList) {
if ((portName.equals("")
&& node.getRMNode().getHostName().equals(hostName))
|| (!portName.equals("")
&& node.getRMNode().getHostName().equals(hostName)
&& String.valueOf(node.getRMNode().getCommandPort())
.equals(portName))) {
correctNodeId = true;
nodeId = node.getNodeID().toString();
break;
}
}
if (!correctNodeId) {
illegalInput = true;
errMessage = "Cannot find node manager with given node id";
}
}
}
if (!illegalInput) {
activitiesManager.recordNextNodeUpdateActivities(nodeId);
return activitiesManager.getActivitiesInfo(nodeId, activitiesGroupBy);
}
// Return a activities info with error message
return new ActivitiesInfo(errMessage, nodeId);
ActivitiesManager activitiesManager = getActivitiesManager();
if (null == activitiesManager) {
return new ActivitiesInfo(ERROR_MSG, nodeId);
}
RMWSConsts.ActivitiesGroupBy activitiesGroupBy;
try {
activitiesGroupBy = parseActivitiesGroupBy(groupBy);
} catch (IllegalArgumentException e) {
return new ActivitiesInfo(e.getMessage(), nodeId);
}
AbstractYarnScheduler abstractYarnScheduler =
(AbstractYarnScheduler) rm.getRMContext().getScheduler();
List<FiCaSchedulerNode> nodeList =
abstractYarnScheduler.getNodeTracker().getAllNodes();
boolean illegalInput = false;
String errMessage = "";
if (nodeList.size() == 0) {
illegalInput = true;
errMessage = "No node manager running in the cluster";
} else {
if (nodeId != null) {
String hostName = nodeId;
String portName = "";
if (nodeId.contains(":")) {
int index = nodeId.indexOf(":");
hostName = nodeId.substring(0, index);
portName = nodeId.substring(index + 1);
}
boolean correctNodeId = false;
for (FiCaSchedulerNode node : nodeList) {
if ((portName.equals("")
&& node.getRMNode().getHostName().equals(hostName))
|| (!portName.equals("")
&& node.getRMNode().getHostName().equals(hostName)
&& String.valueOf(node.getRMNode().getCommandPort())
.equals(portName))) {
correctNodeId = true;
nodeId = node.getNodeID().toString();
break;
}
}
if (!correctNodeId) {
illegalInput = true;
errMessage = "Cannot find node manager with given node id";
}
}
}
if (!illegalInput) {
activitiesManager.recordNextNodeUpdateActivities(nodeId);
return activitiesManager.getActivitiesInfo(nodeId, activitiesGroupBy);
}
// Return a activities info with error message
return new ActivitiesInfo(errMessage, nodeId);
}
/**
 * REST endpoint that records and returns the last {@code activitiesCount}
 * scheduler activities.
 *
 * <p>A non-positive count falls back to {@link #DEFAULT_ACTIVITIES_COUNT}
 * and the effective count is capped at {@link #MAX_ACTIVITIES_COUNT}.
 *
 * @param hsr the servlet request
 * @param groupBy aggregation type for the returned activities
 * @param activitiesCount requested number of activities
 * @return the recorded activities wrapped in a {@link BulkActivitiesInfo}
 * @throws InterruptedException if interrupted while the activities manager
 *           waits for the recordings
 */
@GET
@Path(RMWSConsts.SCHEDULER_BULK_ACTIVITIES)
@Produces({ MediaType.APPLICATION_JSON + "; " + JettyUtils.UTF_8,
    MediaType.APPLICATION_XML + "; " + JettyUtils.UTF_8 })
@Override
public BulkActivitiesInfo getBulkActivities(
    @Context HttpServletRequest hsr,
    @QueryParam(RMWSConsts.GROUP_BY) String groupBy,
    @QueryParam(RMWSConsts.ACTIVITIES_COUNT)
    @DefaultValue(DEFAULT_ACTIVITIES_COUNT) int activitiesCount)
    throws InterruptedException {

  initForReadableEndpoints();

  // No activities manager means the scheduler cannot serve this request.
  ActivitiesManager manager = getActivitiesManager();
  if (manager == null) {
    throw new BadRequestException(ERROR_MSG);
  }

  RMWSConsts.ActivitiesGroupBy parsedGroupBy;
  try {
    parsedGroupBy = parseActivitiesGroupBy(groupBy);
  } catch (IllegalArgumentException e) {
    throw new BadRequestException(e.getMessage());
  }

  // Recording is driven by node updates, so reject the request up front
  // when the cluster has no nodes at all.
  AbstractYarnScheduler yarnScheduler =
      (AbstractYarnScheduler) rm.getRMContext().getScheduler();
  List<FiCaSchedulerNode> registeredNodes =
      yarnScheduler.getNodeTracker().getAllNodes();
  if (registeredNodes.isEmpty()) {
    throw new BadRequestException(
        "No node manager running in the cluster");
  }

  // Normalise the requested count: default when non-positive, then cap.
  int effectiveCount = activitiesCount > 0
      ? activitiesCount
      : Integer.parseInt(DEFAULT_ACTIVITIES_COUNT);
  effectiveCount = Math.min(effectiveCount, MAX_ACTIVITIES_COUNT);

  List<ActivitiesInfo> recorded =
      manager.recordAndGetBulkActivitiesInfo(effectiveCount, parsedGroupBy);

  BulkActivitiesInfo result = new BulkActivitiesInfo();
  result.addAll(recorded);
  return result;
}
/**
 * Looks up the activities manager of the currently configured scheduler.
 *
 * @return the scheduler's activities manager, or {@code null} when the
 *         scheduler is not an {@link AbstractYarnScheduler}; the value
 *         returned by the scheduler itself is passed through unchanged
 */
private ActivitiesManager getActivitiesManager() {
  YarnScheduler scheduler = rm.getRMContext().getScheduler();
  if (!(scheduler instanceof AbstractYarnScheduler)) {
    return null;
  }
  return ((AbstractYarnScheduler) scheduler).getActivitiesManager();
}
@ -788,105 +849,95 @@ public class RMWebServices extends WebServices implements RMWebServiceProtocol {
boolean summarize) {
initForReadableEndpoints();
YarnScheduler scheduler = rm.getRMContext().getScheduler();
if (scheduler instanceof AbstractYarnScheduler) {
AbstractYarnScheduler abstractYarnScheduler =
(AbstractYarnScheduler) scheduler;
ActivitiesManager activitiesManager =
abstractYarnScheduler.getActivitiesManager();
if (null == activitiesManager) {
String errMessage = "Not Capacity Scheduler";
return new AppActivitiesInfo(errMessage, appId);
}
if (appId == null) {
String errMessage = "Must provide an application Id";
return new AppActivitiesInfo(errMessage, null);
}
RMWSConsts.ActivitiesGroupBy activitiesGroupBy;
try {
activitiesGroupBy = parseActivitiesGroupBy(groupBy);
} catch (IllegalArgumentException e) {
return new AppActivitiesInfo(e.getMessage(), appId);
}
Set<RMWSConsts.AppActivitiesRequiredAction> requiredActions;
try {
requiredActions =
parseAppActivitiesRequiredActions(getFlatSet(actions));
} catch (IllegalArgumentException e) {
return new AppActivitiesInfo(e.getMessage(), appId);
}
Set<Integer> parsedRequestPriorities;
try {
parsedRequestPriorities = getFlatSet(requestPriorities).stream()
.map(e -> Integer.valueOf(e)).collect(Collectors.toSet());
} catch (NumberFormatException e) {
return new AppActivitiesInfo("request priorities must be integers!",
appId);
}
Set<Long> parsedAllocationRequestIds;
try {
parsedAllocationRequestIds = getFlatSet(allocationRequestIds).stream()
.map(e -> Long.valueOf(e)).collect(Collectors.toSet());
} catch (NumberFormatException e) {
return new AppActivitiesInfo(
"allocation request Ids must be integers!", appId);
}
int limitNum = -1;
if (limit != null) {
try {
limitNum = Integer.parseInt(limit);
if (limitNum <= 0) {
return new AppActivitiesInfo(
"limit must be greater than 0!", appId);
}
} catch (NumberFormatException e) {
return new AppActivitiesInfo("limit must be integer!", appId);
}
}
double maxTime = 3.0;
if (time != null) {
if (time.contains(".")) {
maxTime = Double.parseDouble(time);
} else {
maxTime = Double.parseDouble(time + ".0");
}
}
ApplicationId applicationId;
try {
applicationId = ApplicationId.fromString(appId);
if (requiredActions
.contains(RMWSConsts.AppActivitiesRequiredAction.REFRESH)) {
activitiesManager
.turnOnAppActivitiesRecording(applicationId, maxTime);
}
if (requiredActions
.contains(RMWSConsts.AppActivitiesRequiredAction.GET)) {
AppActivitiesInfo appActivitiesInfo = activitiesManager
.getAppActivitiesInfo(applicationId, parsedRequestPriorities,
parsedAllocationRequestIds, activitiesGroupBy, limitNum,
summarize, maxTime);
return appActivitiesInfo;
}
return new AppActivitiesInfo("Successfully received "
+ (actions.size() == 1 ? "action: " : "actions: ")
+ StringUtils.join(',', actions), appId);
} catch (Exception e) {
String errMessage = "Cannot find application with given appId";
LOG.error(errMessage, e);
return new AppActivitiesInfo(errMessage, appId);
}
ActivitiesManager activitiesManager = getActivitiesManager();
if (null == activitiesManager) {
return new AppActivitiesInfo(ERROR_MSG, appId);
}
if (appId == null) {
String errMessage = "Must provide an application Id";
return new AppActivitiesInfo(errMessage, null);
}
RMWSConsts.ActivitiesGroupBy activitiesGroupBy;
try {
activitiesGroupBy = parseActivitiesGroupBy(groupBy);
} catch (IllegalArgumentException e) {
return new AppActivitiesInfo(e.getMessage(), appId);
}
Set<RMWSConsts.AppActivitiesRequiredAction> requiredActions;
try {
requiredActions =
parseAppActivitiesRequiredActions(getFlatSet(actions));
} catch (IllegalArgumentException e) {
return new AppActivitiesInfo(e.getMessage(), appId);
}
Set<Integer> parsedRequestPriorities;
try {
parsedRequestPriorities = getFlatSet(requestPriorities).stream()
.map(e -> Integer.valueOf(e)).collect(Collectors.toSet());
} catch (NumberFormatException e) {
return new AppActivitiesInfo("request priorities must be integers!",
appId);
}
Set<Long> parsedAllocationRequestIds;
try {
parsedAllocationRequestIds = getFlatSet(allocationRequestIds).stream()
.map(e -> Long.valueOf(e)).collect(Collectors.toSet());
} catch (NumberFormatException e) {
return new AppActivitiesInfo(
"allocation request Ids must be integers!", appId);
}
int limitNum = -1;
if (limit != null) {
try {
limitNum = Integer.parseInt(limit);
if (limitNum <= 0) {
return new AppActivitiesInfo(
"limit must be greater than 0!", appId);
}
} catch (NumberFormatException e) {
return new AppActivitiesInfo("limit must be integer!", appId);
}
}
double maxTime = 3.0;
if (time != null) {
if (time.contains(".")) {
maxTime = Double.parseDouble(time);
} else {
maxTime = Double.parseDouble(time + ".0");
}
}
ApplicationId applicationId;
try {
applicationId = ApplicationId.fromString(appId);
if (requiredActions
.contains(RMWSConsts.AppActivitiesRequiredAction.REFRESH)) {
activitiesManager
.turnOnAppActivitiesRecording(applicationId, maxTime);
}
if (requiredActions
.contains(RMWSConsts.AppActivitiesRequiredAction.GET)) {
AppActivitiesInfo appActivitiesInfo = activitiesManager
.getAppActivitiesInfo(applicationId, parsedRequestPriorities,
parsedAllocationRequestIds, activitiesGroupBy, limitNum,
summarize, maxTime);
return appActivitiesInfo;
}
return new AppActivitiesInfo("Successfully received "
+ (actions.size() == 1 ? "action: " : "actions: ")
+ StringUtils.join(',', actions), appId);
} catch (Exception e) {
String errMessage = "Cannot find application with given appId";
LOG.error(errMessage, e);
return new AppActivitiesInfo(errMessage, appId);
}
return null;
}
private Set<String> getFlatSet(Set<String> set) {

View File

@ -0,0 +1,52 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.webapp.dao;
import javax.xml.bind.annotation.XmlAccessType;
import javax.xml.bind.annotation.XmlAccessorType;
import javax.xml.bind.annotation.XmlRootElement;
import java.util.List;
import java.util.ArrayList;
/**
 * DAO object that bundles several {@link ActivitiesInfo} entries into one
 * response. It is mapped by JAXB using field access under the root element
 * {@code bulkActivities}.
 */
@XmlRootElement(name = "bulkActivities")
@XmlAccessorType(XmlAccessType.FIELD)
public class BulkActivitiesInfo {

  /** Collected entries, in the order in which they were added. */
  private ArrayList<ActivitiesInfo> activities = new ArrayList<>();

  public BulkActivitiesInfo() {
    // JAXB needs this
  }

  /**
   * @return the backing list itself (not a copy)
   */
  public ArrayList<ActivitiesInfo> getActivities() {
    return this.activities;
  }

  /**
   * Appends a single entry.
   *
   * @param activitiesInfo entry to append
   */
  public void add(ActivitiesInfo activitiesInfo) {
    this.activities.add(activitiesInfo);
  }

  /**
   * Appends every entry of the given list, keeping its order.
   *
   * @param activitiesInfoList entries to append
   */
  public void addAll(List<ActivitiesInfo> activitiesInfoList) {
    this.activities.addAll(activitiesInfoList);
  }
}

View File

@ -99,6 +99,8 @@ public final class ActivitiesTestUtils {
public static final String FN_SCHEDULER_ACT_ROOT = "activities";
public static final String FN_SCHEDULER_BULK_ACT_ROOT = "bulkActivities";
private ActivitiesTestUtils(){}
public static List<JSONObject> findInAllocations(JSONObject allocationObj,

View File

@ -21,6 +21,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.webapp;
import com.sun.jersey.api.client.ClientResponse;
import com.sun.jersey.api.client.WebResource;
import com.sun.jersey.core.util.MultivaluedMapImpl;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityDiagnosticConstant;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityState;
@ -72,6 +73,7 @@ import static org.apache.hadoop.yarn.server.resourcemanager.webapp.ActivitiesTes
import static org.apache.hadoop.yarn.server.resourcemanager.webapp.ActivitiesTestUtils.FN_SCHEDULER_ACT_NAME;
import static org.apache.hadoop.yarn.server.resourcemanager.webapp.ActivitiesTestUtils.FN_SCHEDULER_ACT_ALLOCATIONS_ROOT;
import static org.apache.hadoop.yarn.server.resourcemanager.webapp.ActivitiesTestUtils.FN_SCHEDULER_ACT_ROOT;
import static org.apache.hadoop.yarn.server.resourcemanager.webapp.ActivitiesTestUtils.FN_SCHEDULER_BULK_ACT_ROOT;
import static org.apache.hadoop.yarn.server.resourcemanager.webapp.ActivitiesTestUtils.TOTAL_RESOURCE_INSUFFICIENT_DIAGNOSTIC_PREFIX;
import static org.apache.hadoop.yarn.server.resourcemanager.webapp.ActivitiesTestUtils.UNMATCHED_PARTITION_OR_PC_DIAGNOSTIC_PREFIX;
import static org.apache.hadoop.yarn.server.resourcemanager.webapp.ActivitiesTestUtils.getFirstSubNodeFromJson;
@ -1586,4 +1588,102 @@ public class TestRMWebServicesSchedulerActivities
rm.stop();
}
}
/**
 * Verifies that the bulk-activities endpoint returns exactly the requested
 * number of activities and that an oversized request is capped at
 * {@link RMWebServices#MAX_ACTIVITIES_COUNT}.
 */
@Test(timeout=30000)
public void testSchedulerBulkActivities() throws Exception {
  rm.start();

  MockNM nm1 = new MockNM("127.0.0.1:1234", 4 * 1024,
      rm.getResourceTrackerService());
  MockNM nm2 = new MockNM("127.0.0.2:1234", 4 * 1024,
      rm.getResourceTrackerService());
  nm1.registerNode();
  nm2.registerNode();
  MockNM[] nms = new MockNM[] {nm1, nm2};

  try {
    // Validate if response has 5 node activities
    final int requestedCount = 5;
    JSONArray firstBatch = fetchBulkActivities(requestedCount, nms);
    assertEquals("Number of activities is wrong", requestedCount,
        firstBatch.length());

    // Validate if response does not exceed max 500
    final int oversizedCount = 1000;
    JSONArray cappedBatch = fetchBulkActivities(oversizedCount, nms);
    assertEquals("Max Activities Limit does not work",
        RMWebServices.MAX_ACTIVITIES_COUNT,
        cappedBatch.length());
  } finally {
    rm.stop();
  }
}

/**
 * Issues one bulk-activities request on a helper thread, keeps the node
 * managers heartbeating until the response has arrived and returns the
 * activities array of the response.
 */
private JSONArray fetchBulkActivities(int activitiesCount, MockNM[] nms)
    throws Exception {
  RESTClient client = new RESTClient(activitiesCount);
  client.start();
  sendHeartbeat(client, nms);

  JSONObject bulkJson = client.getOutput().getJSONObject(
      FN_SCHEDULER_BULK_ACT_ROOT);
  Object activities = bulkJson.get(FN_SCHEDULER_ACT_ROOT);
  return (JSONArray) activities;
}
/**
 * Helper thread that sends a single bulk-activities REST request and keeps
 * the JSON response for the test thread to pick up.
 *
 * <p>{@code done} and {@code json} are written by this thread and read by
 * the test thread, so both are {@code volatile}; without that the reader has
 * no guarantee of ever observing the completed state or the response.
 */
private class RESTClient extends Thread {

  /** Value sent as the {@code activitiesCount} query parameter. */
  private final int expectedCount;
  /** Set to {@code true} once the response has been stored. */
  private volatile boolean done = false;
  /** Parsed response body; {@code null} until the request has completed. */
  private volatile JSONObject json;

  RESTClient(int expectedCount) {
    this.expectedCount = expectedCount;
  }

  /**
   * @return {@code true} once {@link #run()} has stored the response
   */
  boolean isDone() {
    return done;
  }

  /**
   * @return the response body, or {@code null} while the request is still
   *         in flight
   */
  JSONObject getOutput() {
    return json;
  }

  @Override
  public void run() {
    WebResource r = resource();
    MultivaluedMapImpl params = new MultivaluedMapImpl();
    params.add(RMWSConsts.ACTIVITIES_COUNT, expectedCount);
    ClientResponse response = r.path("ws").path("v1").path("cluster")
        .path(RMWSConsts.SCHEDULER_BULK_ACTIVITIES).queryParams(params)
        .accept(MediaType.APPLICATION_JSON).get(ClientResponse.class);
    assertEquals(MediaType.APPLICATION_JSON_TYPE + "; "
        + JettyUtils.UTF_8, response.getType().toString());
    // Publish the payload before raising the completion flag so that a
    // reader that sees done == true also sees the response.
    json = response.getEntity(JSONObject.class);
    done = true;
  }
}
/**
 * Heartbeats every given node manager repeatedly until the REST client has
 * received its response.
 *
 * <p>A round in which a heartbeat throws counts as "not finished yet" and
 * is retried on the next check.
 *
 * @param restClient client whose completion is awaited
 * @param nms node managers to heartbeat on every check
 * @throws Exception whatever {@code GenericTestUtils.waitFor} raises
 */
private void sendHeartbeat(RESTClient restClient, MockNM[] nms)
    throws Exception {
  final int checkEveryMillis = 10;
  final int waitForMillis = 20000;
  GenericTestUtils.waitFor(() -> {
    try {
      for (int i = 0; i < nms.length; i++) {
        nms[i].nodeHeartbeat(true);
      }
    } catch (Exception e) {
      // Treat a failed heartbeat round as "keep waiting".
      return false;
    }
    return restClient.isDone();
  }, checkEveryMillis, waitForMillis);
}
}

View File

@ -60,6 +60,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationSubmi
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationUpdateRequestInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ResourceInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ResourceOptionInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.BulkActivitiesInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.SchedulerTypeInfo;
import org.apache.hadoop.yarn.server.webapp.dao.AppAttemptInfo;
import org.apache.hadoop.yarn.server.webapp.dao.ContainerInfo;
@ -199,6 +200,15 @@ public class DefaultRequestInterceptorREST
null, getConf());
}
/**
 * Forwards the bulk-activities request to the bulk-activities path under
 * {@code webAppAddress} with a GET call.
 *
 * <p>{@code groupBy} and {@code activitiesCount} are not passed on
 * explicitly; presumably the forwarding helper takes the query parameters
 * from {@code hsr} - verify against {@code RouterWebServiceUtil}.
 */
@Override
public BulkActivitiesInfo getBulkActivities(HttpServletRequest hsr,
    String groupBy, int activitiesCount) {
  final String targetPath =
      RMWSConsts.RM_WEB_SERVICE_PATH + RMWSConsts.SCHEDULER_BULK_ACTIVITIES;
  return RouterWebServiceUtil.genericForward(webAppAddress, hsr,
      BulkActivitiesInfo.class, HTTPMethods.GET, targetPath, null, null,
      getConf());
}
@Override
public AppActivitiesInfo getAppActivities(HttpServletRequest hsr,
String appId, String time, Set<String> requestPriorities,

View File

@ -88,6 +88,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationSubmi
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationUpdateRequestInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ResourceInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ResourceOptionInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.BulkActivitiesInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.SchedulerTypeInfo;
import org.apache.hadoop.yarn.server.router.RouterMetrics;
import org.apache.hadoop.yarn.server.router.RouterServerUtil;
@ -1147,6 +1148,12 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
throw new NotImplementedException("Code is not implemented");
}
/**
 * Bulk activities are not supported by this interceptor: every call fails
 * with a {@code NotImplementedException}.
 */
@Override
public BulkActivitiesInfo getBulkActivities(HttpServletRequest hsr,
String groupBy, int activitiesCount) throws InterruptedException {
throw new NotImplementedException("Code is not implemented");
}
@Override
public AppActivitiesInfo getAppActivities(HttpServletRequest hsr,
String appId, String time, Set<String> requestPriorities,

View File

@ -83,6 +83,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationSubmi
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationUpdateRequestInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ResourceInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ResourceOptionInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.BulkActivitiesInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.SchedulerTypeInfo;
import org.apache.hadoop.yarn.server.router.Router;
import org.apache.hadoop.yarn.server.webapp.dao.ContainerInfo;
@ -95,6 +96,7 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWebServices.DEFAULT_ACTIVITIES_COUNT;
import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWebServices.DEFAULT_SUMMARIZE;
/**
@ -457,6 +459,23 @@ public class RouterWebServices implements RMWebServiceProtocol {
.getActivities(hsr, nodeId, groupBy);
}
/**
 * Router-side REST endpoint for bulk scheduler activities. The call is
 * handed to the root interceptor of the chain selected for the request.
 *
 * @param hsr the servlet request
 * @param groupBy aggregation type for the returned activities
 * @param activitiesCount requested number of activities
 * @return whatever the interceptor chain returns
 * @throws InterruptedException if the interceptor chain raises it
 */
@GET
@Path(RMWSConsts.SCHEDULER_BULK_ACTIVITIES)
@Produces({ MediaType.APPLICATION_JSON + "; " + JettyUtils.UTF_8,
    MediaType.APPLICATION_XML + "; " + JettyUtils.UTF_8 })
@Override
public BulkActivitiesInfo getBulkActivities(
    @Context HttpServletRequest hsr,
    @QueryParam(RMWSConsts.GROUP_BY) String groupBy,
    @QueryParam(RMWSConsts.ACTIVITIES_COUNT)
    @DefaultValue(DEFAULT_ACTIVITIES_COUNT) int activitiesCount)
    throws InterruptedException {
  init();
  return getInterceptorChain(hsr)
      .getRootInterceptor()
      .getBulkActivities(hsr, groupBy, activitiesCount);
}
@GET
@Path(RMWSConsts.SCHEDULER_APP_ACTIVITIES)
@Produces({ MediaType.APPLICATION_JSON + "; " + JettyUtils.UTF_8,

View File

@ -50,6 +50,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeLabelsInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeToLabelsInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodesInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.BulkActivitiesInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.SchedulerTypeInfo;
import org.apache.hadoop.yarn.server.router.Router;
import org.apache.hadoop.yarn.server.router.webapp.RouterWebServices.RequestInterceptorChainWrapper;
@ -179,6 +180,12 @@ public abstract class BaseRouterWebServicesTest {
createHttpServletRequest(user), null, null);
}
/**
 * Calls the router web service's bulk-activities endpoint with a servlet
 * request created for the given user, no groupBy value and an activities
 * count of 0.
 */
protected BulkActivitiesInfo getBulkActivities(String user)
throws InterruptedException {
return routerWebService.getBulkActivities(
createHttpServletRequest(user), null, 0);
}
protected AppActivitiesInfo getAppActivities(String user)
throws IOException, InterruptedException {
return routerWebService.getAppActivities(createHttpServletRequest(user),

View File

@ -57,6 +57,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationSubmi
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationUpdateRequestInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ResourceInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ResourceOptionInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.BulkActivitiesInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.SchedulerTypeInfo;
import org.apache.hadoop.yarn.server.webapp.dao.AppAttemptInfo;
import org.apache.hadoop.yarn.server.webapp.dao.ContainerInfo;
@ -138,6 +139,12 @@ public class MockRESTRequestInterceptor extends AbstractRESTRequestInterceptor {
return new ActivitiesInfo();
}
/**
 * Mock implementation: ignores all arguments and returns a new, empty
 * {@link BulkActivitiesInfo}.
 */
@Override
public BulkActivitiesInfo getBulkActivities(HttpServletRequest hsr,
String groupBy, int activitiesCount) throws InterruptedException{
return new BulkActivitiesInfo();
}
@Override
public AppActivitiesInfo getAppActivities(HttpServletRequest hsr,
String appId, String time, Set<String> requestPriorities,

View File

@ -55,6 +55,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationSubmi
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationUpdateRequestInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ResourceInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ResourceOptionInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.BulkActivitiesInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.SchedulerTypeInfo;
import org.apache.hadoop.yarn.server.webapp.dao.AppAttemptInfo;
import org.apache.hadoop.yarn.server.webapp.dao.ContainerInfo;
@ -166,6 +167,13 @@ public class PassThroughRESTRequestInterceptor
return getNextInterceptor().getActivities(hsr, nodeId, groupBy);
}
/**
 * Passes the call through unchanged to the next interceptor in the chain.
 */
@Override
public BulkActivitiesInfo getBulkActivities(HttpServletRequest hsr,
String groupBy, int activitiesCount) throws InterruptedException {
return getNextInterceptor().getBulkActivities(hsr, groupBy,
activitiesCount);
}
@Override
public AppActivitiesInfo getAppActivities(HttpServletRequest hsr,
String appId, String time, Set<String> requestPriorities,

View File

@ -5746,6 +5746,140 @@ Response Body:
```
Scheduler Bulk Activities API
--------------------------------
The scheduler bulk activities RESTful API can fetch scheduler activities info recorded for multiple scheduling cycles. This may take time
to return as it internally waits until the number of records specified by activitiesCount has been generated.
### URI
* http://rm-http-address:port/ws/v1/cluster/scheduler/bulk-activities
### HTTP Operations Supported
* GET
### Query Parameters Supported
Multiple parameters can be specified for GET operations.
* activitiesCount - number of scheduling cycles to record; defaults to 10, with a maximum of 500.
* groupBy - aggregation type of application activities; currently only "diagnostic" is supported, with which
the user can query aggregated activities grouped by allocation state and diagnostic.
### Response Examples
**JSON response**
HTTP Request:
Accept: application/json
GET http://rm-http-address:port/ws/v1/cluster/scheduler/bulk-activities?activitiesCount=2
Response Header:
HTTP/1.1 200 OK
Content-Type: application/json
Transfer-Encoding: chunked
Server: Jetty(6.1.26)
Response Body:
The following is an output example with the query parameter activitiesCount set to 2. This fetches the scheduler activities info
recorded in the last two scheduling cycles.
```json
{
"bulkActivities": {
"activities": [
{
"nodeId": "127.0.0.1:1234",
"timestamp": 1593684431432,
"dateTime": "Thu Jul 02 10:07:11 UTC 2020",
"allocations": [
{
"partition": "",
"finalAllocationState": "SKIPPED",
"root": {
"name": "root",
"allocationState": "SKIPPED",
"diagnostic": "Queue does not need more resource"
}
}
]
},
{
"nodeId": "127.0.0.2:1234",
"timestamp": 1593684431432,
"dateTime": "Thu Jul 02 10:07:11 UTC 2020",
"allocations": [
{
"partition": "",
"finalAllocationState": "SKIPPED",
"root": {
"name": "root",
"allocationState": "SKIPPED",
"diagnostic": "Queue does not need more resource"
}
}
]
}
]
}
}
```
**XML response**
HTTP Request:
Accept: application/xml
GET http://rm-http-address:port/ws/v1/cluster/scheduler/bulk-activities?activitiesCount=2
Response Header:
HTTP/1.1 200 OK
Content-Type: application/xml; charset=utf-8
Transfer-Encoding: chunked
Response Body:
```xml
<bulkActivities>
<activities>
<nodeId>127.0.0.1:1234</nodeId>
<timestamp>1593683816380</timestamp>
<dateTime>Thu Jul 02 09:56:56 UTC 2020</dateTime>
<allocations>
<partition/>
<finalAllocationState>SKIPPED</finalAllocationState>
<root>
<name>root</name>
<allocationState>SKIPPED</allocationState>
<diagnostic>Queue does not need more resource</diagnostic>
</root>
</allocations>
</activities>
<activities>
<nodeId>127.0.0.2:1234</nodeId>
<timestamp>1593683816385</timestamp>
<dateTime>Thu Jul 02 09:56:56 UTC 2020</dateTime>
<allocations>
<partition/>
<finalAllocationState>SKIPPED</finalAllocationState>
<root>
<name>root</name>
<allocationState>SKIPPED</allocationState>
<diagnostic>Queue does not need more resource</diagnostic>
</root>
</allocations>
</activities>
</bulkActivities>
```
Application Activities API
--------------------------------