YARN-9578. Add limit/actions/summarize options for app activities REST API. Contributed by Tao Yang.
This commit is contained in:
parent
88c53d516c
commit
970b0b0c02
|
@ -19,6 +19,7 @@
|
|||
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.common.collect.Lists;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
|
||||
|
@ -121,7 +122,8 @@ public class ActivitiesManager extends AbstractService {
|
|||
|
||||
public AppActivitiesInfo getAppActivitiesInfo(ApplicationId applicationId,
|
||||
Set<String> requestPriorities, Set<String> allocationRequestIds,
|
||||
RMWSConsts.ActivitiesGroupBy groupBy) {
|
||||
RMWSConsts.ActivitiesGroupBy groupBy, int limit, boolean summarize,
|
||||
double maxTimeInSeconds) {
|
||||
RMApp app = rmContext.getRMApps().get(applicationId);
|
||||
if (app != null && app.getFinalApplicationStatus()
|
||||
== FinalApplicationStatus.UNDEFINED) {
|
||||
|
@ -140,6 +142,17 @@ public class ActivitiesManager extends AbstractService {
|
|||
allocations = new ArrayList(curAllocations);
|
||||
}
|
||||
}
|
||||
if (summarize && allocations != null) {
|
||||
AppAllocation summaryAppAllocation =
|
||||
getSummarizedAppAllocation(allocations, maxTimeInSeconds);
|
||||
if (summaryAppAllocation != null) {
|
||||
allocations = Lists.newArrayList(summaryAppAllocation);
|
||||
}
|
||||
}
|
||||
if (allocations != null && limit > 0 && limit < allocations.size()) {
|
||||
allocations =
|
||||
allocations.subList(allocations.size() - limit, allocations.size());
|
||||
}
|
||||
return new AppActivitiesInfo(allocations, applicationId, groupBy);
|
||||
} else {
|
||||
return new AppActivitiesInfo(
|
||||
|
@ -148,6 +161,47 @@ public class ActivitiesManager extends AbstractService {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Get summarized app allocation from multiple allocations as follows:
|
||||
* 1. Collect latest allocation attempts on nodes to construct an allocation
|
||||
* summary on nodes from multiple app allocations which are recorded a few
|
||||
* seconds before the last allocation.
|
||||
* 2. Copy other fields from the last allocation.
|
||||
*/
|
||||
private AppAllocation getSummarizedAppAllocation(
|
||||
List<AppAllocation> allocations, double maxTimeInSeconds) {
|
||||
if (allocations == null || allocations.isEmpty()) {
|
||||
return null;
|
||||
}
|
||||
long startTime = allocations.get(allocations.size() - 1).getTime()
|
||||
- (long) (maxTimeInSeconds * 1000);
|
||||
Map<String, ActivityNode> nodeActivities = new HashMap<>();
|
||||
for (int i = allocations.size() - 1; i >= 0; i--) {
|
||||
AppAllocation appAllocation = allocations.get(i);
|
||||
if (startTime > appAllocation.getTime()) {
|
||||
break;
|
||||
}
|
||||
List<ActivityNode> activityNodes = appAllocation.getAllocationAttempts();
|
||||
for (ActivityNode an : activityNodes) {
|
||||
if (an.getNodeId() != null) {
|
||||
nodeActivities.putIfAbsent(
|
||||
an.getRequestPriority() + "_" + an.getAllocationRequestId() + "_"
|
||||
+ an.getNodeId(), an);
|
||||
}
|
||||
}
|
||||
}
|
||||
AppAllocation lastAppAllocation = allocations.get(allocations.size() - 1);
|
||||
AppAllocation summarizedAppAllocation =
|
||||
new AppAllocation(lastAppAllocation.getPriority(), null,
|
||||
lastAppAllocation.getQueueName());
|
||||
summarizedAppAllocation
|
||||
.updateAppContainerStateAndTime(null, lastAppAllocation.getAppState(),
|
||||
lastAppAllocation.getTime(), lastAppAllocation.getDiagnostic());
|
||||
summarizedAppAllocation
|
||||
.setAllocationAttempts(new ArrayList<>(nodeActivities.values()));
|
||||
return summarizedAppAllocation;
|
||||
}
|
||||
|
||||
public ActivitiesInfo getActivitiesInfo(String nodeId,
|
||||
RMWSConsts.ActivitiesGroupBy groupBy) {
|
||||
List<NodeAllocation> allocations;
|
||||
|
|
|
@ -84,11 +84,8 @@ public class AppAllocation {
|
|||
return appState;
|
||||
}
|
||||
|
||||
public String getPriority() {
|
||||
if (priority == null) {
|
||||
return null;
|
||||
}
|
||||
return priority.toString();
|
||||
public Priority getPriority() {
|
||||
return priority;
|
||||
}
|
||||
|
||||
public String getContainerId() {
|
||||
|
@ -128,4 +125,8 @@ public class AppAllocation {
|
|||
.collect(Collectors.toList());
|
||||
return appAllocation;
|
||||
}
|
||||
|
||||
public void setAllocationAttempts(List<ActivityNode> allocationAttempts) {
|
||||
this.allocationAttempts = allocationAttempts;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -71,7 +71,7 @@ public final class RMWSConsts {
|
|||
|
||||
/** Path for {@code RMWebServiceProtocol#getAppActivities}. */
|
||||
public static final String SCHEDULER_APP_ACTIVITIES =
|
||||
"/scheduler/app-activities";
|
||||
"/scheduler/app-activities/{appid}";
|
||||
|
||||
/** Path for {@code RMWebServiceProtocol#getAppStatistics}. */
|
||||
public static final String APP_STATISTICS = "/appstatistics";
|
||||
|
@ -237,6 +237,8 @@ public final class RMWSConsts {
|
|||
public static final String GROUP_BY = "groupBy";
|
||||
public static final String SIGNAL = "signal";
|
||||
public static final String COMMAND = "command";
|
||||
public static final String ACTIONS = "actions";
|
||||
public static final String SUMMARIZE = "summarize";
|
||||
|
||||
private RMWSConsts() {
|
||||
// not called
|
||||
|
@ -250,4 +252,13 @@ public final class RMWSConsts {
|
|||
public enum ActivitiesGroupBy {
|
||||
DIAGNOSTIC
|
||||
}
|
||||
|
||||
/**
|
||||
* Defines the required action of app activities:
|
||||
* REFRESH means to turn on activities recording for the required app,
|
||||
* GET means the required app activities should be involved in response.
|
||||
*/
|
||||
public enum AppActivitiesRequiredAction {
|
||||
REFRESH, GET
|
||||
}
|
||||
}
|
|
@ -227,11 +227,16 @@ public interface RMWebServiceProtocol {
|
|||
* the activities. It is a QueryParam.
|
||||
* @param groupBy the groupBy type by which the activities should be
|
||||
* aggregated. It is a QueryParam.
|
||||
* @param limit set a limit of the result. It is a QueryParam.
|
||||
* @param actions the required actions of app activities. It is a QueryParam.
|
||||
* @param summarize whether app activities in multiple scheduling processes
|
||||
* need to be summarized. It is a QueryParam.
|
||||
* @return all the activities about a specific app for a specific time
|
||||
*/
|
||||
AppActivitiesInfo getAppActivities(HttpServletRequest hsr, String appId,
|
||||
String time, Set<String> requestPriorities,
|
||||
Set<String> allocationRequestIds, String groupBy);
|
||||
Set<String> allocationRequestIds, String groupBy, String limit,
|
||||
Set<String> actions, boolean summarize);
|
||||
|
||||
/**
|
||||
* This method retrieves all the statistics for a specific app, and it is
|
||||
|
|
|
@ -236,6 +236,7 @@ public class RMWebServices extends WebServices implements RMWebServiceProtocol {
|
|||
public static final String DEFAULT_START_TIME = "0";
|
||||
public static final String DEFAULT_END_TIME = "-1";
|
||||
public static final String DEFAULT_INCLUDE_RESOURCE = "false";
|
||||
public static final String DEFAULT_SUMMARIZE = "false";
|
||||
|
||||
@VisibleForTesting
|
||||
boolean isCentralizedNodeLabelConfiguration = true;
|
||||
|
@ -717,12 +718,16 @@ public class RMWebServices extends WebServices implements RMWebServiceProtocol {
|
|||
MediaType.APPLICATION_XML + "; " + JettyUtils.UTF_8 })
|
||||
@Override
|
||||
public AppActivitiesInfo getAppActivities(@Context HttpServletRequest hsr,
|
||||
@QueryParam(RMWSConsts.APP_ID) String appId,
|
||||
@PathParam(RMWSConsts.APPID) String appId,
|
||||
@QueryParam(RMWSConsts.MAX_TIME) String time,
|
||||
@QueryParam(RMWSConsts.REQUEST_PRIORITIES) Set<String> requestPriorities,
|
||||
@QueryParam(RMWSConsts.ALLOCATION_REQUEST_IDS)
|
||||
Set<String> allocationRequestIds,
|
||||
@QueryParam(RMWSConsts.GROUP_BY) String groupBy) {
|
||||
@QueryParam(RMWSConsts.GROUP_BY) String groupBy,
|
||||
@QueryParam(RMWSConsts.LIMIT) String limit,
|
||||
@QueryParam(RMWSConsts.ACTIONS) Set<String> actions,
|
||||
@QueryParam(RMWSConsts.SUMMARIZE) @DefaultValue(DEFAULT_SUMMARIZE)
|
||||
boolean summarize) {
|
||||
initForReadableEndpoints();
|
||||
|
||||
YarnScheduler scheduler = rm.getRMContext().getScheduler();
|
||||
|
@ -749,6 +754,26 @@ public class RMWebServices extends WebServices implements RMWebServiceProtocol {
|
|||
return new AppActivitiesInfo(e.getMessage(), appId);
|
||||
}
|
||||
|
||||
Set<RMWSConsts.AppActivitiesRequiredAction> requiredActions;
|
||||
try {
|
||||
requiredActions = parseAppActivitiesRequiredActions(actions);
|
||||
} catch (IllegalArgumentException e) {
|
||||
return new AppActivitiesInfo(e.getMessage(), appId);
|
||||
}
|
||||
|
||||
int limitNum = -1;
|
||||
if (limit != null) {
|
||||
try {
|
||||
limitNum = Integer.parseInt(limit);
|
||||
if (limitNum <= 0) {
|
||||
return new AppActivitiesInfo(
|
||||
"limit must be greater than 0!", appId);
|
||||
}
|
||||
} catch (NumberFormatException e) {
|
||||
return new AppActivitiesInfo("limit must be integer!", appId);
|
||||
}
|
||||
}
|
||||
|
||||
double maxTime = 3.0;
|
||||
|
||||
if (time != null) {
|
||||
|
@ -762,12 +787,21 @@ public class RMWebServices extends WebServices implements RMWebServiceProtocol {
|
|||
ApplicationId applicationId;
|
||||
try {
|
||||
applicationId = ApplicationId.fromString(appId);
|
||||
activitiesManager.turnOnAppActivitiesRecording(applicationId, maxTime);
|
||||
AppActivitiesInfo appActivitiesInfo =
|
||||
activitiesManager.getAppActivitiesInfo(applicationId,
|
||||
requestPriorities, allocationRequestIds, activitiesGroupBy);
|
||||
|
||||
if (requiredActions
|
||||
.contains(RMWSConsts.AppActivitiesRequiredAction.REFRESH)) {
|
||||
activitiesManager
|
||||
.turnOnAppActivitiesRecording(applicationId, maxTime);
|
||||
}
|
||||
if (requiredActions
|
||||
.contains(RMWSConsts.AppActivitiesRequiredAction.GET)) {
|
||||
AppActivitiesInfo appActivitiesInfo = activitiesManager
|
||||
.getAppActivitiesInfo(applicationId, requestPriorities,
|
||||
allocationRequestIds, activitiesGroupBy, limitNum,
|
||||
summarize, maxTime);
|
||||
return appActivitiesInfo;
|
||||
}
|
||||
return new AppActivitiesInfo("Successfully notified actions: "
|
||||
+ StringUtils.join(',', actions), appId);
|
||||
} catch (Exception e) {
|
||||
String errMessage = "Cannot find application with given appId";
|
||||
LOG.error(errMessage, e);
|
||||
|
@ -778,6 +812,29 @@ public class RMWebServices extends WebServices implements RMWebServiceProtocol {
|
|||
return null;
|
||||
}
|
||||
|
||||
private Set<RMWSConsts.AppActivitiesRequiredAction>
|
||||
parseAppActivitiesRequiredActions(Set<String> actions) {
|
||||
Set<RMWSConsts.AppActivitiesRequiredAction> requiredActions =
|
||||
new HashSet<>();
|
||||
if (actions == null || actions.isEmpty()) {
|
||||
requiredActions.add(RMWSConsts.AppActivitiesRequiredAction.REFRESH);
|
||||
requiredActions.add(RMWSConsts.AppActivitiesRequiredAction.GET);
|
||||
} else {
|
||||
for (String action : actions) {
|
||||
if (!EnumUtils.isValidEnum(RMWSConsts.AppActivitiesRequiredAction.class,
|
||||
action.toUpperCase())) {
|
||||
String errMesasge =
|
||||
"Got invalid action: " + action + ", valid actions: " + Arrays
|
||||
.asList(RMWSConsts.AppActivitiesRequiredAction.values());
|
||||
throw new IllegalArgumentException(errMesasge);
|
||||
}
|
||||
requiredActions.add(RMWSConsts.AppActivitiesRequiredAction
|
||||
.valueOf(action.toUpperCase()));
|
||||
}
|
||||
}
|
||||
return requiredActions;
|
||||
}
|
||||
|
||||
private RMWSConsts.ActivitiesGroupBy parseActivitiesGroupBy(String groupBy) {
|
||||
if (groupBy != null) {
|
||||
if (!EnumUtils.isValidEnum(RMWSConsts.ActivitiesGroupBy.class,
|
||||
|
|
|
@ -54,7 +54,8 @@ public class AppAllocationInfo {
|
|||
this.requestAllocation = new ArrayList<>();
|
||||
this.nodeId = allocation.getNodeId();
|
||||
this.queueName = allocation.getQueueName();
|
||||
this.appPriority = allocation.getPriority();
|
||||
this.appPriority = allocation.getPriority() == null ?
|
||||
null : allocation.getPriority().toString();
|
||||
this.timestamp = allocation.getTime();
|
||||
this.dateTime = new Date(allocation.getTime()).toString();
|
||||
this.allocationState = allocation.getAppState().name();
|
||||
|
|
|
@ -29,6 +29,7 @@ import java.util.concurrent.Future;
|
|||
import java.util.concurrent.LinkedBlockingQueue;
|
||||
import java.util.concurrent.ThreadPoolExecutor;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.function.Supplier;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
|
@ -46,6 +47,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
|
|||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestUtils;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppActivitiesInfo;
|
||||
import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
|
||||
import org.apache.hadoop.yarn.util.SystemClock;
|
||||
|
@ -286,18 +288,124 @@ public class TestActivitiesManager {
|
|||
ActivityDiagnosticConstant.SKIPPED_ALL_PRIORITIES);
|
||||
}
|
||||
AppActivitiesInfo appActivitiesInfo = newActivitiesManager
|
||||
.getAppActivitiesInfo(app.getApplicationId(), null, null, null);
|
||||
.getAppActivitiesInfo(app.getApplicationId(), null, null, null, -1,
|
||||
false, 3);
|
||||
Assert.assertEquals(numActivities,
|
||||
appActivitiesInfo.getAllocations().size());
|
||||
// sleep until all app activities expired
|
||||
Thread.sleep(cleanupIntervalMs + appActivitiesTTL);
|
||||
// there should be no remaining app activities
|
||||
appActivitiesInfo = newActivitiesManager
|
||||
.getAppActivitiesInfo(app.getApplicationId(), null, null, null);
|
||||
.getAppActivitiesInfo(app.getApplicationId(), null, null, null, -1,
|
||||
false, 3);
|
||||
Assert.assertEquals(0,
|
||||
appActivitiesInfo.getAllocations().size());
|
||||
}
|
||||
|
||||
@Test (timeout = 30000)
|
||||
public void testAppActivitiesPerformance() {
|
||||
// start recording activities for first app
|
||||
SchedulerApplicationAttempt app = apps.get(0);
|
||||
FiCaSchedulerNode node = (FiCaSchedulerNode) nodes.get(0);
|
||||
activitiesManager.turnOnAppActivitiesRecording(app.getApplicationId(), 100);
|
||||
int numActivities = 100;
|
||||
int numNodes = 10000;
|
||||
int testingTimes = 10;
|
||||
for (int ano = 0; ano < numActivities; ano++) {
|
||||
ActivitiesLogger.APP.startAppAllocationRecording(activitiesManager, node,
|
||||
SystemClock.getInstance().getTime(), app);
|
||||
for (int i = 0; i < numNodes; i++) {
|
||||
NodeId nodeId = NodeId.newInstance("host" + i, 0);
|
||||
activitiesManager
|
||||
.addSchedulingActivityForApp(app.getApplicationId(), null, "0",
|
||||
ActivityState.SKIPPED,
|
||||
ActivityDiagnosticConstant.FAIL_TO_ALLOCATE, "container",
|
||||
nodeId, "0");
|
||||
}
|
||||
ActivitiesLogger.APP
|
||||
.finishAllocatedAppAllocationRecording(activitiesManager,
|
||||
app.getApplicationId(), null, ActivityState.SKIPPED,
|
||||
ActivityDiagnosticConstant.SKIPPED_ALL_PRIORITIES);
|
||||
}
|
||||
|
||||
// It often take a longer time for the first query, ignore this distraction
|
||||
activitiesManager
|
||||
.getAppActivitiesInfo(app.getApplicationId(), null, null, null, -1,
|
||||
true, 100);
|
||||
|
||||
// Test getting normal app activities
|
||||
Supplier<Void> normalSupplier = () -> {
|
||||
AppActivitiesInfo appActivitiesInfo = activitiesManager
|
||||
.getAppActivitiesInfo(app.getApplicationId(), null, null, null, -1,
|
||||
false, 100);
|
||||
Assert.assertEquals(numActivities,
|
||||
appActivitiesInfo.getAllocations().size());
|
||||
Assert.assertEquals(1,
|
||||
appActivitiesInfo.getAllocations().get(0).getRequestAllocation()
|
||||
.size());
|
||||
Assert.assertEquals(numNodes,
|
||||
appActivitiesInfo.getAllocations().get(0).getRequestAllocation()
|
||||
.get(0).getAllocationAttempt().size());
|
||||
return null;
|
||||
};
|
||||
testManyTimes("Getting normal app activities", normalSupplier,
|
||||
testingTimes);
|
||||
|
||||
// Test getting aggregated app activities
|
||||
Supplier<Void> aggregatedSupplier = () -> {
|
||||
AppActivitiesInfo appActivitiesInfo = activitiesManager
|
||||
.getAppActivitiesInfo(app.getApplicationId(), null, null,
|
||||
RMWSConsts.ActivitiesGroupBy.DIAGNOSTIC, -1, false, 100);
|
||||
Assert.assertEquals(numActivities,
|
||||
appActivitiesInfo.getAllocations().size());
|
||||
Assert.assertEquals(1,
|
||||
appActivitiesInfo.getAllocations().get(0).getRequestAllocation()
|
||||
.size());
|
||||
Assert.assertEquals(1,
|
||||
appActivitiesInfo.getAllocations().get(0).getRequestAllocation()
|
||||
.get(0).getAllocationAttempt().size());
|
||||
Assert.assertEquals(numNodes,
|
||||
appActivitiesInfo.getAllocations().get(0).getRequestAllocation()
|
||||
.get(0).getAllocationAttempt().get(0).getNodeIds().size());
|
||||
return null;
|
||||
};
|
||||
testManyTimes("Getting aggregated app activities", aggregatedSupplier,
|
||||
testingTimes);
|
||||
|
||||
// Test getting summarized app activities
|
||||
Supplier<Void> summarizedSupplier = () -> {
|
||||
AppActivitiesInfo appActivitiesInfo = activitiesManager
|
||||
.getAppActivitiesInfo(app.getApplicationId(), null, null,
|
||||
RMWSConsts.ActivitiesGroupBy.DIAGNOSTIC, -1, true, 100);
|
||||
Assert.assertEquals(1, appActivitiesInfo.getAllocations().size());
|
||||
Assert.assertEquals(1,
|
||||
appActivitiesInfo.getAllocations().get(0).getRequestAllocation()
|
||||
.size());
|
||||
Assert.assertEquals(1,
|
||||
appActivitiesInfo.getAllocations().get(0).getRequestAllocation()
|
||||
.get(0).getAllocationAttempt().size());
|
||||
Assert.assertEquals(numNodes,
|
||||
appActivitiesInfo.getAllocations().get(0).getRequestAllocation()
|
||||
.get(0).getAllocationAttempt().get(0).getNodeIds().size());
|
||||
return null;
|
||||
};
|
||||
testManyTimes("Getting summarized app activities", summarizedSupplier,
|
||||
testingTimes);
|
||||
}
|
||||
|
||||
private void testManyTimes(String testingName,
|
||||
Supplier<Void> supplier, int testingTimes) {
|
||||
long totalTime = 0;
|
||||
for (int i = 0; i < testingTimes; i++) {
|
||||
long startTime = System.currentTimeMillis();
|
||||
supplier.get();
|
||||
totalTime += System.currentTimeMillis() - startTime;
|
||||
}
|
||||
System.out.println("#" + testingName + ", testing times : " + testingTimes
|
||||
+ ", total cost time : " + totalTime + " ms, average cost time : "
|
||||
+ (float) totalTime / testingTimes + " ms.");
|
||||
}
|
||||
|
||||
/**
|
||||
* Testing activities manager which can record all history information about
|
||||
* node allocations.
|
||||
|
|
|
@ -41,6 +41,8 @@ import java.util.Arrays;
|
|||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.function.Predicate;
|
||||
import java.util.regex.Matcher;
|
||||
import java.util.regex.Pattern;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
|
||||
|
@ -209,4 +211,17 @@ public final class ActivitiesTestUtils {
|
|||
response.getType().toString());
|
||||
return response.getEntity(JSONObject.class);
|
||||
}
|
||||
|
||||
/**
|
||||
* Convert format using {name} (HTTP base) into %s (Java based).
|
||||
* @param format Initial format using {}.
|
||||
* @param args Arguments for the format.
|
||||
* @return New format using %s.
|
||||
*/
|
||||
public static String format(String format, Object... args) {
|
||||
Pattern p = Pattern.compile("\\{.*?}");
|
||||
Matcher m = p.matcher(format);
|
||||
String newFormat = m.replaceAll("%s");
|
||||
return String.format(newFormat, args);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -437,25 +437,17 @@ public class TestRMWebServicesSchedulerActivities
|
|||
RMApp app1 = rm.submitApp(10, "app1", "user1", null, "b1");
|
||||
|
||||
//Get JSON
|
||||
WebResource r = resource();
|
||||
WebResource r = resource().path(RMWSConsts.RM_WEB_SERVICE_PATH)
|
||||
.path(ActivitiesTestUtils.format(RMWSConsts.SCHEDULER_APP_ACTIVITIES,
|
||||
app1.getApplicationId().toString()));
|
||||
MultivaluedMapImpl params = new MultivaluedMapImpl();
|
||||
params.add("appId", app1.getApplicationId().toString());
|
||||
ClientResponse response = r.path("ws").path("v1").path("cluster").path(
|
||||
"scheduler/app-activities").queryParams(params).accept(
|
||||
MediaType.APPLICATION_JSON).get(ClientResponse.class);
|
||||
assertEquals(MediaType.APPLICATION_JSON_TYPE + "; " + JettyUtils.UTF_8,
|
||||
response.getType().toString());
|
||||
JSONObject json = response.getEntity(JSONObject.class);
|
||||
ActivitiesTestUtils.requestWebResource(r, params);
|
||||
|
||||
nm.nodeHeartbeat(true);
|
||||
Thread.sleep(5000);
|
||||
|
||||
//Get JSON
|
||||
response = r.path("ws").path("v1").path("cluster").path(
|
||||
"scheduler/app-activities").queryParams(params).accept(
|
||||
MediaType.APPLICATION_JSON).get(ClientResponse.class);
|
||||
assertEquals(MediaType.APPLICATION_JSON_TYPE + "; " + JettyUtils.UTF_8,
|
||||
response.getType().toString());
|
||||
json = response.getEntity(JSONObject.class);
|
||||
JSONObject json = ActivitiesTestUtils.requestWebResource(r, params);
|
||||
|
||||
//Check app activities
|
||||
verifyNumberOfAllocations(json, 1);
|
||||
|
@ -502,25 +494,17 @@ public class TestRMWebServicesSchedulerActivities
|
|||
10)), null);
|
||||
|
||||
//Get JSON
|
||||
WebResource r = resource();
|
||||
WebResource r = resource().path(RMWSConsts.RM_WEB_SERVICE_PATH)
|
||||
.path(ActivitiesTestUtils.format(RMWSConsts.SCHEDULER_APP_ACTIVITIES,
|
||||
app1.getApplicationId().toString()));
|
||||
MultivaluedMapImpl params = new MultivaluedMapImpl();
|
||||
params.add("appId", app1.getApplicationId().toString());
|
||||
ClientResponse response = r.path("ws").path("v1").path("cluster").path(
|
||||
"scheduler/app-activities").queryParams(params).accept(
|
||||
MediaType.APPLICATION_JSON).get(ClientResponse.class);
|
||||
assertEquals(MediaType.APPLICATION_JSON_TYPE + "; " + JettyUtils.UTF_8,
|
||||
response.getType().toString());
|
||||
JSONObject json = response.getEntity(JSONObject.class);
|
||||
ActivitiesTestUtils.requestWebResource(r, params);
|
||||
|
||||
nm.nodeHeartbeat(true);
|
||||
Thread.sleep(5000);
|
||||
|
||||
//Get JSON
|
||||
response = r.path("ws").path("v1").path("cluster").path(
|
||||
"scheduler/app-activities").queryParams(params).accept(
|
||||
MediaType.APPLICATION_JSON).get(ClientResponse.class);
|
||||
assertEquals(MediaType.APPLICATION_JSON_TYPE + "; " + JettyUtils.UTF_8,
|
||||
response.getType().toString());
|
||||
json = response.getEntity(JSONObject.class);
|
||||
JSONObject json = ActivitiesTestUtils.requestWebResource(r, params);
|
||||
|
||||
verifyNumberOfAllocations(json, 10);
|
||||
|
||||
|
@ -555,26 +539,17 @@ public class TestRMWebServicesSchedulerActivities
|
|||
10)), null);
|
||||
|
||||
//Get JSON
|
||||
WebResource r = resource();
|
||||
WebResource r = resource().path(RMWSConsts.RM_WEB_SERVICE_PATH)
|
||||
.path(ActivitiesTestUtils.format(RMWSConsts.SCHEDULER_APP_ACTIVITIES,
|
||||
app1.getApplicationId().toString()));
|
||||
MultivaluedMapImpl params = new MultivaluedMapImpl();
|
||||
params.add("appId", app1.getApplicationId().toString());
|
||||
ClientResponse response = r.path("ws").path("v1").path("cluster").path(
|
||||
"scheduler/app-activities").queryParams(params).accept(
|
||||
MediaType.APPLICATION_JSON).get(ClientResponse.class);
|
||||
assertEquals(MediaType.APPLICATION_JSON_TYPE + "; " + JettyUtils.UTF_8,
|
||||
response.getType().toString());
|
||||
JSONObject json = response.getEntity(JSONObject.class);
|
||||
ActivitiesTestUtils.requestWebResource(r, params);
|
||||
|
||||
nm.nodeHeartbeat(true);
|
||||
Thread.sleep(5000);
|
||||
|
||||
//Get JSON
|
||||
response = r.path("ws").path("v1").path("cluster").path(
|
||||
"scheduler/app-activities").queryParams(params).accept(
|
||||
MediaType.APPLICATION_JSON).get(ClientResponse.class);
|
||||
assertEquals(MediaType.APPLICATION_JSON_TYPE + "; " + JettyUtils.UTF_8,
|
||||
response.getType().toString());
|
||||
json = response.getEntity(JSONObject.class);
|
||||
|
||||
JSONObject json = ActivitiesTestUtils.requestWebResource(r, params);
|
||||
verifyNumberOfAllocations(json, 0);
|
||||
} finally {
|
||||
rm.stop();
|
||||
|
@ -590,24 +565,14 @@ public class TestRMWebServicesSchedulerActivities
|
|||
RMApp app1 = rm.submitApp(1024, "app1", "user1", null, "b1");
|
||||
|
||||
//Get JSON
|
||||
WebResource r = resource();
|
||||
WebResource r = resource().path(RMWSConsts.RM_WEB_SERVICE_PATH)
|
||||
.path(ActivitiesTestUtils.format(RMWSConsts.SCHEDULER_APP_ACTIVITIES,
|
||||
app1.getApplicationId().toString()));
|
||||
MultivaluedMapImpl params = new MultivaluedMapImpl();
|
||||
params.add("appId", app1.getApplicationId().toString());
|
||||
ClientResponse response = r.path("ws").path("v1").path("cluster").path(
|
||||
"scheduler/app-activities").queryParams(params).accept(
|
||||
MediaType.APPLICATION_JSON).get(ClientResponse.class);
|
||||
assertEquals(MediaType.APPLICATION_JSON_TYPE + "; " + JettyUtils.UTF_8,
|
||||
response.getType().toString());
|
||||
JSONObject json = response.getEntity(JSONObject.class);
|
||||
ActivitiesTestUtils.requestWebResource(r, params);
|
||||
|
||||
//Get JSON
|
||||
response = r.path("ws").path("v1").path("cluster").path(
|
||||
"scheduler/app-activities").queryParams(params).accept(
|
||||
MediaType.APPLICATION_JSON).get(ClientResponse.class);
|
||||
assertEquals(MediaType.APPLICATION_JSON_TYPE + "; " + JettyUtils.UTF_8,
|
||||
response.getType().toString());
|
||||
json = response.getEntity(JSONObject.class);
|
||||
|
||||
JSONObject json = ActivitiesTestUtils.requestWebResource(r, params);
|
||||
verifyNumberOfAllocations(json, 0);
|
||||
} finally {
|
||||
rm.stop();
|
||||
|
@ -639,49 +604,23 @@ public class TestRMWebServicesSchedulerActivities
|
|||
10)), null);
|
||||
|
||||
// Reserve new container
|
||||
WebResource r = resource();
|
||||
WebResource r = resource().path(RMWSConsts.RM_WEB_SERVICE_PATH)
|
||||
.path(ActivitiesTestUtils.format(RMWSConsts.SCHEDULER_APP_ACTIVITIES,
|
||||
app1.getApplicationId().toString()));
|
||||
MultivaluedMapImpl params = new MultivaluedMapImpl();
|
||||
params.add("appId", app1.getApplicationId().toString());
|
||||
ClientResponse response = r.path("ws").path("v1").path("cluster").path(
|
||||
"scheduler/app-activities").queryParams(params).accept(
|
||||
MediaType.APPLICATION_JSON).get(ClientResponse.class);
|
||||
assertEquals(MediaType.APPLICATION_JSON_TYPE + "; " + JettyUtils.UTF_8,
|
||||
response.getType().toString());
|
||||
JSONObject json = response.getEntity(JSONObject.class);
|
||||
ActivitiesTestUtils.requestWebResource(r, params);
|
||||
|
||||
nm2.nodeHeartbeat(true);
|
||||
Thread.sleep(1000);
|
||||
|
||||
response = r.path("ws").path("v1").path("cluster").path(
|
||||
"scheduler/app-activities").queryParams(params).accept(
|
||||
MediaType.APPLICATION_JSON).get(ClientResponse.class);
|
||||
assertEquals(MediaType.APPLICATION_JSON_TYPE + "; " + JettyUtils.UTF_8,
|
||||
response.getType().toString());
|
||||
json = response.getEntity(JSONObject.class);
|
||||
|
||||
JSONObject json = ActivitiesTestUtils.requestWebResource(r, params);
|
||||
verifyNumberOfAllocations(json, 1);
|
||||
|
||||
// Do a node heartbeat again without releasing container from app2
|
||||
r = resource();
|
||||
params = new MultivaluedMapImpl();
|
||||
params.add("appId", app1.getApplicationId().toString());
|
||||
response = r.path("ws").path("v1").path("cluster").path(
|
||||
"scheduler/app-activities").queryParams(params).accept(
|
||||
MediaType.APPLICATION_JSON).get(ClientResponse.class);
|
||||
assertEquals(MediaType.APPLICATION_JSON_TYPE + "; " + JettyUtils.UTF_8,
|
||||
response.getType().toString());
|
||||
json = response.getEntity(JSONObject.class);
|
||||
|
||||
nm2.nodeHeartbeat(true);
|
||||
Thread.sleep(1000);
|
||||
|
||||
response = r.path("ws").path("v1").path("cluster").path(
|
||||
"scheduler/app-activities").queryParams(params).accept(
|
||||
MediaType.APPLICATION_JSON).get(ClientResponse.class);
|
||||
assertEquals(MediaType.APPLICATION_JSON_TYPE + "; " + JettyUtils.UTF_8,
|
||||
response.getType().toString());
|
||||
json = response.getEntity(JSONObject.class);
|
||||
|
||||
json = ActivitiesTestUtils.requestWebResource(r, params);
|
||||
verifyNumberOfAllocations(json, 2);
|
||||
|
||||
// Finish application 2
|
||||
|
@ -693,26 +632,10 @@ public class TestRMWebServicesSchedulerActivities
|
|||
RMContainerEventType.FINISHED);
|
||||
|
||||
// Do a node heartbeat again
|
||||
r = resource();
|
||||
params = new MultivaluedMapImpl();
|
||||
params.add("appId", app1.getApplicationId().toString());
|
||||
response = r.path("ws").path("v1").path("cluster").path(
|
||||
"scheduler/app-activities").queryParams(params).accept(
|
||||
MediaType.APPLICATION_JSON).get(ClientResponse.class);
|
||||
assertEquals(MediaType.APPLICATION_JSON_TYPE + "; " + JettyUtils.UTF_8,
|
||||
response.getType().toString());
|
||||
json = response.getEntity(JSONObject.class);
|
||||
|
||||
nm2.nodeHeartbeat(true);
|
||||
Thread.sleep(1000);
|
||||
|
||||
response = r.path("ws").path("v1").path("cluster").path(
|
||||
"scheduler/app-activities").queryParams(params).accept(
|
||||
MediaType.APPLICATION_JSON).get(ClientResponse.class);
|
||||
assertEquals(MediaType.APPLICATION_JSON_TYPE + "; " + JettyUtils.UTF_8,
|
||||
response.getType().toString());
|
||||
json = response.getEntity(JSONObject.class);
|
||||
|
||||
json = ActivitiesTestUtils.requestWebResource(r, params);
|
||||
verifyNumberOfAllocations(json, 3);
|
||||
} finally {
|
||||
rm.stop();
|
||||
|
@ -847,15 +770,11 @@ public class TestRMWebServicesSchedulerActivities
|
|||
RMApp app1 = rm.submitApp(512, "app1", "user1", null, "b1");
|
||||
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm1);
|
||||
|
||||
WebResource r = resource();
|
||||
WebResource r = resource().path(RMWSConsts.RM_WEB_SERVICE_PATH)
|
||||
.path(ActivitiesTestUtils.format(RMWSConsts.SCHEDULER_APP_ACTIVITIES,
|
||||
app1.getApplicationId().toString()));
|
||||
MultivaluedMapImpl params = new MultivaluedMapImpl();
|
||||
params.add("appId", app1.getApplicationId().toString());
|
||||
ClientResponse response = r.path("ws").path("v1").path("cluster")
|
||||
.path("scheduler/app-activities").queryParams(params)
|
||||
.accept(MediaType.APPLICATION_JSON).get(ClientResponse.class);
|
||||
assertEquals(MediaType.APPLICATION_JSON_TYPE + "; " + JettyUtils.UTF_8,
|
||||
response.getType().toString());
|
||||
JSONObject json = response.getEntity(JSONObject.class);
|
||||
JSONObject json = ActivitiesTestUtils.requestWebResource(r, params);
|
||||
assertEquals("waiting for display",
|
||||
json.getString("diagnostic"));
|
||||
|
||||
|
@ -867,14 +786,7 @@ public class TestRMWebServicesSchedulerActivities
|
|||
cs.handle(new NodeUpdateSchedulerEvent(
|
||||
rm.getRMContext().getRMNodes().get(nm1.getNodeId())));
|
||||
|
||||
response =
|
||||
r.path("ws").path("v1").path("cluster")
|
||||
.path("scheduler/app-activities").queryParams(params)
|
||||
.accept(MediaType.APPLICATION_JSON).get(ClientResponse.class);
|
||||
assertEquals(MediaType.APPLICATION_JSON_TYPE + "; " + JettyUtils.UTF_8,
|
||||
response.getType().toString());
|
||||
json = response.getEntity(JSONObject.class);
|
||||
|
||||
json = ActivitiesTestUtils.requestWebResource(r, params);
|
||||
verifyNumberOfAllocations(json, 1);
|
||||
JSONObject allocationObj = json.getJSONObject("allocations");
|
||||
JSONObject requestAllocationObj =
|
||||
|
@ -904,15 +816,11 @@ public class TestRMWebServicesSchedulerActivities
|
|||
RMApp app1 = rm.submitApp(512, "app1", "user1", null, "b1");
|
||||
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm1);
|
||||
|
||||
WebResource r = resource();
|
||||
WebResource r = resource().path(RMWSConsts.RM_WEB_SERVICE_PATH)
|
||||
.path(ActivitiesTestUtils.format(RMWSConsts.SCHEDULER_APP_ACTIVITIES,
|
||||
app1.getApplicationId().toString()));
|
||||
MultivaluedMapImpl params = new MultivaluedMapImpl();
|
||||
params.add("appId", app1.getApplicationId().toString());
|
||||
ClientResponse response = r.path("ws").path("v1").path("cluster")
|
||||
.path("scheduler/app-activities").queryParams(params)
|
||||
.accept(MediaType.APPLICATION_JSON).get(ClientResponse.class);
|
||||
assertEquals(MediaType.APPLICATION_JSON_TYPE + "; " + JettyUtils.UTF_8,
|
||||
response.getType().toString());
|
||||
JSONObject json = response.getEntity(JSONObject.class);
|
||||
JSONObject json = ActivitiesTestUtils.requestWebResource(r, params);
|
||||
assertEquals("waiting for display",
|
||||
json.getString("diagnostic"));
|
||||
|
||||
|
@ -930,14 +838,7 @@ public class TestRMWebServicesSchedulerActivities
|
|||
cs.handle(new NodeUpdateSchedulerEvent(
|
||||
rm.getRMContext().getRMNodes().get(nm1.getNodeId())));
|
||||
|
||||
response =
|
||||
r.path("ws").path("v1").path("cluster")
|
||||
.path("scheduler/app-activities").queryParams(params)
|
||||
.accept(MediaType.APPLICATION_JSON).get(ClientResponse.class);
|
||||
assertEquals(MediaType.APPLICATION_JSON_TYPE + "; " + JettyUtils.UTF_8,
|
||||
response.getType().toString());
|
||||
json = response.getEntity(JSONObject.class);
|
||||
|
||||
json = ActivitiesTestUtils.requestWebResource(r, params);
|
||||
verifyNumberOfAllocations(json, 1);
|
||||
JSONObject allocationObj = json.getJSONObject("allocations");
|
||||
JSONObject requestAllocationObj =
|
||||
|
@ -967,9 +868,9 @@ public class TestRMWebServicesSchedulerActivities
|
|||
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm1);
|
||||
|
||||
WebResource r = resource().path(RMWSConsts.RM_WEB_SERVICE_PATH)
|
||||
.path(RMWSConsts.SCHEDULER_APP_ACTIVITIES);
|
||||
.path(ActivitiesTestUtils.format(RMWSConsts.SCHEDULER_APP_ACTIVITIES,
|
||||
app1.getApplicationId().toString()));
|
||||
MultivaluedMapImpl params = new MultivaluedMapImpl();
|
||||
params.add("appId", app1.getApplicationId().toString());
|
||||
JSONObject json = ActivitiesTestUtils.requestWebResource(r, params);
|
||||
assertEquals("waiting for display",
|
||||
json.getString("diagnostic"));
|
||||
|
@ -1064,4 +965,228 @@ public class TestRMWebServicesSchedulerActivities
|
|||
rm.stop();
|
||||
}
|
||||
}
|
||||
|
||||
@Test(timeout = 30000)
|
||||
public void testAppLimit() throws Exception {
|
||||
rm.start();
|
||||
CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
|
||||
MockNM nm1 = rm.registerNode("127.0.0.1:1234", 4 * 1024);
|
||||
MockNM nm2 = rm.registerNode("127.0.0.2:1234", 8 * 1024);
|
||||
try {
|
||||
RMApp app1 = rm.submitApp(512, "app1", "user1", null, "b1");
|
||||
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm1);
|
||||
|
||||
WebResource r = resource().path(RMWSConsts.RM_WEB_SERVICE_PATH)
|
||||
.path(ActivitiesTestUtils.format(RMWSConsts.SCHEDULER_APP_ACTIVITIES,
|
||||
app1.getApplicationId().toString()));
|
||||
MultivaluedMapImpl params = new MultivaluedMapImpl();
|
||||
JSONObject json = ActivitiesTestUtils.requestWebResource(r, params);
|
||||
assertEquals("waiting for display",
|
||||
json.getString("diagnostic"));
|
||||
|
||||
// am1 asks for 1 * 5GB container
|
||||
am1.allocate("*", 5120, 1, new ArrayList<>());
|
||||
// trigger scheduling triple, there will be 3 app activities in cache
|
||||
cs.handle(new NodeUpdateSchedulerEvent(
|
||||
rm.getRMContext().getRMNodes().get(nm1.getNodeId())));
|
||||
cs.handle(new NodeUpdateSchedulerEvent(
|
||||
rm.getRMContext().getRMNodes().get(nm1.getNodeId())));
|
||||
cs.handle(new NodeUpdateSchedulerEvent(
|
||||
rm.getRMContext().getRMNodes().get(nm1.getNodeId())));
|
||||
|
||||
// query all app activities without limit
|
||||
json = ActivitiesTestUtils.requestWebResource(r, params);
|
||||
verifyNumberOfAllocations(json, 3);
|
||||
|
||||
// query all app activities with limit > 3
|
||||
params.putSingle(RMWSConsts.LIMIT, "10");
|
||||
json = ActivitiesTestUtils.requestWebResource(r, params);
|
||||
verifyNumberOfAllocations(json, 3);
|
||||
|
||||
// query app activities with limit = 2
|
||||
params.putSingle(RMWSConsts.LIMIT, "2");
|
||||
json = ActivitiesTestUtils.requestWebResource(r, params);
|
||||
verifyNumberOfAllocations(json, 2);
|
||||
|
||||
// query app activities with limit = 1
|
||||
params.putSingle(RMWSConsts.LIMIT, "1");
|
||||
json = ActivitiesTestUtils.requestWebResource(r, params);
|
||||
verifyNumberOfAllocations(json, 1);
|
||||
|
||||
// query all app activities with invalid limit
|
||||
params.putSingle(RMWSConsts.LIMIT, "STRING");
|
||||
json = ActivitiesTestUtils.requestWebResource(r, params);
|
||||
assertEquals("limit must be integer!", json.getString("diagnostic"));
|
||||
|
||||
// query all app activities with limit = 0
|
||||
params.putSingle(RMWSConsts.LIMIT, "0");
|
||||
json = ActivitiesTestUtils.requestWebResource(r, params);
|
||||
assertEquals("limit must be greater than 0!",
|
||||
json.getString("diagnostic"));
|
||||
|
||||
// query all app activities with limit < 0
|
||||
params.putSingle(RMWSConsts.LIMIT, "-3");
|
||||
json = ActivitiesTestUtils.requestWebResource(r, params);
|
||||
assertEquals("limit must be greater than 0!",
|
||||
json.getString("diagnostic"));
|
||||
} finally {
|
||||
rm.stop();
|
||||
}
|
||||
}
|
||||
|
||||
@Test(timeout = 30000)
|
||||
public void testAppActions() throws Exception {
|
||||
rm.start();
|
||||
CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
|
||||
MockNM nm1 = rm.registerNode("127.0.0.1:1234", 8 * 1024);
|
||||
try {
|
||||
RMApp app1 = rm.submitApp(512, "app1", "user1", null, "b1");
|
||||
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm1);
|
||||
// am1 asks for 10 * 512MB container
|
||||
am1.allocate("*", 512, 10, new ArrayList<>());
|
||||
|
||||
WebResource r = resource().path(RMWSConsts.RM_WEB_SERVICE_PATH)
|
||||
.path(ActivitiesTestUtils.format(RMWSConsts.SCHEDULER_APP_ACTIVITIES,
|
||||
app1.getApplicationId().toString()));
|
||||
MultivaluedMapImpl params = new MultivaluedMapImpl();
|
||||
params.add("maxTime", 1); //only last for 1 second
|
||||
|
||||
// testing invalid action
|
||||
params.add(RMWSConsts.ACTIONS, "get");
|
||||
params.add(RMWSConsts.ACTIONS, "invalid-action");
|
||||
JSONObject json = ActivitiesTestUtils.requestWebResource(r, params);
|
||||
assertTrue(json.getString("diagnostic").startsWith("Got invalid action"));
|
||||
|
||||
/*
|
||||
* testing get action
|
||||
*/
|
||||
params.putSingle(RMWSConsts.ACTIONS, "get");
|
||||
json = ActivitiesTestUtils.requestWebResource(r, params);
|
||||
assertEquals("waiting for display", json.getString("diagnostic"));
|
||||
|
||||
// trigger scheduling
|
||||
cs.handle(new NodeUpdateSchedulerEvent(
|
||||
rm.getRMContext().getRMNodes().get(nm1.getNodeId())));
|
||||
|
||||
// app activities won't be recorded
|
||||
params.putSingle(RMWSConsts.ACTIONS, "get");
|
||||
json = ActivitiesTestUtils.requestWebResource(r, params);
|
||||
assertEquals("waiting for display", json.getString("diagnostic"));
|
||||
|
||||
// trigger scheduling
|
||||
cs.handle(new NodeUpdateSchedulerEvent(
|
||||
rm.getRMContext().getRMNodes().get(nm1.getNodeId())));
|
||||
|
||||
/*
|
||||
* testing update action
|
||||
*/
|
||||
params.putSingle(RMWSConsts.ACTIONS, "refresh");
|
||||
json = ActivitiesTestUtils.requestWebResource(r, params);
|
||||
assertEquals("Successfully notified actions: refresh",
|
||||
json.getString("diagnostic"));
|
||||
|
||||
// trigger scheduling
|
||||
cs.handle(new NodeUpdateSchedulerEvent(
|
||||
rm.getRMContext().getRMNodes().get(nm1.getNodeId())));
|
||||
Thread.sleep(1000);
|
||||
|
||||
// app activities should be recorded
|
||||
params.putSingle(RMWSConsts.ACTIONS, "get");
|
||||
json = ActivitiesTestUtils.requestWebResource(r, params);
|
||||
verifyNumberOfAllocations(json, 1);
|
||||
|
||||
// trigger scheduling
|
||||
cs.handle(new NodeUpdateSchedulerEvent(
|
||||
rm.getRMContext().getRMNodes().get(nm1.getNodeId())));
|
||||
Thread.sleep(1000);
|
||||
|
||||
/*
|
||||
* testing update and get actions
|
||||
*/
|
||||
params.remove(RMWSConsts.ACTIONS);
|
||||
params.add(RMWSConsts.ACTIONS, "refresh");
|
||||
params.add(RMWSConsts.ACTIONS, "get");
|
||||
json = ActivitiesTestUtils.requestWebResource(r, params);
|
||||
verifyNumberOfAllocations(json, 1);
|
||||
|
||||
// trigger scheduling
|
||||
cs.handle(new NodeUpdateSchedulerEvent(
|
||||
rm.getRMContext().getRMNodes().get(nm1.getNodeId())));
|
||||
Thread.sleep(1000);
|
||||
|
||||
// more app activities should be recorded
|
||||
json = ActivitiesTestUtils.requestWebResource(r, params);
|
||||
verifyNumberOfAllocations(json, 2);
|
||||
|
||||
// trigger scheduling
|
||||
cs.handle(new NodeUpdateSchedulerEvent(
|
||||
rm.getRMContext().getRMNodes().get(nm1.getNodeId())));
|
||||
Thread.sleep(1000);
|
||||
|
||||
// more app activities should be recorded
|
||||
json = ActivitiesTestUtils.requestWebResource(r, params);
|
||||
verifyNumberOfAllocations(json, 3);
|
||||
} finally {
|
||||
rm.stop();
|
||||
}
|
||||
}
|
||||
|
||||
@Test(timeout=30000)
|
||||
public void testAppSummary() throws Exception {
|
||||
rm.start();
|
||||
CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
|
||||
|
||||
MockNM nm1 = rm.registerNode("127.0.0.1:1234", 8 * 1024);
|
||||
MockNM nm2 = rm.registerNode("127.0.0.2:1234", 4 * 1024);
|
||||
MockNM nm3 = rm.registerNode("127.0.0.3:1234", 4 * 1024);
|
||||
|
||||
try {
|
||||
RMApp app1 = rm.submitApp(5120, "app1", "user1", null, "b1");
|
||||
|
||||
WebResource r = resource().path(RMWSConsts.RM_WEB_SERVICE_PATH)
|
||||
.path(ActivitiesTestUtils.format(RMWSConsts.SCHEDULER_APP_ACTIVITIES,
|
||||
app1.getApplicationId().toString()));
|
||||
MultivaluedMapImpl params = new MultivaluedMapImpl();
|
||||
JSONObject json = ActivitiesTestUtils.requestWebResource(r, params);
|
||||
assertEquals("waiting for display",
|
||||
json.getString("diagnostic"));
|
||||
|
||||
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm1);
|
||||
// am1 asks for 1 * 5GB container
|
||||
am1.allocate(Arrays.asList(ResourceRequest
|
||||
.newInstance(Priority.newInstance(0), "*",
|
||||
Resources.createResource(5 * 1024), 1)), null);
|
||||
// trigger scheduling
|
||||
cs.handle(new NodeUpdateSchedulerEvent(
|
||||
rm.getRMContext().getRMNodes().get(nm2.getNodeId())));
|
||||
cs.handle(new NodeUpdateSchedulerEvent(
|
||||
rm.getRMContext().getRMNodes().get(nm3.getNodeId())));
|
||||
cs.handle(new NodeUpdateSchedulerEvent(
|
||||
rm.getRMContext().getRMNodes().get(nm1.getNodeId())));
|
||||
|
||||
params.add(RMWSConsts.SUMMARIZE, "true");
|
||||
params.add(RMWSConsts.GROUP_BY, RMWSConsts.ActivitiesGroupBy.DIAGNOSTIC);
|
||||
json = ActivitiesTestUtils.requestWebResource(r, params);
|
||||
|
||||
// verify that response contains an allocation summary for all nodes
|
||||
verifyNumberOfAllocations(json, 1);
|
||||
JSONObject allocation = json.getJSONObject("allocations");
|
||||
JSONObject reqestAllocation =
|
||||
allocation.getJSONObject("requestAllocation");
|
||||
JSONArray attempts = reqestAllocation.getJSONArray("allocationAttempt");
|
||||
assertEquals(2, attempts.length());
|
||||
for (int i = 0; i < attempts.length(); i++) {
|
||||
JSONObject attempt = attempts.getJSONObject(i);
|
||||
if (attempt.getString("allocationState").equals("SKIPPED")) {
|
||||
JSONArray nodeIds = attempt.optJSONArray("nodeIds");
|
||||
assertEquals(2, nodeIds.length());
|
||||
} else if (attempt.getString("allocationState").equals("RESERVED")) {
|
||||
assertEquals(nm1.getNodeId().toString(),
|
||||
attempt.getString("nodeIds"));
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
rm.stop();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -249,15 +249,11 @@ public class TestRMWebServicesSchedulerActivitiesWithMultiNodesEnabled
|
|||
1)), null);
|
||||
|
||||
//Trigger recording for this app
|
||||
WebResource r = resource();
|
||||
WebResource r = resource().path(RMWSConsts.RM_WEB_SERVICE_PATH)
|
||||
.path(ActivitiesTestUtils.format(RMWSConsts.SCHEDULER_APP_ACTIVITIES,
|
||||
app1.getApplicationId().toString()));
|
||||
MultivaluedMapImpl params = new MultivaluedMapImpl();
|
||||
params.add(RMWSConsts.APP_ID, app1.getApplicationId().toString());
|
||||
ClientResponse response = r.path("ws").path("v1").path("cluster")
|
||||
.path("scheduler/app-activities").queryParams(params)
|
||||
.accept(MediaType.APPLICATION_JSON).get(ClientResponse.class);
|
||||
assertEquals(MediaType.APPLICATION_JSON_TYPE + "; " + JettyUtils.UTF_8,
|
||||
response.getType().toString());
|
||||
JSONObject json = response.getEntity(JSONObject.class);
|
||||
JSONObject json = ActivitiesTestUtils.requestWebResource(r, params);
|
||||
assertEquals("waiting for display", json.getString("diagnostic"));
|
||||
|
||||
//Trigger scheduling for this app
|
||||
|
@ -267,12 +263,7 @@ public class TestRMWebServicesSchedulerActivitiesWithMultiNodesEnabled
|
|||
|
||||
//Check app activities, it should contain one allocation and
|
||||
// final allocation state is ALLOCATED
|
||||
response = r.path("ws").path("v1").path("cluster")
|
||||
.path("scheduler/app-activities").queryParams(params)
|
||||
.accept(MediaType.APPLICATION_JSON).get(ClientResponse.class);
|
||||
assertEquals(MediaType.APPLICATION_JSON_TYPE + "; " + JettyUtils.UTF_8,
|
||||
response.getType().toString());
|
||||
json = response.getEntity(JSONObject.class);
|
||||
json = ActivitiesTestUtils.requestWebResource(r, params);
|
||||
|
||||
verifyNumberOfAllocations(json, 1);
|
||||
|
||||
|
@ -382,16 +373,11 @@ public class TestRMWebServicesSchedulerActivitiesWithMultiNodesEnabled
|
|||
RMApp app1 = rm.submitApp(3072, "app1", "user1", null, "b");
|
||||
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm1);
|
||||
|
||||
WebResource r = resource();
|
||||
WebResource r = resource().path(RMWSConsts.RM_WEB_SERVICE_PATH)
|
||||
.path(ActivitiesTestUtils.format(RMWSConsts.SCHEDULER_APP_ACTIVITIES,
|
||||
app1.getApplicationId().toString()));
|
||||
MultivaluedMapImpl params = new MultivaluedMapImpl();
|
||||
params.add(RMWSConsts.APP_ID, app1.getApplicationId().toString());
|
||||
|
||||
ClientResponse response = r.path("ws").path("v1").path("cluster").path(
|
||||
"scheduler/app-activities").queryParams(params).accept(
|
||||
MediaType.APPLICATION_JSON).get(ClientResponse.class);
|
||||
assertEquals(MediaType.APPLICATION_JSON_TYPE + "; " + JettyUtils.UTF_8,
|
||||
response.getType().toString());
|
||||
JSONObject json = response.getEntity(JSONObject.class);
|
||||
JSONObject json = ActivitiesTestUtils.requestWebResource(r, params);
|
||||
assertEquals("waiting for display", json.getString("diagnostic"));
|
||||
|
||||
//Request two containers with different priority for am1
|
||||
|
@ -409,14 +395,8 @@ public class TestRMWebServicesSchedulerActivitiesWithMultiNodesEnabled
|
|||
cs.handle(new NodeUpdateSchedulerEvent(
|
||||
rm.getRMContext().getRMNodes().get(nm1.getNodeId())));
|
||||
|
||||
response = r.path("ws").path("v1").path("cluster").path(
|
||||
"scheduler/app-activities").queryParams(params).accept(
|
||||
MediaType.APPLICATION_JSON).get(ClientResponse.class);
|
||||
assertEquals(MediaType.APPLICATION_JSON_TYPE + "; " + JettyUtils.UTF_8,
|
||||
response.getType().toString());
|
||||
json = response.getEntity(JSONObject.class);
|
||||
|
||||
//Check app activities
|
||||
json = ActivitiesTestUtils.requestWebResource(r, params);
|
||||
verifyNumberOfAllocations(json, 2);
|
||||
JSONArray allocationArray = json.getJSONArray("allocations");
|
||||
//Check first activity is for second allocation with RESERVED state
|
||||
|
@ -539,9 +519,9 @@ public class TestRMWebServicesSchedulerActivitiesWithMultiNodesEnabled
|
|||
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm1);
|
||||
|
||||
WebResource r = resource().path(RMWSConsts.RM_WEB_SERVICE_PATH)
|
||||
.path(RMWSConsts.SCHEDULER_APP_ACTIVITIES);
|
||||
.path(ActivitiesTestUtils.format(RMWSConsts.SCHEDULER_APP_ACTIVITIES,
|
||||
app1.getApplicationId().toString()));
|
||||
MultivaluedMapImpl params = new MultivaluedMapImpl();
|
||||
params.add(RMWSConsts.APP_ID, app1.getApplicationId().toString());
|
||||
|
||||
/*
|
||||
* test non-exist groupBy
|
||||
|
|
|
@ -192,7 +192,8 @@ public class DefaultRequestInterceptorREST
|
|||
@Override
|
||||
public AppActivitiesInfo getAppActivities(HttpServletRequest hsr,
|
||||
String appId, String time, Set<String> requestPriorities,
|
||||
Set<String> allocationRequestIds, String groupBy) {
|
||||
Set<String> allocationRequestIds, String groupBy, String limit,
|
||||
Set<String> actions, boolean summarize) {
|
||||
// time and appId are specified inside hsr
|
||||
return RouterWebServiceUtil.genericForward(webAppAddress, hsr,
|
||||
AppActivitiesInfo.class, HTTPMethods.GET,
|
||||
|
|
|
@ -1146,7 +1146,8 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
|
|||
@Override
|
||||
public AppActivitiesInfo getAppActivities(HttpServletRequest hsr,
|
||||
String appId, String time, Set<String> requestPriorities,
|
||||
Set<String> allocationRequestIds, String groupBy) {
|
||||
Set<String> allocationRequestIds, String groupBy, String limit,
|
||||
Set<String> actions, boolean summarize) {
|
||||
throw new NotImplementedException("Code is not implemented");
|
||||
}
|
||||
|
||||
|
|
|
@ -95,6 +95,8 @@ import com.google.common.annotations.VisibleForTesting;
|
|||
import com.google.inject.Inject;
|
||||
import com.google.inject.Singleton;
|
||||
|
||||
import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWebServices.DEFAULT_SUMMARIZE;
|
||||
|
||||
/**
|
||||
* RouterWebServices is a service that runs on each router that can be used to
|
||||
* intercept and inspect {@link RMWebServiceProtocol} messages from client to
|
||||
|
@ -465,11 +467,16 @@ public class RouterWebServices implements RMWebServiceProtocol {
|
|||
@QueryParam(RMWSConsts.REQUEST_PRIORITIES) Set<String> requestPriorities,
|
||||
@QueryParam(RMWSConsts.ALLOCATION_REQUEST_IDS)
|
||||
Set<String> allocationRequestIds,
|
||||
@QueryParam(RMWSConsts.GROUP_BY) String groupBy) {
|
||||
@QueryParam(RMWSConsts.GROUP_BY) String groupBy,
|
||||
@QueryParam(RMWSConsts.LIMIT) String limit,
|
||||
@QueryParam(RMWSConsts.ACTIONS) Set<String> actions,
|
||||
@QueryParam(RMWSConsts.SUMMARIZE) @DefaultValue(DEFAULT_SUMMARIZE)
|
||||
boolean summarize) {
|
||||
init();
|
||||
RequestInterceptorChainWrapper pipeline = getInterceptorChain(hsr);
|
||||
return pipeline.getRootInterceptor().getAppActivities(hsr, appId, time,
|
||||
requestPriorities, allocationRequestIds, groupBy);
|
||||
requestPriorities, allocationRequestIds, groupBy, limit, actions,
|
||||
summarize);
|
||||
}
|
||||
|
||||
@GET
|
||||
|
|
|
@ -180,8 +180,8 @@ public abstract class BaseRouterWebServicesTest {
|
|||
|
||||
protected AppActivitiesInfo getAppActivities(String user)
|
||||
throws IOException, InterruptedException {
|
||||
return routerWebService.getAppActivities(
|
||||
createHttpServletRequest(user), null, null, null, null, null);
|
||||
return routerWebService.getAppActivities(createHttpServletRequest(user),
|
||||
null, null, null, null, null, null, null, false);
|
||||
}
|
||||
|
||||
protected ApplicationStatisticsInfo getAppStatistics(String user)
|
||||
|
|
|
@ -141,7 +141,8 @@ public class MockRESTRequestInterceptor extends AbstractRESTRequestInterceptor {
|
|||
@Override
|
||||
public AppActivitiesInfo getAppActivities(HttpServletRequest hsr,
|
||||
String appId, String time, Set<String> requestPriorities,
|
||||
Set<String> allocationRequestIds, String groupBy) {
|
||||
Set<String> allocationRequestIds, String groupBy, String limit,
|
||||
Set<String> actions, boolean summarize) {
|
||||
return new AppActivitiesInfo();
|
||||
}
|
||||
|
||||
|
|
|
@ -169,9 +169,11 @@ public class PassThroughRESTRequestInterceptor
|
|||
@Override
|
||||
public AppActivitiesInfo getAppActivities(HttpServletRequest hsr,
|
||||
String appId, String time, Set<String> requestPriorities,
|
||||
Set<String> allocationRequestIds, String groupBy) {
|
||||
Set<String> allocationRequestIds, String groupBy, String limit,
|
||||
Set<String> actions, boolean summarize) {
|
||||
return getNextInterceptor().getAppActivities(hsr, appId, time,
|
||||
requestPriorities, allocationRequestIds, groupBy);
|
||||
requestPriorities, allocationRequestIds, groupBy, limit,
|
||||
actions, summarize);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
Loading…
Reference in New Issue