YARN-4091. Add REST API to retrieve scheduler activity. (Chen Ge and Sunil G via wangda)

Wangda Tan 2016-08-05 10:27:34 -07:00
parent d9a354c2f3
commit e0d131f055
28 changed files with 2768 additions and 73 deletions
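For context, the REST resources this commit adds can be exercised with a plain HTTP GET against the ResourceManager web service. The sketch below is illustrative only: the endpoint paths (/ws/v1/cluster/scheduler/activities and /ws/v1/cluster/scheduler/app-activities), the RM address, and the application id are assumptions based on ResourceManager REST conventions; the RMWebServices changes themselves are not part of this excerpt.

// Illustrative client for the scheduler-activities REST API; paths and ids
// are placeholders, not verified against RMWebServices in this diff.
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;

public class SchedulerActivitiesClient {
  public static void main(String[] args) throws Exception {
    String base = "http://rm-host:8088/ws/v1/cluster/scheduler";
    // Ask for the next node heartbeat to be recorded and print the result.
    System.out.println(get(base + "/activities"));
    // Record activities of one application for up to 3 seconds.
    System.out.println(get(base
        + "/app-activities?appId=application_1470000000000_0001&maxTime=3"));
  }

  private static String get(String url) throws Exception {
    HttpURLConnection conn = (HttpURLConnection) new URL(url).openConnection();
    conn.setRequestMethod("GET");
    conn.setRequestProperty("Accept", "application/json");
    StringBuilder body = new StringBuilder();
    try (BufferedReader in = new BufferedReader(
        new InputStreamReader(conn.getInputStream(), StandardCharsets.UTF_8))) {
      String line;
      while ((line = in.readLine()) != null) {
        body.append(line).append('\n');
      }
    }
    return body.toString();
  }
}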


@ -519,6 +519,7 @@
<Or>
<Field name="rmContext" />
<Field name="applications" />
<Field name="activitiesManager" />
</Or>
<Bug pattern="IS2_INCONSISTENT_SYNC" />
</Match>
@ -552,4 +553,15 @@
<Package name="org.apache.hadoop.yarn.api.records.impl.pb" />
<Bug pattern="NP_BOOLEAN_RETURN_NULL" />
</Match>
<Match>
<Class name="org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ActivityNodeInfo"/>
<Or>
<Field name="allocationState" />
<Field name="diagnostic" />
<Field name="name" />
<Field name="requestPriority" />
<Field name="appPriority" />
</Or>
<Bug pattern="URF_UNREAD_PUBLIC_OR_PROTECTED_FIELD" />
</Match>
</FindBugsFilter>


@ -71,6 +71,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerRecoverEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity
.LeafQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.QueueEntitlement;
@ -97,6 +98,8 @@ public abstract class AbstractYarnScheduler
private volatile Priority maxClusterLevelAppPriority;
protected ActivitiesManager activitiesManager;
/*
* All schedulers which are inheriting AbstractYarnScheduler should use
* concurrent version of 'applications' map.
@ -789,4 +792,9 @@ public abstract class AbstractYarnScheduler
}
return schedulerChangeRequests;
}
public ActivitiesManager getActivitiesManager() {
return this.activitiesManager;
}
}


@ -0,0 +1,275 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
/**
* Utility for logging scheduler activities
*/
public class ActivitiesLogger {
private static final Log LOG = LogFactory.getLog(ActivitiesLogger.class);
/**
* Methods for recording activities from an app
*/
public static class APP {
/*
* Record skipped application activity when no container allocated /
* reserved / re-reserved. Scheduler will look at following applications
* within the same leaf queue.
*/
public static void recordSkippedAppActivityWithoutAllocation(
ActivitiesManager activitiesManager, SchedulerNode node,
SchedulerApplicationAttempt application, Priority priority,
String diagnostic) {
recordAppActivityWithoutAllocation(activitiesManager, node, application,
priority, diagnostic, ActivityState.SKIPPED);
}
/*
* Record application activity when rejected because of queue maximum
* capacity or user limit.
*/
public static void recordRejectedAppActivityFromLeafQueue(
ActivitiesManager activitiesManager, SchedulerNode node,
SchedulerApplicationAttempt application, Priority priority,
String diagnostic) {
String type = "app";
recordActivity(activitiesManager, node, application.getQueueName(),
application.getApplicationId().toString(), priority,
ActivityState.REJECTED, diagnostic, type);
finishSkippedAppAllocationRecording(activitiesManager,
application.getApplicationId(), ActivityState.REJECTED, diagnostic);
}
/*
* Record application activity when no container allocated /
* reserved / re-reserved. Scheduler will look at following applications
* within the same leaf queue.
*/
public static void recordAppActivityWithoutAllocation(
ActivitiesManager activitiesManager, SchedulerNode node,
SchedulerApplicationAttempt application, Priority priority,
String diagnostic, ActivityState appState) {
if (activitiesManager == null) {
return;
}
if (activitiesManager.shouldRecordThisNode(node.getNodeID())) {
String type = "container";
// Add application-container activity into specific node allocation.
activitiesManager.addSchedulingActivityForNode(node.getNodeID(),
application.getApplicationId().toString(), null,
priority.toString(), ActivityState.SKIPPED, diagnostic, type);
type = "app";
// Add queue-application activity into specific node allocation.
activitiesManager.addSchedulingActivityForNode(node.getNodeID(),
application.getQueueName(),
application.getApplicationId().toString(),
application.getPriority().toString(), ActivityState.SKIPPED,
ActivityDiagnosticConstant.EMPTY, type);
}
// Add application-container activity into specific application allocation
// Under this condition, it fails to allocate a container to this
// application, so containerId is null.
if (activitiesManager.shouldRecordThisApp(
application.getApplicationId())) {
String type = "container";
activitiesManager.addSchedulingActivityForApp(
application.getApplicationId(), null, priority.toString(), appState,
diagnostic, type);
}
}
/*
* Record application activity when container allocated / reserved /
* re-reserved
*/
public static void recordAppActivityWithAllocation(
ActivitiesManager activitiesManager, SchedulerNode node,
SchedulerApplicationAttempt application, Container updatedContainer,
ActivityState activityState) {
if (activitiesManager == null) {
return;
}
if (activitiesManager.shouldRecordThisNode(node.getNodeID())) {
String type = "container";
// Add application-container activity into specific node allocation.
activitiesManager.addSchedulingActivityForNode(node.getNodeID(),
application.getApplicationId().toString(),
updatedContainer.getId().toString(),
updatedContainer.getPriority().toString(), activityState,
ActivityDiagnosticConstant.EMPTY, type);
type = "app";
// Add queue-application activity into specific node allocation.
activitiesManager.addSchedulingActivityForNode(node.getNodeID(),
application.getQueueName(),
application.getApplicationId().toString(),
application.getPriority().toString(), ActivityState.ACCEPTED,
ActivityDiagnosticConstant.EMPTY, type);
}
// Add application-container activity into specific application allocation
if (activitiesManager.shouldRecordThisApp(
application.getApplicationId())) {
String type = "container";
activitiesManager.addSchedulingActivityForApp(
application.getApplicationId(), updatedContainer.getId().toString(),
updatedContainer.getPriority().toString(), activityState,
ActivityDiagnosticConstant.EMPTY, type);
}
}
/*
* Invoked when scheduler starts to look at this application within one node
* update.
*/
public static void startAppAllocationRecording(
ActivitiesManager activitiesManager, NodeId nodeId, long currentTime,
SchedulerApplicationAttempt application) {
if (activitiesManager == null) {
return;
}
activitiesManager.startAppAllocationRecording(nodeId, currentTime,
application);
}
/*
* Invoked when scheduler finishes looking at this application within one
* node update, and the app has any container allocated/reserved during
* this allocation.
*/
public static void finishAllocatedAppAllocationRecording(
ActivitiesManager activitiesManager, ApplicationId applicationId,
ContainerId containerId, ActivityState containerState,
String diagnostic) {
if (activitiesManager == null) {
return;
}
if (activitiesManager.shouldRecordThisApp(applicationId)) {
activitiesManager.finishAppAllocationRecording(applicationId,
containerId, containerState, diagnostic);
}
}
/*
* Invoked when scheduler finishes looking at this application within one
* node update, and the app DOESN'T have any container allocated/reserved
* during this allocation.
*/
public static void finishSkippedAppAllocationRecording(
ActivitiesManager activitiesManager, ApplicationId applicationId,
ActivityState containerState, String diagnostic) {
finishAllocatedAppAllocationRecording(activitiesManager, applicationId,
null, containerState, diagnostic);
}
}
/**
* Methods for recording activities from a queue
*/
public static class QUEUE {
/*
* Record activities of a queue
*/
public static void recordQueueActivity(ActivitiesManager activitiesManager,
SchedulerNode node, String parentQueueName, String queueName,
ActivityState state, String diagnostic) {
recordActivity(activitiesManager, node, parentQueueName, queueName, null,
state, diagnostic, null);
}
}
/**
* Methods for recording overall activities from one node update
*/
public static class NODE {
/*
* Invoked when node allocation finishes, and there's NO container
* allocated or reserved during the allocation
*/
public static void finishSkippedNodeAllocation(
ActivitiesManager activitiesManager, SchedulerNode node) {
finishAllocatedNodeAllocation(activitiesManager, node, null,
AllocationState.SKIPPED);
}
/*
* Invoked when node allocation finishes, and there's any container
* allocated or reserved during the allocation
*/
public static void finishAllocatedNodeAllocation(
ActivitiesManager activitiesManager, SchedulerNode node,
ContainerId containerId, AllocationState containerState) {
if (activitiesManager == null) {
return;
}
if (activitiesManager.shouldRecordThisNode(node.getNodeID())) {
activitiesManager.updateAllocationFinalState(node.getNodeID(),
containerId, containerState);
}
}
/*
* Invoked when node heartbeat finishes
*/
public static void finishNodeUpdateRecording(
ActivitiesManager activitiesManager, NodeId nodeID) {
if (activitiesManager == null) {
return;
}
activitiesManager.finishNodeUpdateRecording(nodeID);
}
/*
* Invoked when node heartbeat starts
*/
public static void startNodeUpdateRecording(
ActivitiesManager activitiesManager, NodeId nodeID) {
if (activitiesManager == null) {
return;
}
activitiesManager.startNodeUpdateRecording(nodeID);
}
}
// Add queue, application or container activity into specific node allocation.
private static void recordActivity(ActivitiesManager activitiesManager,
SchedulerNode node, String parentName, String childName,
Priority priority, ActivityState state, String diagnostic, String type) {
if (activitiesManager == null) {
return;
}
if (activitiesManager.shouldRecordThisNode(node.getNodeID())) {
activitiesManager.addSchedulingActivityForNode(node.getNodeID(),
parentName, childName, priority != null ? priority.toString() : null,
state, diagnostic, type);
}
}
}
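A minimal sketch of how a scheduler's heartbeat path is expected to drive the helpers above; it mirrors the CapacityScheduler and LeafQueue wiring later in this commit. The ActivitiesManager, node and application objects are assumed to be supplied by the surrounding scheduler code, and the diagnostic used is one of the constants from ActivityDiagnosticConstant, defined later in this diff.

  // Sketch only: bracket one node update with NODE recording, then record a
  // skipped attempt for one application. Types come from the packages
  // already imported by ActivitiesLogger above.
  void recordOneHeartbeat(ActivitiesManager activitiesManager,
      SchedulerNode node, SchedulerApplicationAttempt app) {
    ActivitiesLogger.NODE.startNodeUpdateRecording(activitiesManager,
        node.getNodeID());
    ActivitiesLogger.APP.startAppAllocationRecording(activitiesManager,
        node.getNodeID(), System.currentTimeMillis(), app);
    // ... the real allocation attempt would run here ...
    ActivitiesLogger.APP.recordSkippedAppActivityWithoutAllocation(
        activitiesManager, node, app, app.getPriority(),
        ActivityDiagnosticConstant.FAIL_TO_ALLOCATE);
    ActivitiesLogger.APP.finishSkippedAppAllocationRecording(
        activitiesManager, app.getApplicationId(), ActivityState.SKIPPED,
        ActivityDiagnosticConstant.EMPTY);
    ActivitiesLogger.NODE.finishSkippedNodeAllocation(activitiesManager, node);
    ActivitiesLogger.NODE.finishNodeUpdateRecording(activitiesManager,
        node.getNodeID());
  }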


@ -0,0 +1,319 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ActivitiesInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppActivitiesInfo;
import org.apache.hadoop.yarn.util.SystemClock;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.List;
import java.util.Set;
import java.util.*;
import java.util.ArrayList;
/**
* A class to store node or application allocations.
* It mainly contains operations for allocation start, add, update and finish.
*/
public class ActivitiesManager extends AbstractService {
private static final Log LOG = LogFactory.getLog(ActivitiesManager.class);
private ConcurrentMap<NodeId, List<NodeAllocation>> recordingNodesAllocation;
private ConcurrentMap<NodeId, List<NodeAllocation>> completedNodeAllocations;
private Set<NodeId> activeRecordedNodes;
private ConcurrentMap<ApplicationId, Long>
recordingAppActivitiesUntilSpecifiedTime;
private ConcurrentMap<ApplicationId, AppAllocation> appsAllocation;
private ConcurrentMap<ApplicationId, List<AppAllocation>>
completedAppAllocations;
private boolean recordNextAvailableNode = false;
private List<NodeAllocation> lastAvailableNodeActivities = null;
private Thread cleanUpThread;
private int timeThreshold = 600 * 1000;
private final RMContext rmContext;
public ActivitiesManager(RMContext rmContext) {
super(ActivitiesManager.class.getName());
recordingNodesAllocation = new ConcurrentHashMap<>();
completedNodeAllocations = new ConcurrentHashMap<>();
appsAllocation = new ConcurrentHashMap<>();
completedAppAllocations = new ConcurrentHashMap<>();
activeRecordedNodes = Collections.newSetFromMap(new ConcurrentHashMap<>());
recordingAppActivitiesUntilSpecifiedTime = new ConcurrentHashMap<>();
this.rmContext = rmContext;
}
public AppActivitiesInfo getAppActivitiesInfo(ApplicationId applicationId) {
if (rmContext.getRMApps().get(applicationId).getFinalApplicationStatus()
== FinalApplicationStatus.UNDEFINED) {
List<AppAllocation> allocations = completedAppAllocations.get(
applicationId);
return new AppActivitiesInfo(allocations, applicationId);
} else {
return new AppActivitiesInfo(
"fail to get application activities after finished",
applicationId.toString());
}
}
public ActivitiesInfo getActivitiesInfo(String nodeId) {
List<NodeAllocation> allocations;
if (nodeId == null) {
allocations = lastAvailableNodeActivities;
} else {
allocations = completedNodeAllocations.get(NodeId.fromString(nodeId));
}
return new ActivitiesInfo(allocations, nodeId);
}
public void recordNextNodeUpdateActivities(String nodeId) {
if (nodeId == null) {
recordNextAvailableNode = true;
} else {
activeRecordedNodes.add(NodeId.fromString(nodeId));
}
}
public void turnOnAppActivitiesRecording(ApplicationId applicationId,
double maxTime) {
long startTS = SystemClock.getInstance().getTime();
long endTS = startTS + (long) (maxTime * 1000);
recordingAppActivitiesUntilSpecifiedTime.put(applicationId, endTS);
}
@Override
protected void serviceStart() throws Exception {
cleanUpThread = new Thread(new Runnable() {
@Override
public void run() {
while (true) {
Iterator<Map.Entry<NodeId, List<NodeAllocation>>> ite =
completedNodeAllocations.entrySet().iterator();
while (ite.hasNext()) {
Map.Entry<NodeId, List<NodeAllocation>> nodeAllocation = ite.next();
List<NodeAllocation> allocations = nodeAllocation.getValue();
long currTS = SystemClock.getInstance().getTime();
if (allocations.size() > 0 && allocations.get(0).getTimeStamp()
- currTS > timeThreshold) {
ite.remove();
}
}
Iterator<Map.Entry<ApplicationId, List<AppAllocation>>> iteApp =
completedAppAllocations.entrySet().iterator();
while (iteApp.hasNext()) {
Map.Entry<ApplicationId, List<AppAllocation>> appAllocation =
iteApp.next();
if (rmContext.getRMApps().get(appAllocation.getKey())
.getFinalApplicationStatus()
!= FinalApplicationStatus.UNDEFINED) {
iteApp.remove();
}
}
try {
Thread.sleep(5000);
} catch (Exception e) {
// ignore
}
}
}
});
cleanUpThread.start();
super.serviceStart();
}
@Override
protected void serviceStop() throws Exception {
cleanUpThread.interrupt();
super.serviceStop();
}
void startNodeUpdateRecording(NodeId nodeID) {
if (recordNextAvailableNode) {
recordNextNodeUpdateActivities(nodeID.toString());
}
if (activeRecordedNodes.contains(nodeID)) {
List<NodeAllocation> nodeAllocation = new ArrayList<>();
recordingNodesAllocation.put(nodeID, nodeAllocation);
}
}
void startAppAllocationRecording(NodeId nodeID, long currTS,
SchedulerApplicationAttempt application) {
ApplicationId applicationId = application.getApplicationId();
if (recordingAppActivitiesUntilSpecifiedTime.containsKey(applicationId)
&& recordingAppActivitiesUntilSpecifiedTime.get(applicationId)
> currTS) {
appsAllocation.put(applicationId,
new AppAllocation(application.getPriority(), nodeID,
application.getQueueName()));
}
if (recordingAppActivitiesUntilSpecifiedTime.containsKey(applicationId)
&& recordingAppActivitiesUntilSpecifiedTime.get(applicationId)
<= currTS) {
turnOffActivityMonitoringForApp(applicationId);
}
}
// Add queue, application or container activity into specific node allocation.
void addSchedulingActivityForNode(NodeId nodeID, String parentName,
String childName, String priority, ActivityState state, String diagnostic,
String type) {
if (shouldRecordThisNode(nodeID)) {
NodeAllocation nodeAllocation = getCurrentNodeAllocation(nodeID);
nodeAllocation.addAllocationActivity(parentName, childName, priority,
state, diagnostic, type);
}
}
// Add queue, application or container activity into specific application
// allocation.
void addSchedulingActivityForApp(ApplicationId applicationId,
String containerId, String priority, ActivityState state,
String diagnostic, String type) {
if (shouldRecordThisApp(applicationId)) {
AppAllocation appAllocation = appsAllocation.get(applicationId);
appAllocation.addAppAllocationActivity(containerId, priority, state,
diagnostic, type);
}
}
// Update container allocation meta status for this node allocation.
// It updates general container status but not the detailed activity state
// in updateActivityState.
void updateAllocationFinalState(NodeId nodeID, ContainerId containerId,
AllocationState containerState) {
if (shouldRecordThisNode(nodeID)) {
NodeAllocation nodeAllocation = getCurrentNodeAllocation(nodeID);
nodeAllocation.updateContainerState(containerId, containerState);
}
}
void finishAppAllocationRecording(ApplicationId applicationId,
ContainerId containerId, ActivityState appState, String diagnostic) {
if (shouldRecordThisApp(applicationId)) {
long currTS = SystemClock.getInstance().getTime();
AppAllocation appAllocation = appsAllocation.remove(applicationId);
appAllocation.updateAppContainerStateAndTime(containerId, appState,
currTS, diagnostic);
List<AppAllocation> appAllocations;
if (completedAppAllocations.containsKey(applicationId)) {
appAllocations = completedAppAllocations.get(applicationId);
} else {
appAllocations = new ArrayList<>();
completedAppAllocations.put(applicationId, appAllocations);
}
if (appAllocations.size() == 1000) {
appAllocations.remove(0);
}
appAllocations.add(appAllocation);
if (recordingAppActivitiesUntilSpecifiedTime.get(applicationId)
<= currTS) {
turnOffActivityMonitoringForApp(applicationId);
}
}
}
void finishNodeUpdateRecording(NodeId nodeID) {
List<NodeAllocation> value = recordingNodesAllocation.get(nodeID);
long timeStamp = SystemClock.getInstance().getTime();
if (value != null) {
if (value.size() > 0) {
lastAvailableNodeActivities = value;
for (NodeAllocation allocation : lastAvailableNodeActivities) {
allocation.transformToTree();
allocation.setTimeStamp(timeStamp);
}
if (recordNextAvailableNode) {
recordNextAvailableNode = false;
}
}
if (shouldRecordThisNode(nodeID)) {
recordingNodesAllocation.remove(nodeID);
completedNodeAllocations.put(nodeID, value);
stopRecordNodeUpdateActivities(nodeID);
}
}
}
boolean shouldRecordThisApp(ApplicationId applicationId) {
return recordingAppActivitiesUntilSpecifiedTime.containsKey(applicationId)
&& appsAllocation.containsKey(applicationId);
}
boolean shouldRecordThisNode(NodeId nodeID) {
return activeRecordedNodes.contains(nodeID) && recordingNodesAllocation
.containsKey(nodeID);
}
private NodeAllocation getCurrentNodeAllocation(NodeId nodeID) {
List<NodeAllocation> nodeAllocations = recordingNodesAllocation.get(nodeID);
NodeAllocation nodeAllocation;
// When this node has already stored allocation activities, get the
// last allocation for this node.
if (nodeAllocations.size() != 0) {
nodeAllocation = nodeAllocations.get(nodeAllocations.size() - 1);
// When final state in last allocation is not DEFAULT, it means
// last allocation has finished. Create a new allocation for this node,
// and add it to the allocation list. Return this new allocation.
//
// When final state in last allocation is DEFAULT,
// it means last allocation has not finished. Just get last allocation.
if (nodeAllocation.getFinalAllocationState() != AllocationState.DEFAULT) {
nodeAllocation = new NodeAllocation(nodeID);
nodeAllocations.add(nodeAllocation);
}
}
// When this node has not stored allocation activities,
// create a new allocation for this node, and add it to the allocation list.
// Return this new allocation.
else {
nodeAllocation = new NodeAllocation(nodeID);
nodeAllocations.add(nodeAllocation);
}
return nodeAllocation;
}
private void stopRecordNodeUpdateActivities(NodeId nodeId) {
activeRecordedNodes.remove(nodeId);
}
private void turnOffActivityMonitoringForApp(ApplicationId applicationId) {
recordingAppActivitiesUntilSpecifiedTime.remove(applicationId);
}
}
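A sketch of the consumer-side flow that the REST layer added elsewhere in this commit is expected to follow, using only the public methods defined above; the scheduler reference, node id and application id are placeholders.

  // Sketch only: turn recording on, then fetch the captured DAO objects on a
  // later request once the next heartbeat/allocation has actually happened.
  ActivitiesManager manager = scheduler.getActivitiesManager();

  // Node activities: null means "whichever node heartbeats next".
  manager.recordNextNodeUpdateActivities(null);
  ActivitiesInfo nodeActivities = manager.getActivitiesInfo(null);

  // Application activities: record for up to 3 seconds, then read them back.
  ApplicationId appId = ApplicationId.newInstance(1470000000000L, 1);
  manager.turnOnAppActivitiesRecording(appId, 3.0);
  AppActivitiesInfo appActivities = manager.getAppActivitiesInfo(appId);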


@ -0,0 +1,77 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities;
/*
* Collection of diagnostics.
*/
public class ActivityDiagnosticConstant {
// EMPTY means it does not have any diagnostic to display.
// In order not to show "diagnostic" line in frontend,
// we set the value to null.
public final static String EMPTY = null;
public final static String NOT_ABLE_TO_ACCESS_PARTITION =
"Not able to access partition";
public final static String QUEUE_DO_NOT_NEED_MORE_RESOURCE =
"Queue does not need more resource";
public final static String QUEUE_MAX_CAPACITY_LIMIT =
"Hit queue max-capacity limit";
public final static String USER_CAPACITY_MAXIMUM_LIMIT =
"Hit user capacity maximum limit";
public final static String SKIP_BLACK_LISTED_NODE = "Skip black listed node";
public final static String PRIORITY_SKIPPED = "Priority skipped";
public final static String PRIORITY_SKIPPED_BECAUSE_NULL_ANY_REQUEST =
"Priority skipped because off-switch request is null";
public final static String
PRIORITY_SKIPPED_BECAUSE_NODE_PARTITION_DOES_NOT_MATCH_REQUEST =
"Priority skipped because partition of node doesn't match request";
public final static String SKIP_PRIORITY_BECAUSE_OF_RELAX_LOCALITY =
"Priority skipped because of relax locality is not allowed";
public final static String SKIP_IN_IGNORE_EXCLUSIVITY_MODE =
"Skipping assigning to Node in Ignore Exclusivity mode";
public final static String DO_NOT_NEED_ALLOCATIONATTEMPTINFOS =
"Doesn't need containers based on reservation algo!";
public final static String QUEUE_SKIPPED_HEADROOM =
"Queue skipped because of headroom";
public final static String NON_PARTITIONED_PARTITION_FIRST =
"Non-partitioned resource request should be scheduled to "
+ "non-partitioned partition first";
public final static String SKIP_NODE_LOCAL_REQUEST =
"Skip node-local request";
public final static String SKIP_RACK_LOCAL_REQUEST =
"Skip rack-local request";
public final static String SKIP_OFF_SWITCH_REQUEST =
"Skip offswitch request";
public final static String REQUEST_CAN_NOT_ACCESS_NODE_LABEL =
"Resource request can not access the label";
public final static String NOT_SUFFICIENT_RESOURCE =
"Node does not have sufficient resource for request";
public final static String LOCALITY_SKIPPED = "Locality skipped";
public final static String FAIL_TO_ALLOCATE = "Fail to allocate";
public final static String COULD_NOT_GET_CONTAINER =
"Couldn't get container for allocation";
public final static String APPLICATION_DO_NOT_NEED_RESOURCE =
"Application does not need more resource";
public final static String APPLICATION_PRIORITY_DO_NOT_NEED_RESOURCE =
"Application priority does not need more resource";
public final static String SKIPPED_ALL_PRIORITIES =
"All priorities are skipped of the app";
public final static String RESPECT_FIFO = "To respect FIFO of applications, "
+ "skipped following applications in the queue";
}


@ -0,0 +1,110 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities;
import java.util.LinkedList;
import java.util.List;
/*
* It represents tree node in "NodeAllocation" tree structure.
* Each node may represent queue, application or container in allocation activity.
* Node may have children node if successfully allocated to next level.
*/
public class ActivityNode {
private String activityNodeName;
private String parentName;
private String appPriority;
private String requestPriority;
private ActivityState state;
private String diagnostic;
private List<ActivityNode> childNode;
public ActivityNode(String activityNodeName, String parentName,
String priority, ActivityState state, String diagnostic, String type) {
this.activityNodeName = activityNodeName;
this.parentName = parentName;
if (type != null) {
if (type.equals("app")) {
this.appPriority = priority;
} else if (type.equals("container")) {
this.requestPriority = priority;
}
}
this.state = state;
this.diagnostic = diagnostic;
this.childNode = new LinkedList<>();
}
public String getName() {
return this.activityNodeName;
}
public String getParentName() {
return this.parentName;
}
public void addChild(ActivityNode node) {
childNode.add(0, node);
}
public List<ActivityNode> getChildren() {
return this.childNode;
}
public ActivityState getState() {
return this.state;
}
public String getDiagnostic() {
return this.diagnostic;
}
public String getAppPriority() {
return appPriority;
}
public String getRequestPriority() {
return requestPriority;
}
public boolean getType() {
if (appPriority != null) {
return true;
} else {
return false;
}
}
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append(this.activityNodeName + " ");
sb.append(this.appPriority + " ");
sb.append(this.state + " ");
if (!this.diagnostic.equals("")) {
sb.append(this.diagnostic + "\n");
}
sb.append("\n");
for (ActivityNode child : childNode) {
sb.append(child.toString() + "\n");
}
return sb.toString();
}
}


@ -0,0 +1,37 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities;
/*
* Collection of activity operation states.
*/
public enum ActivityState {
// default state when adding a new activity in node allocation
DEFAULT,
// container is allocated to sub-queues/applications or this queue/application
ACCEPTED,
// queue or application voluntarily give up to use the resource OR
// nothing allocated
SKIPPED,
// container could not be allocated to sub-queues or this application
REJECTED,
ALLOCATED, // successfully allocate a new non-reserved container
RESERVED, // successfully reserve a new container
RE_RESERVED // successfully reserve a new container
}


@ -0,0 +1,74 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
/*
* It records an activity operation in allocation,
* which can be classified as queue, application or container activity.
* Other information include state, diagnostic, priority.
*/
public class AllocationActivity {
private String childName = null;
private String parentName = null;
private String appPriority = null;
private String requestPriority = null;
private ActivityState state;
private String diagnostic = null;
private static final Log LOG = LogFactory.getLog(AllocationActivity.class);
public AllocationActivity(String parentName, String queueName,
String priority, ActivityState state, String diagnostic, String type) {
this.childName = queueName;
this.parentName = parentName;
if (type != null) {
if (type.equals("app")) {
this.appPriority = priority;
} else if (type.equals("container")) {
this.requestPriority = priority;
}
}
this.state = state;
this.diagnostic = diagnostic;
}
public ActivityNode createTreeNode() {
if (appPriority != null) {
return new ActivityNode(this.childName, this.parentName, this.appPriority,
this.state, this.diagnostic, "app");
} else if (requestPriority != null) {
return new ActivityNode(this.childName, this.parentName,
this.requestPriority, this.state, this.diagnostic, "container");
} else {
return new ActivityNode(this.childName, this.parentName, null, this.state,
this.diagnostic, null);
}
}
public String getName() {
return this.childName;
}
public String getState() {
return this.state.toString();
}
}


@ -0,0 +1,35 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities;
/*
* Collection of allocation final states.
*/
public enum AllocationState {
DEFAULT,
// queue or application voluntarily give up to use the resource
// OR nothing allocated
SKIPPED,
// successfully allocate a new non-reserved container
ALLOCATED,
// successfully allocate a new container from an existing reserved container
ALLOCATED_FROM_RESERVED,
// successfully reserve a new container
RESERVED
}


@ -0,0 +1,107 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
import java.util.ArrayList;
import java.util.List;
/*
* It contains allocation information for one application within a period of
* time.
* Each application allocation may have several allocation attempts.
*/
public class AppAllocation {
private Priority priority = null;
private NodeId nodeId;
private ContainerId containerId = null;
private ActivityState appState = null;
private String diagnostic = null;
private String queueName = null;
private List<ActivityNode> allocationAttempts;
private long timestamp;
public AppAllocation(Priority priority, NodeId nodeId, String queueName) {
this.priority = priority;
this.nodeId = nodeId;
this.allocationAttempts = new ArrayList<>();
this.queueName = queueName;
}
public void updateAppContainerStateAndTime(ContainerId containerId,
ActivityState appState, long ts, String diagnostic) {
this.timestamp = ts;
this.containerId = containerId;
this.appState = appState;
this.diagnostic = diagnostic;
}
public void addAppAllocationActivity(String containerId, String priority,
ActivityState state, String diagnostic, String type) {
ActivityNode container = new ActivityNode(containerId, null, priority,
state, diagnostic, type);
this.allocationAttempts.add(container);
if (state == ActivityState.REJECTED) {
this.appState = ActivityState.SKIPPED;
} else {
this.appState = state;
}
}
public String getNodeId() {
return nodeId.toString();
}
public String getQueueName() {
return queueName;
}
public ActivityState getAppState() {
return appState;
}
public String getPriority() {
if (priority == null) {
return null;
}
return priority.toString();
}
public String getContainerId() {
if (containerId == null) {
return null;
}
return containerId.toString();
}
public String getDiagnostic() {
return diagnostic;
}
public long getTime() {
return this.timestamp;
}
public List<ActivityNode> getAllocationAttempts() {
return allocationAttempts;
}
}
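A small sketch of how one AppAllocation accumulates attempts for a single heartbeat. The priority, node, queue and container values are placeholders; in the scheduler this object is always driven through ActivitiesManager rather than constructed directly.

  // Sketch only: one skipped attempt recorded against an application.
  AppAllocation appAllocation = new AppAllocation(Priority.newInstance(1),
      NodeId.newInstance("host1", 1234), "default");
  appAllocation.addAppAllocationActivity("container_1", "1",
      ActivityState.SKIPPED, ActivityDiagnosticConstant.LOCALITY_SKIPPED,
      "container");
  appAllocation.updateAppContainerStateAndTime(null, ActivityState.SKIPPED,
      System.currentTimeMillis(), ActivityDiagnosticConstant.EMPTY);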


@ -0,0 +1,139 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.ListIterator;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
/*
* It contains allocation information for one allocation in a node heartbeat.
* Detailed allocation activities are first stored in "AllocationActivity"
* as operations, then transformed to a tree structure.
* Tree structure starts from root queue and ends in leaf queue,
* application or container allocation.
*/
public class NodeAllocation {
private NodeId nodeId;
private long timeStamp;
private ContainerId containerId = null;
private AllocationState containerState = AllocationState.DEFAULT;
private List<AllocationActivity> allocationOperations;
private ActivityNode root = null;
private static final Log LOG = LogFactory.getLog(NodeAllocation.class);
public NodeAllocation(NodeId nodeId) {
this.nodeId = nodeId;
this.allocationOperations = new ArrayList<>();
}
public void addAllocationActivity(String parentName, String childName,
String priority, ActivityState state, String diagnostic, String type) {
AllocationActivity allocate = new AllocationActivity(parentName, childName,
priority, state, diagnostic, type);
this.allocationOperations.add(allocate);
}
public void updateContainerState(ContainerId containerId,
AllocationState containerState) {
this.containerId = containerId;
this.containerState = containerState;
}
// In node allocation, transform each activity to a tree-like structure
// for frontend activity display.
// eg:      root
//          /  \
//         a    b
//        / \
//    app1   app2
//     / \
//  CA1   CA2
// CA means Container Attempt
public void transformToTree() {
List<ActivityNode> allocationTree = new ArrayList<>();
if (root == null) {
Set<String> names = Collections.newSetFromMap(new ConcurrentHashMap<>());
ListIterator<AllocationActivity> ite = allocationOperations.listIterator(
allocationOperations.size());
while (ite.hasPrevious()) {
String name = ite.previous().getName();
if (name != null) {
if (!names.contains(name)) {
names.add(name);
} else {
ite.remove();
}
}
}
for (AllocationActivity allocationOperation : allocationOperations) {
ActivityNode node = allocationOperation.createTreeNode();
String name = node.getName();
for (int i = allocationTree.size() - 1; i > -1; i--) {
if (allocationTree.get(i).getParentName().equals(name)) {
node.addChild(allocationTree.get(i));
allocationTree.remove(i);
} else {
break;
}
}
allocationTree.add(node);
}
root = allocationTree.get(0);
}
}
public void setTimeStamp(long timeStamp) {
this.timeStamp = timeStamp;
}
public long getTimeStamp() {
return this.timeStamp;
}
public AllocationState getFinalAllocationState() {
return containerState;
}
public String getContainerId() {
if (containerId == null)
return null;
return containerId.toString();
}
public ActivityNode getRoot() {
return root;
}
public String getNodeId() {
return nodeId.toString();
}
}
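To make the tree construction above concrete, here is a worked sketch. The scheduler records operations child-first (container, then application, then leaf queue, then parent queues), which is the ordering transformToTree() relies on; the names and ids below are placeholders.

  // Sketch only: four child-first operations collapse into one tree whose
  // root is the "root" queue.
  NodeAllocation alloc = new NodeAllocation(NodeId.newInstance("host1", 1234));
  alloc.addAllocationActivity("app_1", "container_1", "1",
      ActivityState.ALLOCATED, null, "container");
  alloc.addAllocationActivity("a", "app_1", "1",
      ActivityState.ACCEPTED, null, "app");
  alloc.addAllocationActivity("root", "a", null,
      ActivityState.ACCEPTED, null, null);
  alloc.addAllocationActivity("", "root", null,
      ActivityState.ACCEPTED, null, null);
  alloc.transformToTree();
  // getRoot() now returns "root", with the chain
  // root -> a -> app_1 -> container_1 beneath it.
  ActivityNode root = alloc.getRoot();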


@ -48,6 +48,7 @@ import org.apache.hadoop.yarn.security.PrivilegedEntity.EntityType;
import org.apache.hadoop.yarn.security.YarnAuthorizationProvider;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
@ -91,12 +92,15 @@ public abstract class AbstractCSQueue implements CSQueue {
protected CapacitySchedulerContext csContext;
protected YarnAuthorizationProvider authorizer = null;
public AbstractCSQueue(CapacitySchedulerContext cs,
protected ActivitiesManager activitiesManager;
public AbstractCSQueue(CapacitySchedulerContext cs,
String queueName, CSQueue parent, CSQueue old) throws IOException {
this.labelManager = cs.getRMContext().getNodeLabelManager();
this.parent = parent;
this.queueName = queueName;
this.resourceCalculator = cs.getResourceCalculator();
this.activitiesManager = cs.getActivitiesManager();
// must be called after parent and queueName is set
this.metrics =


@ -92,22 +92,12 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeDecreaseContainerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeResourceUpdateEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.PreemptableResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueInvalidException;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedContainerChangeRequest;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerDynamicEditException;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerHealth;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.*;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityDiagnosticConstant;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesLogger;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityState;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.AllocationState;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.KillableContainer;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptionManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.AssignmentInformation;
@ -307,6 +297,8 @@ public class CapacityScheduler extends
this.applications = new ConcurrentHashMap<>();
this.labelManager = rmContext.getNodeLabelManager();
authorizer = YarnAuthorizationProvider.getInstance(yarnConf);
this.activitiesManager = new ActivitiesManager(rmContext);
activitiesManager.init(conf);
initializeQueues(this.conf);
this.isLazyPreemptionEnabled = conf.getLazyPreemptionEnabled();
@ -344,6 +336,7 @@ public class CapacityScheduler extends
@Override
public void serviceStart() throws Exception {
startSchedulerThreads();
activitiesManager.start();
super.serviceStart();
}
@ -523,7 +516,7 @@ public class CapacityScheduler extends
Map<String, CSQueue> newQueues = new HashMap<String, CSQueue>();
CSQueue newRoot =
parseQueue(this, conf, null, CapacitySchedulerConfiguration.ROOT,
newQueues, queues, noop);
newQueues, queues, noop);
// Ensure all existing queues are still present
validateExistingQueues(queues, newQueues);
@ -650,7 +643,7 @@ public class CapacityScheduler extends
throw new IllegalStateException(
"Only Leaf Queues can be reservable for " + queueName);
}
ParentQueue parentQueue =
ParentQueue parentQueue =
new ParentQueue(csContext, queueName, parent, oldQueues.get(queueName));
// Used only for unit tests
@ -802,7 +795,7 @@ public class CapacityScheduler extends
FiCaSchedulerApp attempt = new FiCaSchedulerApp(applicationAttemptId,
application.getUser(), queue, queue.getActiveUsersManager(), rmContext,
application.getPriority(), isAttemptRecovering);
application.getPriority(), isAttemptRecovering, activitiesManager);
if (transferStateFromPreviousAttempt) {
attempt.transferStateFromPreviousAttempt(
application.getCurrentAppAttempt());
@ -1233,6 +1226,7 @@ public class CapacityScheduler extends
RMContainer reservedContainer = node.getReservedContainer();
if (reservedContainer != null) {
FiCaSchedulerApp reservedApplication =
getCurrentAttemptForContainer(reservedContainer.getContainerId());
@ -1262,6 +1256,19 @@ public class CapacityScheduler extends
tmp.getAssignmentInformation().incrAllocations();
updateSchedulerHealth(lastNodeUpdateTime, node, tmp);
schedulerHealth.updateSchedulerFulfilledReservationCounts(1);
ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node,
queue.getParent().getQueueName(), queue.getQueueName(),
ActivityState.ACCEPTED, ActivityDiagnosticConstant.EMPTY);
ActivitiesLogger.NODE.finishAllocatedNodeAllocation(activitiesManager,
node, reservedContainer.getContainerId(),
AllocationState.ALLOCATED_FROM_RESERVED);
} else {
ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node,
queue.getParent().getQueueName(), queue.getQueueName(),
ActivityState.ACCEPTED, ActivityDiagnosticConstant.EMPTY);
ActivitiesLogger.NODE.finishAllocatedNodeAllocation(activitiesManager,
node, reservedContainer.getContainerId(), AllocationState.SKIPPED);
}
}
@ -1371,7 +1378,11 @@ public class CapacityScheduler extends
setLastNodeUpdateTime(Time.now());
nodeUpdate(node);
if (!scheduleAsynchronously) {
ActivitiesLogger.NODE.startNodeUpdateRecording(activitiesManager,
node.getNodeID());
allocateContainersToNode(getNode(node.getNodeID()));
ActivitiesLogger.NODE.finishNodeUpdateRecording(activitiesManager,
node.getNodeID());
}
}
break;


@ -25,6 +25,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerHealth;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptionManager;
@ -80,4 +81,6 @@ public interface CapacitySchedulerContext {
* cluster.
*/
ResourceUsage getClusterResourceUsage();
ActivitiesManager getActivitiesManager();
}


@ -19,15 +19,7 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.*;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
@ -65,9 +57,11 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedContainerChangeRequest;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.*;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityDiagnosticConstant;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesLogger;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityState;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt.AMState;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerHealth;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.KillableContainer;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
@ -75,6 +69,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.FifoOrderi
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.OrderingPolicy;
import org.apache.hadoop.yarn.server.utils.Lock;
import org.apache.hadoop.yarn.server.utils.Lock.NoLock;
import org.apache.hadoop.yarn.util.SystemClock;
import org.apache.hadoop.yarn.util.resource.Resources;
import com.google.common.annotations.VisibleForTesting;
@ -135,7 +130,7 @@ public class LeafQueue extends AbstractCSQueue {
super(cs, queueName, parent, old);
this.scheduler = cs;
this.activeUsersManager = new ActiveUsersManager(metrics);
this.activeUsersManager = new ActiveUsersManager(metrics);
// One time initialization is enough since it is static ordering policy
this.pendingOrderingPolicy = new FifoOrderingPolicyForPendingApps();
@ -144,7 +139,7 @@ public class LeafQueue extends AbstractCSQueue {
LOG.debug("LeafQueue:" + " name=" + queueName
+ ", fullname=" + getQueuePath());
}
setupQueueConfigs(cs.getClusterResource());
}
@ -862,7 +857,7 @@ public class LeafQueue extends AbstractCSQueue {
float guaranteedCapacity = queueCapacities.getAbsoluteCapacity(nodePartition);
limits.setIsAllowPreemption(usedCapacity < guaranteedCapacity);
}
@Override
public synchronized CSAssignment assignContainers(Resource clusterResource,
FiCaSchedulerNode node, ResourceLimits currentResourceLimits,
@ -881,6 +876,10 @@ public class LeafQueue extends AbstractCSQueue {
if (reservedContainer != null) {
FiCaSchedulerApp application =
getApplication(reservedContainer.getApplicationAttemptId());
ActivitiesLogger.APP.startAppAllocationRecording(activitiesManager,
node.getNodeID(), SystemClock.getInstance().getTime(), application);
synchronized (application) {
CSAssignment assignment =
application.assignContainers(clusterResource, node,
@ -895,6 +894,10 @@ public class LeafQueue extends AbstractCSQueue {
// if our queue cannot access this node, just return
if (schedulingMode == SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY
&& !accessibleToPartition(node.getPartition())) {
ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node,
getParent().getQueueName(), getQueueName(), ActivityState.REJECTED,
ActivityDiagnosticConstant.NOT_ABLE_TO_ACCESS_PARTITION + node
.getPartition());
return CSAssignment.NULL_ASSIGNMENT;
}
@ -907,6 +910,9 @@ public class LeafQueue extends AbstractCSQueue {
+ ", because it doesn't need more resource, schedulingMode="
+ schedulingMode.name() + " node-partition=" + node.getPartition());
}
ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node,
getParent().getQueueName(), getQueueName(), ActivityState.SKIPPED,
ActivityDiagnosticConstant.QUEUE_DO_NOT_NEED_MORE_RESOURCE);
return CSAssignment.NULL_ASSIGNMENT;
}
@ -914,13 +920,23 @@ public class LeafQueue extends AbstractCSQueue {
orderingPolicy.getAssignmentIterator(); assignmentIterator.hasNext();) {
FiCaSchedulerApp application = assignmentIterator.next();
ActivitiesLogger.APP.startAppAllocationRecording(activitiesManager,
node.getNodeID(), SystemClock.getInstance().getTime(), application);
// Check queue max-capacity limit
if (!super.canAssignToThisQueue(clusterResource, node.getPartition(),
currentResourceLimits, application.getCurrentReservation(),
schedulingMode)) {
ActivitiesLogger.APP.recordRejectedAppActivityFromLeafQueue(
activitiesManager, node,
application, application.getPriority(),
ActivityDiagnosticConstant.QUEUE_MAX_CAPACITY_LIMIT);
ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node,
getParent().getQueueName(), getQueueName(), ActivityState.SKIPPED,
ActivityDiagnosticConstant.EMPTY);
return CSAssignment.NULL_ASSIGNMENT;
}
Resource userLimit =
computeUserLimitAndSetHeadroom(application, clusterResource,
node.getPartition(), schedulingMode);
@ -930,6 +946,10 @@ public class LeafQueue extends AbstractCSQueue {
application, node.getPartition(), currentResourceLimits)) {
application.updateAMContainerDiagnostics(AMState.ACTIVATED,
"User capacity has reached its maximum limit.");
ActivitiesLogger.APP.recordRejectedAppActivityFromLeafQueue(
activitiesManager, node,
application, application.getPriority(),
ActivityDiagnosticConstant.USER_CAPACITY_MAXIMUM_LIMIT);
continue;
}
@ -971,10 +991,17 @@ public class LeafQueue extends AbstractCSQueue {
incReservedResource(node.getPartition(), reservedRes);
}
ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node,
getParent().getQueueName(), getQueueName(), ActivityState.ACCEPTED,
ActivityDiagnosticConstant.EMPTY);
// Done
return assignment;
} else if (assignment.getSkippedType()
== CSAssignment.SkippedType.OTHER) {
ActivitiesLogger.APP.finishSkippedAppAllocationRecording(
activitiesManager, application.getApplicationId(),
ActivityState.SKIPPED, ActivityDiagnosticConstant.EMPTY);
application.updateNodeInfoForAMDiagnostics(node);
} else if(assignment.getSkippedType()
== CSAssignment.SkippedType.QUEUE_LIMIT) {
@ -982,9 +1009,18 @@ public class LeafQueue extends AbstractCSQueue {
} else {
// If we don't allocate anything, and it is not skipped by application,
// we will return to respect FIFO of applications
ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node,
getParent().getQueueName(), getQueueName(), ActivityState.SKIPPED,
ActivityDiagnosticConstant.RESPECT_FIFO);
ActivitiesLogger.APP.finishSkippedAppAllocationRecording(
activitiesManager, application.getApplicationId(),
ActivityState.SKIPPED, ActivityDiagnosticConstant.EMPTY);
return CSAssignment.NULL_ASSIGNMENT;
}
}
ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node,
getParent().getQueueName(), getQueueName(), ActivityState.SKIPPED,
ActivityDiagnosticConstant.EMPTY);
return CSAssignment.NULL_ASSIGNMENT;
}


@ -42,12 +42,11 @@ import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsMana
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedContainerChangeRequest;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.*;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityDiagnosticConstant;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesLogger;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityState;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.AllocationState;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
import org.apache.hadoop.yarn.util.resource.Resources;
@ -81,7 +80,7 @@ public class ParentQueue extends AbstractCSQueue {
private final RecordFactory recordFactory =
RecordFactoryProvider.getRecordFactory(null);
public ParentQueue(CapacitySchedulerContext cs,
String queueName, CSQueue parent, CSQueue old) throws IOException {
super(cs, queueName, parent, old);
this.scheduler = cs;
@ -98,14 +97,14 @@ public class ParentQueue extends AbstractCSQueue {
"capacity of " + rawCapacity + " for queue " + queueName +
". Must be " + CapacitySchedulerConfiguration.MAXIMUM_CAPACITY_VALUE);
}
this.childQueues = new TreeSet<CSQueue>(nonPartitionedQueueComparator);
setupQueueConfigs(cs.getClusterResource());
LOG.info("Initialized parent-queue " + queueName +
" name=" + queueName +
", fullname=" + getQueuePath());
LOG.info("Initialized parent-queue " + queueName +
" name=" + queueName +
", fullname=" + getQueuePath());
}
synchronized void setupQueueConfigs(Resource clusterResource)
@ -380,6 +379,10 @@ public class ParentQueue extends AbstractCSQueue {
" #applications: " + getNumApplications());
}
private String getParentName() {
return getParent() != null ? getParent().getQueueName() : "";
}
@Override
public synchronized CSAssignment assignContainers(Resource clusterResource,
FiCaSchedulerNode node, ResourceLimits resourceLimits,
@ -392,6 +395,16 @@ public class ParentQueue extends AbstractCSQueue {
+ ", because it is not able to access partition=" + node
.getPartition());
}
ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node,
getParentName(), getQueueName(), ActivityState.REJECTED,
ActivityDiagnosticConstant.NOT_ABLE_TO_ACCESS_PARTITION + node
.getPartition());
if (rootQueue) {
ActivitiesLogger.NODE.finishSkippedNodeAllocation(activitiesManager,
node);
}
return CSAssignment.NULL_ASSIGNMENT;
}
@ -404,6 +417,15 @@ public class ParentQueue extends AbstractCSQueue {
+ ", because it doesn't need more resource, schedulingMode="
+ schedulingMode.name() + " node-partition=" + node.getPartition());
}
ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node,
getParentName(), getQueueName(), ActivityState.SKIPPED,
ActivityDiagnosticConstant.QUEUE_DO_NOT_NEED_MORE_RESOURCE);
if (rootQueue) {
ActivitiesLogger.NODE.finishSkippedNodeAllocation(activitiesManager,
node);
}
return CSAssignment.NULL_ASSIGNMENT;
}
@ -423,9 +445,18 @@ public class ParentQueue extends AbstractCSQueue {
resourceLimits, Resources.createResource(
getMetrics().getReservedMB(), getMetrics()
.getReservedVirtualCores()), schedulingMode)) {
ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node,
getParentName(), getQueueName(), ActivityState.SKIPPED,
ActivityDiagnosticConstant.QUEUE_MAX_CAPACITY_LIMIT);
if (rootQueue) {
ActivitiesLogger.NODE.finishSkippedNodeAllocation(activitiesManager,
node);
}
break;
}
// Schedule
CSAssignment assignedToChild =
assignContainersToChildQueues(clusterResource, node, resourceLimits,
@ -436,6 +467,29 @@ public class ParentQueue extends AbstractCSQueue {
if (Resources.greaterThan(
resourceCalculator, clusterResource,
assignedToChild.getResource(), Resources.none())) {
ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node,
getParentName(), getQueueName(), ActivityState.ACCEPTED,
ActivityDiagnosticConstant.EMPTY);
if (node.getReservedContainer() == null) {
if (rootQueue) {
ActivitiesLogger.NODE.finishAllocatedNodeAllocation(
activitiesManager, node,
assignedToChild.getAssignmentInformation()
.getFirstAllocatedOrReservedContainerId(),
AllocationState.ALLOCATED);
}
} else {
if (rootQueue) {
ActivitiesLogger.NODE.finishAllocatedNodeAllocation(
activitiesManager, node,
assignedToChild.getAssignmentInformation()
.getFirstAllocatedOrReservedContainerId(),
AllocationState.RESERVED);
}
}
// Track resource utilization for the parent-queue
allocateResource(clusterResource, assignedToChild.getResource(),
node.getPartition(), assignedToChild.isIncreasedAllocation());
@ -474,6 +528,15 @@ public class ParentQueue extends AbstractCSQueue {
} else {
assignment.setSkippedType(assignedToChild.getSkippedType());
ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node,
getParentName(), getQueueName(), ActivityState.SKIPPED,
ActivityDiagnosticConstant.EMPTY);
if (rootQueue) {
ActivitiesLogger.NODE.finishSkippedNodeAllocation(activitiesManager,
node);
}
break;
}
@ -631,7 +694,7 @@ public class ParentQueue extends AbstractCSQueue {
resourceToSubtract);
}
}
return assignment;
}

View File

@ -24,6 +24,10 @@ import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesLogger;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityDiagnosticConstant;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityState;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSAssignment;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode;
@ -43,17 +47,25 @@ public abstract class AbstractContainerAllocator {
FiCaSchedulerApp application;
final ResourceCalculator rc;
final RMContext rmContext;
ActivitiesManager activitiesManager;
public AbstractContainerAllocator(FiCaSchedulerApp application,
ResourceCalculator rc, RMContext rmContext) {
this(application, rc, rmContext, null);
}
public AbstractContainerAllocator(FiCaSchedulerApp application,
ResourceCalculator rc, RMContext rmContext,
ActivitiesManager activitiesManager) {
this.application = application;
this.rc = rc;
this.rmContext = rmContext;
this.activitiesManager = activitiesManager;
}
protected CSAssignment getCSAssignmentFromAllocateResult(
Resource clusterResource, ContainerAllocation result,
RMContainer rmContainer) {
RMContainer rmContainer, FiCaSchedulerNode node) {
// Handle skipped
CSAssignment.SkippedType skipped =
(result.getAllocationState() == AllocationState.APP_SKIPPED) ?
@ -61,7 +73,7 @@ public abstract class AbstractContainerAllocator {
CSAssignment.SkippedType.NONE;
CSAssignment assignment = new CSAssignment(skipped);
assignment.setApplication(application);
// Handle excess reservation
assignment.setExcessReservation(result.getContainerToBeUnreserved());
@ -85,6 +97,23 @@ public abstract class AbstractContainerAllocator {
assignment.getAssignmentInformation().incrReservations();
Resources.addTo(assignment.getAssignmentInformation().getReserved(),
allocatedResource);
if (rmContainer != null) {
ActivitiesLogger.APP.recordAppActivityWithAllocation(
activitiesManager, node, application, updatedContainer,
ActivityState.RE_RESERVED);
ActivitiesLogger.APP.finishSkippedAppAllocationRecording(
activitiesManager, application.getApplicationId(),
ActivityState.SKIPPED, ActivityDiagnosticConstant.EMPTY);
} else {
ActivitiesLogger.APP.recordAppActivityWithAllocation(
activitiesManager, node, application, updatedContainer,
ActivityState.RESERVED);
ActivitiesLogger.APP.finishAllocatedAppAllocationRecording(
activitiesManager, application.getApplicationId(),
updatedContainer.getId(), ActivityState.RESERVED,
ActivityDiagnosticConstant.EMPTY);
}
} else if (result.getAllocationState() == AllocationState.ALLOCATED){
// This is a new container
// Inform the ordering policy
@ -105,10 +134,18 @@ public abstract class AbstractContainerAllocator {
assignment.getAssignmentInformation().incrAllocations();
Resources.addTo(assignment.getAssignmentInformation().getAllocated(),
allocatedResource);
if (rmContainer != null) {
assignment.setFulfilledReservation(true);
}
ActivitiesLogger.APP.recordAppActivityWithAllocation(activitiesManager,
node, application, updatedContainer, ActivityState.ALLOCATED);
ActivitiesLogger.APP.finishAllocatedAppAllocationRecording(
activitiesManager, application.getApplicationId(),
updatedContainer.getId(), ActivityState.ACCEPTED,
ActivityDiagnosticConstant.EMPTY);
}
assignment.setContainersToKill(result.getToKillContainers());
@ -118,13 +155,13 @@ public abstract class AbstractContainerAllocator {
CSAssignment.SkippedType.QUEUE_LIMIT);
}
}
return assignment;
}
/**
* allocate needs to handle the following:
*
* <ul>
* <li>Select request: Select a request to allocate. E.g. select a resource
* request based on requirement/priority/locality.</li>

View File

@ -22,6 +22,7 @@ import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSAssignment;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode;
@ -36,12 +37,17 @@ public class ContainerAllocator extends AbstractContainerAllocator {
public ContainerAllocator(FiCaSchedulerApp application,
ResourceCalculator rc, RMContext rmContext) {
this(application, rc, rmContext, null);
}
public ContainerAllocator(FiCaSchedulerApp application, ResourceCalculator rc,
RMContext rmContext, ActivitiesManager activitiesManager) {
super(application, rc, rmContext);
increaseContainerAllocator =
new IncreaseContainerAllocator(application, rc, rmContext);
regularContainerAllocator =
new RegularContainerAllocator(application, rc, rmContext);
regularContainerAllocator = new RegularContainerAllocator(application, rc,
rmContext, activitiesManager);
}
@Override

View File

@ -24,11 +24,13 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppUtils;
@ -37,6 +39,12 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerRequestK
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityDiagnosticConstant;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesLogger;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityState;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSAMContainerLaunchDiagnosticsConstants;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSAssignment;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode;
@ -57,10 +65,11 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
private static final Log LOG = LogFactory.getLog(RegularContainerAllocator.class);
private ResourceRequest lastResourceRequest = null;
public RegularContainerAllocator(FiCaSchedulerApp application,
ResourceCalculator rc, RMContext rmContext) {
super(application, rc, rmContext);
ResourceCalculator rc, RMContext rmContext,
ActivitiesManager activitiesManager) {
super(application, rc, rmContext, activitiesManager);
}
private boolean checkHeadroom(Resource clusterResource,
@ -85,15 +94,23 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
private ContainerAllocation preCheckForNewContainer(Resource clusterResource,
FiCaSchedulerNode node, SchedulingMode schedulingMode,
ResourceLimits resourceLimits, SchedulerRequestKey schedulerKey) {
Priority priority = schedulerKey.getPriority();
if (SchedulerAppUtils.isPlaceBlacklisted(application, node, LOG)) {
application.updateAppSkipNodeDiagnostics(
CSAMContainerLaunchDiagnosticsConstants.SKIP_AM_ALLOCATION_IN_BLACK_LISTED_NODE);
ActivitiesLogger.APP.recordSkippedAppActivityWithoutAllocation(
activitiesManager, node, application, priority,
ActivityDiagnosticConstant.SKIP_BLACK_LISTED_NODE);
return ContainerAllocation.APP_SKIPPED;
}
ResourceRequest anyRequest =
application.getResourceRequest(schedulerKey, ResourceRequest.ANY);
if (null == anyRequest) {
ActivitiesLogger.APP.recordSkippedAppActivityWithoutAllocation(
activitiesManager, node, application, priority,
ActivityDiagnosticConstant.PRIORITY_SKIPPED_BECAUSE_NULL_ANY_REQUEST);
return ContainerAllocation.PRIORITY_SKIPPED;
}
@ -102,6 +119,9 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
// Do we need containers at this 'priority'?
if (application.getTotalRequiredResources(schedulerKey) <= 0) {
ActivitiesLogger.APP.recordSkippedAppActivityWithoutAllocation(
activitiesManager, node, application, priority,
ActivityDiagnosticConstant.APPLICATION_PRIORITY_DO_NOT_NEED_RESOURCE);
return ContainerAllocation.PRIORITY_SKIPPED;
}
@ -116,6 +136,9 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
}
application.updateAppSkipNodeDiagnostics(
"Skipping assigning to Node in Ignore Exclusivity mode. ");
ActivitiesLogger.APP.recordSkippedAppActivityWithoutAllocation(
activitiesManager, node, application, priority,
ActivityDiagnosticConstant.SKIP_IN_IGNORE_EXCLUSIVITY_MODE);
return ContainerAllocation.APP_SKIPPED;
}
}
@ -126,6 +149,10 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
if (!SchedulerUtils.checkResourceRequestMatchingNodePartition(
anyRequest.getNodeLabelExpression(), node.getPartition(),
schedulingMode)) {
ActivitiesLogger.APP.recordSkippedAppActivityWithoutAllocation(
activitiesManager, node, application, priority,
ActivityDiagnosticConstant.
PRIORITY_SKIPPED_BECAUSE_NODE_PARTITION_DOES_NOT_MATCH_REQUEST);
return ContainerAllocation.PRIORITY_SKIPPED;
}
@ -134,6 +161,9 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
if (LOG.isDebugEnabled()) {
LOG.debug("doesn't need containers based on reservation algo!");
}
ActivitiesLogger.APP.recordSkippedAppActivityWithoutAllocation(
activitiesManager, node, application, priority,
ActivityDiagnosticConstant.DO_NOT_NEED_ALLOCATIONATTEMPTINFOS);
return ContainerAllocation.PRIORITY_SKIPPED;
}
}
@ -143,6 +173,9 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
LOG.debug("cannot allocate required resource=" + required
+ " because of headroom");
}
ActivitiesLogger.APP.recordSkippedAppActivityWithoutAllocation(
activitiesManager, node, application, priority,
ActivityDiagnosticConstant.QUEUE_SKIPPED_HEADROOM);
return ContainerAllocation.QUEUE_SKIPPED;
}
@ -174,7 +207,9 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
+ missedNonPartitionedRequestSchedulingOpportunity + " required="
+ rmContext.getScheduler().getNumClusterNodes());
}
ActivitiesLogger.APP.recordSkippedAppActivityWithoutAllocation(
activitiesManager, node, application, priority,
ActivityDiagnosticConstant.NON_PARTITIONED_PARTITION_FIRST);
return ContainerAllocation.APP_SKIPPED;
}
}
@ -301,6 +336,9 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
}
// Skip node-local request, go to rack-local request
ActivitiesLogger.APP.recordSkippedAppActivityWithoutAllocation(
activitiesManager, node, application, schedulerKey.getPriority(),
ActivityDiagnosticConstant.SKIP_NODE_LOCAL_REQUEST);
return ContainerAllocation.LOCALITY_SKIPPED;
}
@ -316,6 +354,9 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
}
// Skip rack-local request, go to off-switch request
ActivitiesLogger.APP.recordSkippedAppActivityWithoutAllocation(
activitiesManager, node, application, schedulerKey.getPriority(),
ActivityDiagnosticConstant.SKIP_RACK_LOCAL_REQUEST);
return ContainerAllocation.LOCALITY_SKIPPED;
}
@ -332,6 +373,9 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
application.updateAppSkipNodeDiagnostics(
CSAMContainerLaunchDiagnosticsConstants.SKIP_AM_ALLOCATION_DUE_TO_LOCALITY);
ActivitiesLogger.APP.recordSkippedAppActivityWithoutAllocation(
activitiesManager, node, application, schedulerKey.getPriority(),
ActivityDiagnosticConstant.SKIP_OFF_SWITCH_REQUEST);
return ContainerAllocation.APP_SKIPPED;
}
@ -339,6 +383,7 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
FiCaSchedulerNode node, SchedulerRequestKey schedulerKey,
RMContainer reservedContainer, SchedulingMode schedulingMode,
ResourceLimits currentResoureLimits) {
Priority priority = schedulerKey.getPriority();
ContainerAllocation allocation;
@ -364,6 +409,9 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
application.getResourceRequest(schedulerKey, node.getRackName());
if (rackLocalResourceRequest != null) {
if (!rackLocalResourceRequest.getRelaxLocality()) {
ActivitiesLogger.APP.recordSkippedAppActivityWithoutAllocation(
activitiesManager, node, application, priority,
ActivityDiagnosticConstant.SKIP_PRIORITY_BECAUSE_OF_RELAX_LOCALITY);
return ContainerAllocation.PRIORITY_SKIPPED;
}
@ -387,6 +435,9 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
application.getResourceRequest(schedulerKey, ResourceRequest.ANY);
if (offSwitchResourceRequest != null) {
if (!offSwitchResourceRequest.getRelaxLocality()) {
ActivitiesLogger.APP.recordSkippedAppActivityWithoutAllocation(
activitiesManager, node, application, priority,
ActivityDiagnosticConstant.SKIP_PRIORITY_BECAUSE_OF_RELAX_LOCALITY);
return ContainerAllocation.PRIORITY_SKIPPED;
}
if (requestType != NodeType.NODE_LOCAL
@ -408,7 +459,9 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
return allocation;
}
ActivitiesLogger.APP.recordSkippedAppActivityWithoutAllocation(
activitiesManager, node, application, priority,
ActivityDiagnosticConstant.PRIORITY_SKIPPED);
return ContainerAllocation.PRIORITY_SKIPPED;
}
@ -416,6 +469,7 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
FiCaSchedulerNode node, SchedulerRequestKey schedulerKey,
ResourceRequest request, NodeType type, RMContainer rmContainer,
SchedulingMode schedulingMode, ResourceLimits currentResoureLimits) {
Priority priority = schedulerKey.getPriority();
lastResourceRequest = request;
if (LOG.isDebugEnabled()) {
@ -432,6 +486,10 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
// this is a reserved container, but we cannot allocate it now according
// to label not match. This can be caused by node label changed
// We should un-reserve this container.
ActivitiesLogger.APP.recordAppActivityWithoutAllocation(activitiesManager,
node, application, priority,
ActivityDiagnosticConstant.REQUEST_CAN_NOT_ACCESS_NODE_LABEL,
ActivityState.REJECTED);
return new ContainerAllocation(rmContainer, null,
AllocationState.LOCALITY_SKIPPED);
}
@ -446,6 +504,9 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
+ " does not have sufficient resource for request : " + request
+ " node total capability : " + node.getTotalResource());
// Skip this locality request
ActivitiesLogger.APP.recordSkippedAppActivityWithoutAllocation(
activitiesManager, node, application, priority,
ActivityDiagnosticConstant.NOT_SUFFICIENT_RESOURCE);
return ContainerAllocation.LOCALITY_SKIPPED;
}
@ -524,6 +585,9 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
// continue.
if (null == unreservedContainer) {
// Skip the locality request
ActivitiesLogger.APP.recordSkippedAppActivityWithoutAllocation(
activitiesManager, node, application, priority,
ActivityDiagnosticConstant.LOCALITY_SKIPPED);
return ContainerAllocation.LOCALITY_SKIPPED;
}
}
@ -548,6 +612,9 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
LOG.debug("we needed to unreserve to be able to allocate");
}
// Skip the locality request
ActivitiesLogger.APP.recordSkippedAppActivityWithoutAllocation(
activitiesManager, node, application, priority,
ActivityDiagnosticConstant.LOCALITY_SKIPPED);
return ContainerAllocation.LOCALITY_SKIPPED;
}
}
@ -560,6 +627,9 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
return result;
}
// Skip the locality request
ActivitiesLogger.APP.recordSkippedAppActivityWithoutAllocation(
activitiesManager, node, application, priority,
ActivityDiagnosticConstant.LOCALITY_SKIPPED);
return ContainerAllocation.LOCALITY_SKIPPED;
}
}
@ -636,6 +706,9 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
ContainerAllocation ret =
new ContainerAllocation(allocationResult.containerToBeUnreserved,
null, AllocationState.APP_SKIPPED);
ActivitiesLogger.APP.recordAppActivityWithoutAllocation(activitiesManager,
node, application, schedulerKey.getPriority(),
ActivityDiagnosticConstant.FAIL_TO_ALLOCATE, ActivityState.REJECTED);
return ret;
}
@ -662,6 +735,10 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
application
.updateAppSkipNodeDiagnostics("Scheduling of container failed. ");
LOG.warn("Couldn't get container for allocation!");
ActivitiesLogger.APP.recordAppActivityWithoutAllocation(activitiesManager,
node, application, schedulerKey.getPriority(),
ActivityDiagnosticConstant.COULD_NOT_GET_CONTAINER,
ActivityState.REJECTED);
return ContainerAllocation.APP_SKIPPED;
}
@ -741,6 +818,9 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
+ ", because it doesn't need more resource, schedulingMode="
+ schedulingMode.name() + " node-label=" + node.getPartition());
}
ActivitiesLogger.APP.recordSkippedAppActivityWithoutAllocation(
activitiesManager, node, application, application.getPriority(),
ActivityDiagnosticConstant.APPLICATION_DO_NOT_NEED_RESOURCE);
return CSAssignment.SKIP_ASSIGNMENT;
}
@ -755,18 +835,21 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
continue;
}
return getCSAssignmentFromAllocateResult(clusterResource, result,
null);
null, node);
}
// We will reach here if we skipped all priorities of the app, so we will
// skip the app.
ActivitiesLogger.APP.recordSkippedAppActivityWithoutAllocation(
activitiesManager, node, application, application.getPriority(),
ActivityDiagnosticConstant.SKIPPED_ALL_PRIORITIES);
return CSAssignment.SKIP_ASSIGNMENT;
} else {
ContainerAllocation result =
allocate(clusterResource, node, schedulingMode, resourceLimits,
reservedContainer.getReservedSchedulerKey(), reservedContainer);
return getCSAssignmentFromAllocateResult(clusterResource, result,
reservedContainer);
reservedContainer, node);
}
}
}

View File

@ -56,6 +56,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerRequestKey;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSAMContainerLaunchDiagnosticsConstants;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSAssignment;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityHeadroomProvider;
@ -108,8 +109,16 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
public FiCaSchedulerApp(ApplicationAttemptId applicationAttemptId,
String user, Queue queue, ActiveUsersManager activeUsersManager,
RMContext rmContext, Priority appPriority, boolean isAttemptRecovering) {
this(applicationAttemptId, user, queue, activeUsersManager, rmContext,
appPriority, isAttemptRecovering, null);
}
public FiCaSchedulerApp(ApplicationAttemptId applicationAttemptId,
String user, Queue queue, ActiveUsersManager activeUsersManager,
RMContext rmContext, Priority appPriority, boolean isAttemptRecovering,
ActivitiesManager activitiesManager) {
super(applicationAttemptId, user, queue, activeUsersManager, rmContext);
RMApp rmApp = rmContext.getRMApps().get(getApplicationId());
Resource amResource;
@ -139,8 +148,9 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
if (scheduler.getResourceCalculator() != null) {
rc = scheduler.getResourceCalculator();
}
containerAllocator = new ContainerAllocator(this, rc, rmContext);
containerAllocator = new ContainerAllocator(this, rc, rmContext,
activitiesManager);
if (scheduler instanceof CapacityScheduler) {
capacitySchedulerContext = (CapacitySchedulerContext) scheduler;
@ -189,7 +199,7 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
return null;
}
// Required sanity check - AM can call 'allocate' to update resource
// request without locking the scheduler, hence we need to check
if (getTotalRequiredResources(schedulerKey) <= 0) {
return null;
@ -493,7 +503,7 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
public LeafQueue getCSLeafQueue() {
return (LeafQueue)queue;
}
public CSAssignment assignContainers(Resource clusterResource,
FiCaSchedulerNode node, ResourceLimits currentResourceLimits,
SchedulingMode schedulingMode, RMContainer reservedContainer) {

View File

@ -130,9 +130,13 @@ import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NodeLabelsUtils;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppAttemptInfo;
@ -176,6 +180,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ResourceInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.SchedulerInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.SchedulerTypeInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.StatisticsItemInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.*;
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.server.webapp.WebServices;
@ -576,6 +581,124 @@ public class RMWebServices extends WebServices {
return allApps;
}
@GET
@Path("/scheduler/activities")
@Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
public ActivitiesInfo getActivities(@Context HttpServletRequest hsr,
@QueryParam("nodeId") String nodeId) {
YarnScheduler scheduler = rm.getRMContext().getScheduler();
if (scheduler instanceof AbstractYarnScheduler) {
String errMessage = "";
AbstractYarnScheduler abstractYarnScheduler =
(AbstractYarnScheduler) scheduler;
ActivitiesManager activitiesManager =
abstractYarnScheduler.getActivitiesManager();
if (null == activitiesManager) {
errMessage = "Not Capacity Scheduler";
return new ActivitiesInfo(errMessage, nodeId);
}
List<FiCaSchedulerNode> nodeList =
abstractYarnScheduler.getNodeTracker().getAllNodes();
boolean illegalInput = false;
if (nodeList.size() == 0) {
illegalInput = true;
errMessage = "No node manager running in the cluster";
} else {
if (nodeId != null) {
String hostName = nodeId;
String portName = "";
if (nodeId.contains(":")) {
int index = nodeId.indexOf(":");
hostName = nodeId.substring(0, index);
portName = nodeId.substring(index + 1);
}
boolean correctNodeId = false;
for (FiCaSchedulerNode node : nodeList) {
if ((portName.equals("") && node.getRMNode().getHostName().equals(
hostName)) || (!portName.equals("") && node.getRMNode()
.getHostName().equals(hostName) && String.valueOf(
node.getRMNode().getCommandPort()).equals(portName))) {
correctNodeId = true;
nodeId = node.getNodeID().toString();
break;
}
}
if (!correctNodeId) {
illegalInput = true;
errMessage = "Cannot find node manager with given node id";
}
}
}
if (!illegalInput) {
activitiesManager.recordNextNodeUpdateActivities(nodeId);
return activitiesManager.getActivitiesInfo(nodeId);
}
// Return an activities info object with the error message
return new ActivitiesInfo(errMessage, nodeId);
}
return null;
}
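
For reference, the new endpoint can be exercised with any HTTP client. The following is a minimal sketch, not part of this change, which assumes the ResourceManager web service is reachable at the default http://localhost:8088 address (configurable via yarn.resourcemanager.webapp.address) and that a NodeManager with id 127.0.0.1:1234 is registered; adjust both values for your cluster. The nodeId query parameter accepts either a host name or host:port, as resolved by the handler above.

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;

public class SchedulerActivitiesClient {
  public static void main(String[] args) throws Exception {
    // Assumed RM webapp address and an illustrative nodeId; change as needed.
    URL url = new URL("http://localhost:8088/ws/v1/cluster"
        + "/scheduler/activities?nodeId=127.0.0.1:1234");
    HttpURLConnection conn = (HttpURLConnection) url.openConnection();
    conn.setRequestProperty("Accept", "application/json");
    try (BufferedReader in = new BufferedReader(
        new InputStreamReader(conn.getInputStream()))) {
      String line;
      while ((line = in.readLine()) != null) {
        System.out.println(line); // prints the ActivitiesInfo JSON payload
      }
    }
  }
}

Because the handler records activities for the next node update and returns whatever has been recorded so far, two requests separated by a node heartbeat are typically needed to see populated allocations; the tests later in this change follow that pattern.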
@GET
@Path("/scheduler/app-activities")
@Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
public AppActivitiesInfo getAppActivities(@Context HttpServletRequest hsr,
@QueryParam("appId") String appId, @QueryParam("maxTime") String time) {
YarnScheduler scheduler = rm.getRMContext().getScheduler();
if (scheduler instanceof AbstractYarnScheduler) {
AbstractYarnScheduler abstractYarnScheduler =
(AbstractYarnScheduler) scheduler;
ActivitiesManager activitiesManager =
abstractYarnScheduler.getActivitiesManager();
if (null == activitiesManager) {
String errMessage = "Not Capacity Scheduler";
return new AppActivitiesInfo(errMessage, appId);
}
if(appId == null) {
String errMessage = "Must provide an application Id";
return new AppActivitiesInfo(errMessage, null);
}
double maxTime = 3.0;
if (time != null) {
if (time.contains(".")) {
maxTime = Double.parseDouble(time);
} else {
maxTime = Double.parseDouble(time + ".0");
}
}
ApplicationId applicationId;
try {
applicationId = ApplicationId.fromString(appId);
activitiesManager.turnOnAppActivitiesRecording(applicationId, maxTime);
AppActivitiesInfo appActivitiesInfo =
activitiesManager.getAppActivitiesInfo(applicationId);
return appActivitiesInfo;
} catch (Exception e) {
String errMessage = "Cannot find application with given appId";
return new AppActivitiesInfo(errMessage, appId);
}
}
return null;
}
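
Per-application activities can be pulled from the new app-activities endpoint in the same way. A minimal sketch, again not part of this change, using an illustrative application id and the default three-second recording window taken from the handler above:

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;

public class AppActivitiesClient {
  public static void main(String[] args) throws Exception {
    // Assumed RM webapp address; appId below is only an example. maxTime is
    // the recording window in seconds and defaults to 3.0 when omitted.
    URL url = new URL("http://localhost:8088/ws/v1/cluster"
        + "/scheduler/app-activities"
        + "?appId=application_1471000000000_0001&maxTime=3.0");
    HttpURLConnection conn = (HttpURLConnection) url.openConnection();
    conn.setRequestProperty("Accept", "application/json");
    try (BufferedReader in = new BufferedReader(
        new InputStreamReader(conn.getInputStream()))) {
      for (String line; (line = in.readLine()) != null; ) {
        System.out.println(line); // prints the AppActivitiesInfo JSON payload
      }
    }
  }
}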
@GET
@Path("/appstatistics")
@Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })

View File

@ -0,0 +1,80 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.webapp.dao;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.NodeAllocation;
import javax.xml.bind.annotation.XmlAccessType;
import javax.xml.bind.annotation.XmlAccessorType;
import javax.xml.bind.annotation.XmlRootElement;
import java.util.Date;
import java.util.List;
import java.util.ArrayList;
/*
* DAO object to display node allocation activity.
*/
@XmlRootElement
@XmlAccessorType(XmlAccessType.FIELD)
public class ActivitiesInfo {
protected String nodeId;
protected String timeStamp;
protected String diagnostic = null;
protected List<NodeAllocationInfo> allocations;
private static final Log LOG = LogFactory.getLog(ActivitiesInfo.class);
public ActivitiesInfo() {
}
public ActivitiesInfo(String errorMessage, String nodeId) {
this.diagnostic = errorMessage;
this.nodeId = nodeId;
}
public ActivitiesInfo(List<NodeAllocation> nodeAllocations, String nodeId) {
this.nodeId = nodeId;
this.allocations = new ArrayList<>();
if (nodeAllocations == null) {
diagnostic = (nodeId != null ?
"waiting for display" :
"waiting for next allocation");
} else {
if (nodeAllocations.size() == 0) {
diagnostic = "do not have available resources";
} else {
this.nodeId = nodeAllocations.get(0).getNodeId();
Date date = new Date();
date.setTime(nodeAllocations.get(0).getTimeStamp());
this.timeStamp = date.toString();
for (int i = 0; i < nodeAllocations.size(); i++) {
NodeAllocation nodeAllocation = nodeAllocations.get(i);
NodeAllocationInfo allocationInfo = new NodeAllocationInfo(
nodeAllocation);
this.allocations.add(allocationInfo);
}
}
}
}
}

View File

@ -0,0 +1,67 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.webapp.dao;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityNode;
import javax.xml.bind.annotation.XmlAccessType;
import javax.xml.bind.annotation.XmlAccessorType;
import javax.xml.bind.annotation.XmlRootElement;
import java.util.ArrayList;
import java.util.List;
/*
* DAO object to display node information in allocation tree.
* It corresponds to "ActivityNode" class.
*/
@XmlRootElement
@XmlAccessorType(XmlAccessType.FIELD)
public class ActivityNodeInfo {
protected String name; // The name for activity node
protected String appPriority;
protected String requestPriority;
protected String allocationState;
protected String diagnostic;
protected List<ActivityNodeInfo> children;
ActivityNodeInfo() {
}
ActivityNodeInfo(ActivityNode node) {
this.name = node.getName();
getPriority(node);
this.allocationState = node.getState().name();
this.diagnostic = node.getDiagnostic();
this.children = new ArrayList<>();
for (ActivityNode child : node.getChildren()) {
ActivityNodeInfo containerInfo = new ActivityNodeInfo(child);
this.children.add(containerInfo);
}
}
private void getPriority(ActivityNode node) {
if (node.getType()) {
this.appPriority = node.getAppPriority();
} else {
this.requestPriority = node.getRequestPriority();
}
}
}

View File

@ -0,0 +1,79 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.webapp.dao;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.AppAllocation;
import org.apache.hadoop.yarn.util.SystemClock;
import javax.xml.bind.annotation.XmlAccessType;
import javax.xml.bind.annotation.XmlAccessorType;
import javax.xml.bind.annotation.XmlRootElement;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
/*
* DAO object to display application activity.
*/
@XmlRootElement
@XmlAccessorType(XmlAccessType.FIELD)
public class AppActivitiesInfo {
protected String applicationId;
protected String diagnostic;
protected String timeStamp;
protected List<AppAllocationInfo> allocations;
private static final Log LOG = LogFactory.getLog(AppActivitiesInfo.class);
public AppActivitiesInfo() {
}
public AppActivitiesInfo(String errorMessage, String applicationId) {
this.diagnostic = errorMessage;
this.applicationId = applicationId;
Date date = new Date();
date.setTime(SystemClock.getInstance().getTime());
this.timeStamp = date.toString();
}
public AppActivitiesInfo(List<AppAllocation> appAllocations,
ApplicationId applicationId) {
this.applicationId = applicationId.toString();
this.allocations = new ArrayList<>();
if (appAllocations == null) {
diagnostic = "waiting for display";
Date date = new Date();
date.setTime(SystemClock.getInstance().getTime());
this.timeStamp = date.toString();
} else {
for (int i = appAllocations.size() - 1; i > -1; i--) {
AppAllocation appAllocation = appAllocations.get(i);
AppAllocationInfo appAllocationInfo = new AppAllocationInfo(
appAllocation);
this.allocations.add(appAllocationInfo);
}
}
}
}

View File

@ -0,0 +1,72 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.webapp.dao;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.AppAllocation;
import javax.xml.bind.annotation.XmlAccessType;
import javax.xml.bind.annotation.XmlAccessorType;
import javax.xml.bind.annotation.XmlRootElement;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
/*
* DAO object to display application allocation detailed information.
*/
@XmlRootElement
@XmlAccessorType(XmlAccessType.FIELD)
public class AppAllocationInfo {
protected String nodeId;
protected String queueName;
protected String appPriority;
protected String allocatedContainerId;
protected String allocationState;
protected String diagnostic;
protected String timeStamp;
protected List<ActivityNodeInfo> allocationAttempt;
private static final Log LOG = LogFactory.getLog(AppAllocationInfo.class);
AppAllocationInfo() {
}
AppAllocationInfo(AppAllocation allocation) {
this.allocationAttempt = new ArrayList<>();
this.nodeId = allocation.getNodeId();
this.queueName = allocation.getQueueName();
this.appPriority = allocation.getPriority();
this.allocatedContainerId = allocation.getContainerId();
this.allocationState = allocation.getAppState().name();
this.diagnostic = allocation.getDiagnostic();
Date date = new Date();
date.setTime(allocation.getTime());
this.timeStamp = date.toString();
for (ActivityNode attempt : allocation.getAllocationAttempts()) {
ActivityNodeInfo containerInfo = new ActivityNodeInfo(attempt);
this.allocationAttempt.add(containerInfo);
}
}
}

View File

@ -0,0 +1,51 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.webapp.dao;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.NodeAllocation;
import javax.xml.bind.annotation.XmlAccessType;
import javax.xml.bind.annotation.XmlAccessorType;
import javax.xml.bind.annotation.XmlRootElement;
/*
* DAO object to display each node allocation in node heartbeat.
*/
@XmlRootElement
@XmlAccessorType(XmlAccessType.FIELD)
public class NodeAllocationInfo {
protected String allocatedContainerId;
protected String finalAllocationState;
protected ActivityNodeInfo root = null;
private static final Log LOG = LogFactory.getLog(NodeAllocationInfo.class);
NodeAllocationInfo() {
}
NodeAllocationInfo(NodeAllocation allocation) {
this.allocatedContainerId = allocation.getContainerId();
this.finalAllocationState = allocation.getFinalAllocationState().name();
root = new ActivityNodeInfo(allocation.getRoot());
}
}

View File

@ -62,9 +62,9 @@ import com.sun.jersey.test.framework.WebAppDescriptor;
public class TestRMWebServicesCapacitySched extends JerseyTestBase {
private static MockRM rm;
private static CapacitySchedulerConfiguration csConf;
private static YarnConfiguration conf;
protected static MockRM rm;
protected static CapacitySchedulerConfiguration csConf;
protected static YarnConfiguration conf;
private class QueueInfo {
float capacity;

View File

@ -0,0 +1,777 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.webapp;
import com.sun.jersey.api.client.ClientResponse;
import com.sun.jersey.api.client.WebResource;
import com.sun.jersey.core.util.MultivaluedMapImpl;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.codehaus.jettison.json.JSONArray;
import org.codehaus.jettison.json.JSONObject;
import org.junit.Test;
import javax.ws.rs.core.MediaType;
import java.util.Arrays;
import static org.junit.Assert.assertEquals;
public class TestRMWebServicesSchedulerActivities
extends TestRMWebServicesCapacitySched {
private static final Log LOG = LogFactory.getLog(
TestRMWebServicesSchedulerActivities.class);
@Test
public void testAssignMultipleContainersPerNodeHeartbeat()
throws Exception {
//Start RM so that it accepts app submissions
rm.start();
MockNM nm = new MockNM("127.0.0.1:1234", 24 * 1024,
rm.getResourceTrackerService());
nm.registerNode();
try {
RMApp app1 = rm.submitApp(10, "app1", "user1", null, "b1");
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm);
am1.allocate(Arrays.asList(ResourceRequest
.newInstance(Priority.UNDEFINED, "127.0.0.1",
Resources.createResource(1024), 10), ResourceRequest
.newInstance(Priority.UNDEFINED, "/default-rack",
Resources.createResource(1024), 10), ResourceRequest
.newInstance(Priority.UNDEFINED, "*", Resources.createResource(1024),
10)), null);
//Get JSON
WebResource r = resource();
MultivaluedMapImpl params = new MultivaluedMapImpl();
params.add("nodeId", "127.0.0.1:1234");
ClientResponse response = r.path("ws").path("v1").path("cluster").path(
"scheduler/activities").queryParams(params).accept(
MediaType.APPLICATION_JSON).get(ClientResponse.class);
assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType());
JSONObject json = response.getEntity(JSONObject.class);
nm.nodeHeartbeat(true);
Thread.sleep(1000);
//Get JSON
response = r.path("ws").path("v1").path("cluster").path(
"scheduler/activities").queryParams(params).accept(
MediaType.APPLICATION_JSON).get(ClientResponse.class);
assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType());
json = response.getEntity(JSONObject.class);
verifyNumberOfAllocations(json, 11);
JSONArray allocations = json.getJSONArray("allocations");
for (int i = 0; i < allocations.length(); i++) {
if (i != allocations.length() - 1) {
verifyStateOfAllocations(allocations.getJSONObject(i),
"finalAllocationState", "ALLOCATED");
verifyQueueOrder(allocations.getJSONObject(i), "root-a-b-b2-b3-b1");
} else {
verifyStateOfAllocations(allocations.getJSONObject(i),
"finalAllocationState", "SKIPPED");
verifyQueueOrder(allocations.getJSONObject(i), "root-a-b");
}
}
}
finally {
rm.stop();
}
}
@Test
public void testAssignWithoutAvailableResource() throws Exception {
//Start RM so that it accepts app submissions
rm.start();
MockNM nm = new MockNM("127.0.0.1:1234", 1 * 1024,
rm.getResourceTrackerService());
nm.registerNode();
try {
RMApp app1 = rm.submitApp(1024, "app1", "user1", null, "b1");
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm);
am1.allocate(Arrays.asList(ResourceRequest
.newInstance(Priority.UNDEFINED, "127.0.0.1",
Resources.createResource(1024), 10), ResourceRequest
.newInstance(Priority.UNDEFINED, "/default-rack",
Resources.createResource(1024), 10), ResourceRequest
.newInstance(Priority.UNDEFINED, "*", Resources.createResource(1024),
10)), null);
//Get JSON
WebResource r = resource();
MultivaluedMapImpl params = new MultivaluedMapImpl();
params.add("nodeId", "127.0.0.1");
ClientResponse response = r.path("ws").path("v1").path("cluster").path(
"scheduler/activities").queryParams(params).accept(
MediaType.APPLICATION_JSON).get(ClientResponse.class);
assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType());
JSONObject json = response.getEntity(JSONObject.class);
nm.nodeHeartbeat(true);
Thread.sleep(1000);
//Get JSON
response = r.path("ws").path("v1").path("cluster").path(
"scheduler/activities").queryParams(params).accept(
MediaType.APPLICATION_JSON).get(ClientResponse.class);
assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType());
json = response.getEntity(JSONObject.class);
verifyNumberOfAllocations(json, 0);
}
finally {
rm.stop();
}
}
@Test
public void testNoNM() throws Exception {
//Start RM so that it accepts app submissions
rm.start();
try {
//Get JSON
WebResource r = resource();
MultivaluedMapImpl params = new MultivaluedMapImpl();
params.add("nodeId", "127.0.0.1:1234");
ClientResponse response = r.path("ws").path("v1").path("cluster").path(
"scheduler/activities").queryParams(params).accept(
MediaType.APPLICATION_JSON).get(ClientResponse.class);
assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType());
JSONObject json = response.getEntity(JSONObject.class);
Thread.sleep(1000);
//Get JSON
response = r.path("ws").path("v1").path("cluster").path(
"scheduler/activities").queryParams(params).accept(
MediaType.APPLICATION_JSON).get(ClientResponse.class);
assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType());
json = response.getEntity(JSONObject.class);
verifyNumberOfAllocations(json, 0);
}
finally {
rm.stop();
}
}
@Test
public void testWrongNodeId() throws Exception {
//Start RM so that it accepts app submissions
rm.start();
MockNM nm = new MockNM("127.0.0.1:1234", 24 * 1024,
rm.getResourceTrackerService());
nm.registerNode();
try {
RMApp app1 = rm.submitApp(1024, "app1", "user1", null, "b1");
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm);
am1.allocate(Arrays.asList(ResourceRequest
.newInstance(Priority.UNDEFINED, "127.0.0.1",
Resources.createResource(1024), 10), ResourceRequest
.newInstance(Priority.UNDEFINED, "/default-rack",
Resources.createResource(1024), 10), ResourceRequest
.newInstance(Priority.UNDEFINED, "*", Resources.createResource(1024),
10)), null);
//Get JSON
WebResource r = resource();
MultivaluedMapImpl params = new MultivaluedMapImpl();
params.add("nodeId", "127.0.0.0");
ClientResponse response = r.path("ws").path("v1").path("cluster").path(
"scheduler/activities").queryParams(params).accept(
MediaType.APPLICATION_JSON).get(ClientResponse.class);
assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType());
JSONObject json = response.getEntity(JSONObject.class);
nm.nodeHeartbeat(true);
Thread.sleep(1000);
//Get JSON
response = r.path("ws").path("v1").path("cluster").path(
"scheduler/activities").queryParams(params).accept(
MediaType.APPLICATION_JSON).get(ClientResponse.class);
assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType());
json = response.getEntity(JSONObject.class);
verifyNumberOfAllocations(json, 0);
}
finally {
rm.stop();
}
}
@Test
public void testReserveNewContainer() throws Exception {
//Start RM so that it accepts app submissions
rm.start();
MockNM nm1 = new MockNM("127.0.0.1:1234", 4 * 1024,
rm.getResourceTrackerService());
MockNM nm2 = new MockNM("127.0.0.2:1234", 4 * 1024,
rm.getResourceTrackerService());
nm1.registerNode();
nm2.registerNode();
try {
RMApp app1 = rm.submitApp(10, "app1", "user1", null, "b1");
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm1);
RMApp app2 = rm.submitApp(10, "app2", "user1", null, "b2");
MockAM am2 = MockRM.launchAndRegisterAM(app2, rm, nm2);
am1.allocate(Arrays.asList(ResourceRequest
.newInstance(Priority.UNDEFINED, "*", Resources.createResource(4096),
10)), null);
// Reserve new container
WebResource r = resource();
MultivaluedMapImpl params = new MultivaluedMapImpl();
params.add("nodeId", "127.0.0.2");
ClientResponse response = r.path("ws").path("v1").path("cluster").path(
"scheduler/activities").queryParams(params).accept(
MediaType.APPLICATION_JSON).get(ClientResponse.class);
assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType());
JSONObject json = response.getEntity(JSONObject.class);
nm2.nodeHeartbeat(true);
Thread.sleep(1000);
response = r.path("ws").path("v1").path("cluster").path(
"scheduler/activities").queryParams(params).accept(
MediaType.APPLICATION_JSON).get(ClientResponse.class);
assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType());
json = response.getEntity(JSONObject.class);
verifyNumberOfAllocations(json, 1);
verifyQueueOrder(json.getJSONObject("allocations"), "root-a-b-b3-b1");
JSONObject allocations = json.getJSONObject("allocations");
verifyStateOfAllocations(allocations, "finalAllocationState", "RESERVED");
// Do a node heartbeat again without releasing container from app2
r = resource();
params = new MultivaluedMapImpl();
params.add("nodeId", "127.0.0.2");
response = r.path("ws").path("v1").path("cluster").path(
"scheduler/activities").queryParams(params).accept(
MediaType.APPLICATION_JSON).get(ClientResponse.class);
assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType());
json = response.getEntity(JSONObject.class);
nm2.nodeHeartbeat(true);
Thread.sleep(1000);
response = r.path("ws").path("v1").path("cluster").path(
"scheduler/activities").queryParams(params).accept(
MediaType.APPLICATION_JSON).get(ClientResponse.class);
assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType());
json = response.getEntity(JSONObject.class);
verifyNumberOfAllocations(json, 1);
verifyQueueOrder(json.getJSONObject("allocations"), "b1");
allocations = json.getJSONObject("allocations");
verifyStateOfAllocations(allocations, "finalAllocationState", "SKIPPED");
// Finish application 2
CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
ContainerId containerId = ContainerId.newContainerId(
am2.getApplicationAttemptId(), 1);
cs.completedContainer(cs.getRMContainer(containerId), ContainerStatus
.newInstance(containerId, ContainerState.COMPLETE, "", 0),
RMContainerEventType.FINISHED);
// Do a node heartbeat again
r = resource();
params = new MultivaluedMapImpl();
params.add("nodeId", "127.0.0.2");
response = r.path("ws").path("v1").path("cluster").path(
"scheduler/activities").queryParams(params).accept(
MediaType.APPLICATION_JSON).get(ClientResponse.class);
assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType());
json = response.getEntity(JSONObject.class);
nm2.nodeHeartbeat(true);
Thread.sleep(1000);
response = r.path("ws").path("v1").path("cluster").path(
"scheduler/activities").queryParams(params).accept(
MediaType.APPLICATION_JSON).get(ClientResponse.class);
assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType());
json = response.getEntity(JSONObject.class);
verifyNumberOfAllocations(json, 1);
verifyQueueOrder(json.getJSONObject("allocations"), "b1");
allocations = json.getJSONObject("allocations");
verifyStateOfAllocations(allocations, "finalAllocationState",
"ALLOCATED_FROM_RESERVED");
}
finally {
rm.stop();
}
}
@Test
public void testActivityJSON() throws Exception {
//Start RM so that it accepts app submissions
rm.start();
MockNM nm = new MockNM("127.0.0.1:1234", 24 * 1024,
rm.getResourceTrackerService());
nm.registerNode();
try {
RMApp app1 = rm.submitApp(10, "app1", "user1", null, "b1");
//Get JSON
WebResource r = resource();
MultivaluedMapImpl params = new MultivaluedMapImpl();
params.add("nodeId", "127.0.0.1");
ClientResponse response = r.path("ws").path("v1").path("cluster").path(
"scheduler/activities").queryParams(params).accept(
MediaType.APPLICATION_JSON).get(ClientResponse.class);
assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType());
JSONObject json = response.getEntity(JSONObject.class);
nm.nodeHeartbeat(true);
Thread.sleep(1000);
//Get JSON
response = r.path("ws").path("v1").path("cluster").path(
"scheduler/activities").queryParams(params).accept(
MediaType.APPLICATION_JSON).get(ClientResponse.class);
assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType());
json = response.getEntity(JSONObject.class);
verifyNumberOfAllocations(json, 1);
JSONObject allocations = json.getJSONObject("allocations");
verifyStateOfAllocations(allocations, "finalAllocationState",
"ALLOCATED");
verifyNumberOfNodes(allocations, 6);
      verifyQueueOrder(allocations, "root-a-b-b1");
    } finally {
rm.stop();
}
}
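
  /**
   * Asserts the number of activity nodes under "root" in a single
   * allocation, counting the root node itself.
   */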
private void verifyNumberOfNodes(JSONObject allocation, int realValue)
throws Exception {
if (allocation.isNull("root")) {
      assertEquals("Number of nodes is wrong", 0, realValue);
    } else {
      assertEquals("Number of nodes is wrong",
          1 + getNumberOfNodes(allocation.getJSONObject("root")), realValue);
}
}
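
  /**
   * Recursively counts the entries reachable through the "children"
   * elements of an activity node.
   */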
private int getNumberOfNodes(JSONObject allocation) throws Exception {
if (!allocation.isNull("children")) {
Object object = allocation.get("children");
if (object.getClass() == JSONObject.class) {
return 1 + getNumberOfNodes((JSONObject) object);
} else {
int count = 0;
for (int i = 0; i < ((JSONArray) object).length(); i++) {
count += (1 + getNumberOfNodes(
((JSONArray) object).getJSONObject(i)));
}
return count;
}
} else {
return 0;
}
}
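
  /**
   * Asserts that the named field of an allocation (for example
   * "finalAllocationState") has the expected value.
   */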
private void verifyStateOfAllocations(JSONObject allocation,
String nameToCheck, String realState) throws Exception {
assertEquals("State of allocation is wrong", allocation.get(nameToCheck),
realState);
}
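
  /**
   * Asserts how many allocation entries the response contains; a single
   * allocation is serialized as a JSON object, multiple ones as an array.
   */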
private void verifyNumberOfAllocations(JSONObject json, int realValue)
throws Exception {
if (json.isNull("allocations")) {
assertEquals("Number of allocations is wrong", 0, realValue);
} else {
Object object = json.get("allocations");
if (object.getClass() == JSONObject.class) {
assertEquals("Number of allocations is wrong", 1, realValue);
} else if (object.getClass() == JSONArray.class) {
assertEquals("Number of allocations is wrong",
((JSONArray) object).length(), realValue);
}
}
}
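
  /**
   * Asserts the order in which queues were visited for an allocation,
   * expressed as a dash-separated path such as "root-a-b-b1".
   */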
private void verifyQueueOrder(JSONObject json, String realOrder)
throws Exception {
    String order = "";
    if (!json.isNull("root")) {
      JSONObject root = json.getJSONObject("root");
      order = root.getString("name") + "-" + getQueueOrder(root);
      // Strip the trailing "-" left by getQueueOrder; doing this inside the
      // branch avoids an index error when "root" is absent.
      order = order.substring(0, order.length() - 1);
    }
    assertEquals("Order of queue is wrong", order, realOrder);
}
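
  /**
   * Walks the "children" elements and concatenates queue names until an
   * application entry (one carrying "appPriority") is reached.
   */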
private String getQueueOrder(JSONObject node) throws Exception {
if (!node.isNull("children")) {
Object children = node.get("children");
if (children.getClass() == JSONObject.class) {
if (!((JSONObject) children).isNull("appPriority")) {
return "";
}
return ((JSONObject) children).getString("name") + "-" + getQueueOrder(
(JSONObject) children);
} else if (children.getClass() == JSONArray.class) {
String order = "";
for (int i = 0; i < ((JSONArray) children).length(); i++) {
JSONObject child = (JSONObject) ((JSONArray) children).get(i);
if (!child.isNull("appPriority")) {
return "";
}
order += (child.getString("name") + "-" + getQueueOrder(child));
}
return order;
}
}
return "";
}
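
  /**
   * Asserts how many allocation attempts an allocation entry contains; a
   * single attempt is serialized as a JSON object, multiple ones as an
   * array.
   */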
private void verifyNumberOfAllocationAttempts(JSONObject allocation,
int realValue) throws Exception {
if (allocation.isNull("allocationAttempt")) {
assertEquals("Number of allocation attempts is wrong", 0, realValue);
} else {
Object object = allocation.get("allocationAttempt");
if (object.getClass() == JSONObject.class) {
        assertEquals("Number of allocation attempts is wrong", 1, realValue);
      } else if (object.getClass() == JSONArray.class) {
        assertEquals("Number of allocation attempts is wrong",
            ((JSONArray) object).length(), realValue);
}
}
}
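
  /**
   * Queries scheduler/app-activities for a submitted application, triggers
   * a node heartbeat, then verifies that one allocation in state ACCEPTED
   * with a single allocation attempt is reported.
   */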
@Test
public void testAppActivityJSON() throws Exception {
//Start RM so that it accepts app submissions
rm.start();
MockNM nm = new MockNM("127.0.0.1:1234", 24 * 1024,
rm.getResourceTrackerService());
nm.registerNode();
try {
RMApp app1 = rm.submitApp(10, "app1", "user1", null, "b1");
//Get JSON
WebResource r = resource();
MultivaluedMapImpl params = new MultivaluedMapImpl();
params.add("appId", app1.getApplicationId().toString());
ClientResponse response = r.path("ws").path("v1").path("cluster").path(
"scheduler/app-activities").queryParams(params).accept(
MediaType.APPLICATION_JSON).get(ClientResponse.class);
assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType());
JSONObject json = response.getEntity(JSONObject.class);
nm.nodeHeartbeat(true);
Thread.sleep(5000);
//Get JSON
response = r.path("ws").path("v1").path("cluster").path(
"scheduler/app-activities").queryParams(params).accept(
MediaType.APPLICATION_JSON).get(ClientResponse.class);
assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType());
json = response.getEntity(JSONObject.class);
verifyNumberOfAllocations(json, 1);
JSONObject allocations = json.getJSONObject("allocations");
verifyStateOfAllocations(allocations, "allocationState", "ACCEPTED");
verifyNumberOfAllocationAttempts(allocations, 1);
    } finally {
rm.stop();
}
}
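
  /**
   * Requests ten 1 GB containers and verifies that a single node heartbeat
   * yields ten allocation entries, each in state ACCEPTED, in the
   * application activities output.
   */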
@Test
public void testAppAssignMultipleContainersPerNodeHeartbeat()
throws Exception {
//Start RM so that it accepts app submissions
rm.start();
MockNM nm = new MockNM("127.0.0.1:1234", 24 * 1024,
rm.getResourceTrackerService());
nm.registerNode();
try {
RMApp app1 = rm.submitApp(1024, "app1", "user1", null, "b1");
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm);
am1.allocate(Arrays.asList(ResourceRequest
.newInstance(Priority.UNDEFINED, "127.0.0.1",
Resources.createResource(1024), 10), ResourceRequest
.newInstance(Priority.UNDEFINED, "/default-rack",
Resources.createResource(1024), 10), ResourceRequest
.newInstance(Priority.UNDEFINED, "*", Resources.createResource(1024),
10)), null);
//Get JSON
WebResource r = resource();
MultivaluedMapImpl params = new MultivaluedMapImpl();
params.add("appId", app1.getApplicationId().toString());
ClientResponse response = r.path("ws").path("v1").path("cluster").path(
"scheduler/app-activities").queryParams(params).accept(
MediaType.APPLICATION_JSON).get(ClientResponse.class);
assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType());
JSONObject json = response.getEntity(JSONObject.class);
nm.nodeHeartbeat(true);
Thread.sleep(5000);
//Get JSON
response = r.path("ws").path("v1").path("cluster").path(
"scheduler/app-activities").queryParams(params).accept(
MediaType.APPLICATION_JSON).get(ClientResponse.class);
assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType());
json = response.getEntity(JSONObject.class);
verifyNumberOfAllocations(json, 10);
JSONArray allocations = json.getJSONArray("allocations");
for (int i = 0; i < allocations.length(); i++) {
verifyStateOfAllocations(allocations.getJSONObject(i),
"allocationState", "ACCEPTED");
}
    } finally {
rm.stop();
}
}
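
  /**
   * Uses a node whose capacity is already consumed by the AM container, so
   * the outstanding requests cannot be placed, and verifies that no
   * allocations are reported for the application.
   */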
@Test
public void testAppAssignWithoutAvailableResource() throws Exception {
//Start RM so that it accepts app submissions
rm.start();
MockNM nm = new MockNM("127.0.0.1:1234", 1 * 1024,
rm.getResourceTrackerService());
nm.registerNode();
try {
RMApp app1 = rm.submitApp(1024, "app1", "user1", null, "b1");
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm);
am1.allocate(Arrays.asList(ResourceRequest
.newInstance(Priority.UNDEFINED, "127.0.0.1",
Resources.createResource(1024), 10), ResourceRequest
.newInstance(Priority.UNDEFINED, "/default-rack",
Resources.createResource(1024), 10), ResourceRequest
.newInstance(Priority.UNDEFINED, "*", Resources.createResource(1024),
10)), null);
//Get JSON
WebResource r = resource();
MultivaluedMapImpl params = new MultivaluedMapImpl();
params.add("appId", app1.getApplicationId().toString());
ClientResponse response = r.path("ws").path("v1").path("cluster").path(
"scheduler/app-activities").queryParams(params).accept(
MediaType.APPLICATION_JSON).get(ClientResponse.class);
assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType());
JSONObject json = response.getEntity(JSONObject.class);
nm.nodeHeartbeat(true);
Thread.sleep(5000);
//Get JSON
response = r.path("ws").path("v1").path("cluster").path(
"scheduler/app-activities").queryParams(params).accept(
MediaType.APPLICATION_JSON).get(ClientResponse.class);
assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType());
json = response.getEntity(JSONObject.class);
verifyNumberOfAllocations(json, 0);
    } finally {
rm.stop();
}
}
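
  /**
   * Submits an application while no NodeManager is registered and verifies
   * that the application activities output contains no allocations.
   */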
@Test
public void testAppNoNM() throws Exception {
//Start RM so that it accepts app submissions
rm.start();
try {
RMApp app1 = rm.submitApp(1024, "app1", "user1", null, "b1");
//Get JSON
WebResource r = resource();
MultivaluedMapImpl params = new MultivaluedMapImpl();
params.add("appId", app1.getApplicationId().toString());
ClientResponse response = r.path("ws").path("v1").path("cluster").path(
"scheduler/app-activities").queryParams(params).accept(
MediaType.APPLICATION_JSON).get(ClientResponse.class);
assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType());
JSONObject json = response.getEntity(JSONObject.class);
//Get JSON
response = r.path("ws").path("v1").path("cluster").path(
"scheduler/app-activities").queryParams(params).accept(
MediaType.APPLICATION_JSON).get(ClientResponse.class);
assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType());
json = response.getEntity(JSONObject.class);
verifyNumberOfAllocations(json, 0);
}
finally {
rm.stop();
}
}
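
  /**
   * Makes app1 reserve a container on the node occupied by app2, then
   * completes app2's container, verifying that each node heartbeat adds one
   * more allocation entry to app1's activities output.
   */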
@Test
public void testAppReserveNewContainer() throws Exception {
//Start RM so that it accepts app submissions
rm.start();
MockNM nm1 = new MockNM("127.0.0.1:1234", 4 * 1024,
rm.getResourceTrackerService());
MockNM nm2 = new MockNM("127.0.0.2:1234", 4 * 1024,
rm.getResourceTrackerService());
nm1.registerNode();
nm2.registerNode();
try {
RMApp app1 = rm.submitApp(10, "app1", "user1", null, "b1");
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm1);
RMApp app2 = rm.submitApp(10, "app2", "user1", null, "b2");
MockAM am2 = MockRM.launchAndRegisterAM(app2, rm, nm2);
am1.allocate(Arrays.asList(ResourceRequest
.newInstance(Priority.UNDEFINED, "*", Resources.createResource(4096),
10)), null);
// Reserve new container
WebResource r = resource();
MultivaluedMapImpl params = new MultivaluedMapImpl();
params.add("appId", app1.getApplicationId().toString());
ClientResponse response = r.path("ws").path("v1").path("cluster").path(
"scheduler/app-activities").queryParams(params).accept(
MediaType.APPLICATION_JSON).get(ClientResponse.class);
assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType());
JSONObject json = response.getEntity(JSONObject.class);
nm2.nodeHeartbeat(true);
Thread.sleep(1000);
response = r.path("ws").path("v1").path("cluster").path(
"scheduler/app-activities").queryParams(params).accept(
MediaType.APPLICATION_JSON).get(ClientResponse.class);
assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType());
json = response.getEntity(JSONObject.class);
verifyNumberOfAllocations(json, 1);
// Do a node heartbeat again without releasing container from app2
r = resource();
params = new MultivaluedMapImpl();
params.add("appId", app1.getApplicationId().toString());
response = r.path("ws").path("v1").path("cluster").path(
"scheduler/app-activities").queryParams(params).accept(
MediaType.APPLICATION_JSON).get(ClientResponse.class);
assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType());
json = response.getEntity(JSONObject.class);
nm2.nodeHeartbeat(true);
Thread.sleep(1000);
response = r.path("ws").path("v1").path("cluster").path(
"scheduler/app-activities").queryParams(params).accept(
MediaType.APPLICATION_JSON).get(ClientResponse.class);
assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType());
json = response.getEntity(JSONObject.class);
verifyNumberOfAllocations(json, 2);
// Finish application 2
CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
ContainerId containerId = ContainerId.newContainerId(
am2.getApplicationAttemptId(), 1);
cs.completedContainer(cs.getRMContainer(containerId), ContainerStatus
.newInstance(containerId, ContainerState.COMPLETE, "", 0),
RMContainerEventType.FINISHED);
// Do a node heartbeat again
r = resource();
params = new MultivaluedMapImpl();
params.add("appId", app1.getApplicationId().toString());
response = r.path("ws").path("v1").path("cluster").path(
"scheduler/app-activities").queryParams(params).accept(
MediaType.APPLICATION_JSON).get(ClientResponse.class);
assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType());
json = response.getEntity(JSONObject.class);
nm2.nodeHeartbeat(true);
Thread.sleep(1000);
response = r.path("ws").path("v1").path("cluster").path(
"scheduler/app-activities").queryParams(params).accept(
MediaType.APPLICATION_JSON).get(ClientResponse.class);
assertEquals(MediaType.APPLICATION_JSON_TYPE, response.getType());
json = response.getEntity(JSONObject.class);
verifyNumberOfAllocations(json, 3);
    } finally {
rm.stop();
}
}
}