YARN-3293. Track and display capacity scheduler health metrics in web
UI. Contributed by Varun Vasudev
(cherry picked from commit afa5d4715a
)
This commit is contained in:
parent
4aba069b37
commit
f5b49160d9
|
@ -56,6 +56,9 @@ Release 2.8.0 - UNRELEASED
|
||||||
YARN-3294. Allow dumping of Capacity Scheduler debug logs via
|
YARN-3294. Allow dumping of Capacity Scheduler debug logs via
|
||||||
web UI for a fixed time period. (Varun Vasudev via xgong)
|
web UI for a fixed time period. (Varun Vasudev via xgong)
|
||||||
|
|
||||||
|
YARN-3293. Track and display capacity scheduler health metrics
|
||||||
|
in web UI. (Varun Vasudev via xgong)
|
||||||
|
|
||||||
OPTIMIZATIONS
|
OPTIMIZATIONS
|
||||||
|
|
||||||
YARN-3339. TestDockerContainerExecutor should pull a single image and not
|
YARN-3339. TestDockerContainerExecutor should pull a single image and not
|
||||||
|
|
|
@ -552,4 +552,12 @@ public class QueueMetrics implements MetricsSource {
|
||||||
public MetricsSystem getMetricsSystem() {
|
public MetricsSystem getMetricsSystem() {
|
||||||
return metricsSystem;
|
return metricsSystem;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public long getAggregateAllocatedContainers() {
|
||||||
|
return aggregateContainersAllocated.value();
|
||||||
|
}
|
||||||
|
|
||||||
|
public long getAggegatedReleasedContainers() {
|
||||||
|
return aggregateContainersReleased.value();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,236 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
|
||||||
|
|
||||||
|
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||||
|
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||||
|
import org.apache.hadoop.yarn.api.records.Resource;
|
||||||
|
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
|
|
||||||
|
public class SchedulerHealth {
|
||||||
|
|
||||||
|
static public class DetailedInformation {
|
||||||
|
long timestamp;
|
||||||
|
NodeId nodeId;
|
||||||
|
ContainerId containerId;
|
||||||
|
String queue;
|
||||||
|
|
||||||
|
public DetailedInformation(long timestamp, NodeId nodeId,
|
||||||
|
ContainerId containerId, String queue) {
|
||||||
|
this.timestamp = timestamp;
|
||||||
|
this.nodeId = nodeId;
|
||||||
|
this.containerId = containerId;
|
||||||
|
this.queue = queue;
|
||||||
|
}
|
||||||
|
|
||||||
|
public long getTimestamp() {
|
||||||
|
return timestamp;
|
||||||
|
}
|
||||||
|
|
||||||
|
public NodeId getNodeId() {
|
||||||
|
return nodeId;
|
||||||
|
}
|
||||||
|
|
||||||
|
public ContainerId getContainerId() {
|
||||||
|
return containerId;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getQueue() {
|
||||||
|
return queue;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
enum Operation {
|
||||||
|
ALLOCATION, RELEASE, PREEMPTION, RESERVATION, FULFILLED_RESERVATION
|
||||||
|
}
|
||||||
|
|
||||||
|
long lastSchedulerRunTime;
|
||||||
|
Map<Operation, Resource> lastSchedulerRunDetails;
|
||||||
|
Map<Operation, DetailedInformation> schedulerHealthDetails;
|
||||||
|
Map<Operation, Long> schedulerOperationCounts;
|
||||||
|
// this is for counts since the RM started, never reset
|
||||||
|
Map<Operation, Long> schedulerOperationAggregateCounts;
|
||||||
|
|
||||||
|
public SchedulerHealth() {
|
||||||
|
lastSchedulerRunDetails = new ConcurrentHashMap<>();
|
||||||
|
schedulerHealthDetails = new ConcurrentHashMap<>();
|
||||||
|
schedulerOperationCounts = new ConcurrentHashMap<>();
|
||||||
|
schedulerOperationAggregateCounts = new ConcurrentHashMap<>();
|
||||||
|
for (Operation op : Operation.values()) {
|
||||||
|
lastSchedulerRunDetails.put(op, Resource.newInstance(0, 0));
|
||||||
|
schedulerOperationCounts.put(op, 0L);
|
||||||
|
schedulerHealthDetails.put(op, new DetailedInformation(0, null, null,
|
||||||
|
null));
|
||||||
|
schedulerOperationAggregateCounts.put(op, 0L);
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
public void updateAllocation(long timestamp, NodeId nodeId,
|
||||||
|
ContainerId containerId, String queue) {
|
||||||
|
DetailedInformation di =
|
||||||
|
new DetailedInformation(timestamp, nodeId, containerId, queue);
|
||||||
|
schedulerHealthDetails.put(Operation.ALLOCATION, di);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void updateRelease(long timestamp, NodeId nodeId,
|
||||||
|
ContainerId containerId, String queue) {
|
||||||
|
DetailedInformation di =
|
||||||
|
new DetailedInformation(timestamp, nodeId, containerId, queue);
|
||||||
|
schedulerHealthDetails.put(Operation.RELEASE, di);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void updatePreemption(long timestamp, NodeId nodeId,
|
||||||
|
ContainerId containerId, String queue) {
|
||||||
|
DetailedInformation di =
|
||||||
|
new DetailedInformation(timestamp, nodeId, containerId, queue);
|
||||||
|
schedulerHealthDetails.put(Operation.PREEMPTION, di);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void updateReservation(long timestamp, NodeId nodeId,
|
||||||
|
ContainerId containerId, String queue) {
|
||||||
|
DetailedInformation di =
|
||||||
|
new DetailedInformation(timestamp, nodeId, containerId, queue);
|
||||||
|
schedulerHealthDetails.put(Operation.RESERVATION, di);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void updateSchedulerRunDetails(long timestamp, Resource allocated,
|
||||||
|
Resource reserved) {
|
||||||
|
lastSchedulerRunTime = timestamp;
|
||||||
|
lastSchedulerRunDetails.put(Operation.ALLOCATION, allocated);
|
||||||
|
lastSchedulerRunDetails.put(Operation.RESERVATION, reserved);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void updateSchedulerReleaseDetails(long timestamp, Resource released) {
|
||||||
|
lastSchedulerRunTime = timestamp;
|
||||||
|
lastSchedulerRunDetails.put(Operation.RELEASE, released);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void updateSchedulerReleaseCounts(long count) {
|
||||||
|
updateCounts(Operation.RELEASE, count);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void updateSchedulerAllocationCounts(long count) {
|
||||||
|
updateCounts(Operation.ALLOCATION, count);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void updateSchedulerReservationCounts(long count) {
|
||||||
|
updateCounts(Operation.RESERVATION, count);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void updateSchedulerFulfilledReservationCounts(long count) {
|
||||||
|
updateCounts(Operation.FULFILLED_RESERVATION, count);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void updateSchedulerPreemptionCounts(long count) {
|
||||||
|
updateCounts(Operation.PREEMPTION, count);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void updateCounts(Operation op, long count) {
|
||||||
|
schedulerOperationCounts.put(op, count);
|
||||||
|
Long tmp = schedulerOperationAggregateCounts.get(op);
|
||||||
|
schedulerOperationAggregateCounts.put(op, tmp + count);
|
||||||
|
}
|
||||||
|
|
||||||
|
public long getLastSchedulerRunTime() {
|
||||||
|
return lastSchedulerRunTime;
|
||||||
|
}
|
||||||
|
|
||||||
|
private Resource getResourceDetails(Operation op) {
|
||||||
|
return lastSchedulerRunDetails.get(op);
|
||||||
|
}
|
||||||
|
|
||||||
|
public Resource getResourcesAllocated() {
|
||||||
|
return getResourceDetails(Operation.ALLOCATION);
|
||||||
|
}
|
||||||
|
|
||||||
|
public Resource getResourcesReserved() {
|
||||||
|
return getResourceDetails(Operation.RESERVATION);
|
||||||
|
}
|
||||||
|
|
||||||
|
public Resource getResourcesReleased() {
|
||||||
|
return getResourceDetails(Operation.RELEASE);
|
||||||
|
}
|
||||||
|
|
||||||
|
private DetailedInformation getDetailedInformation(Operation op) {
|
||||||
|
return schedulerHealthDetails.get(op);
|
||||||
|
}
|
||||||
|
|
||||||
|
public DetailedInformation getLastAllocationDetails() {
|
||||||
|
return getDetailedInformation(Operation.ALLOCATION);
|
||||||
|
}
|
||||||
|
|
||||||
|
public DetailedInformation getLastReleaseDetails() {
|
||||||
|
return getDetailedInformation(Operation.RELEASE);
|
||||||
|
}
|
||||||
|
|
||||||
|
public DetailedInformation getLastReservationDetails() {
|
||||||
|
return getDetailedInformation(Operation.RESERVATION);
|
||||||
|
}
|
||||||
|
|
||||||
|
public DetailedInformation getLastPreemptionDetails() {
|
||||||
|
return getDetailedInformation(Operation.PREEMPTION);
|
||||||
|
}
|
||||||
|
|
||||||
|
private Long getOperationCount(Operation op) {
|
||||||
|
return schedulerOperationCounts.get(op);
|
||||||
|
}
|
||||||
|
|
||||||
|
public Long getAllocationCount() {
|
||||||
|
return getOperationCount(Operation.ALLOCATION);
|
||||||
|
}
|
||||||
|
|
||||||
|
public Long getReleaseCount() {
|
||||||
|
return getOperationCount(Operation.RELEASE);
|
||||||
|
}
|
||||||
|
|
||||||
|
public Long getReservationCount() {
|
||||||
|
return getOperationCount(Operation.RESERVATION);
|
||||||
|
}
|
||||||
|
|
||||||
|
public Long getPreemptionCount() {
|
||||||
|
return getOperationCount(Operation.PREEMPTION);
|
||||||
|
}
|
||||||
|
|
||||||
|
private Long getAggregateOperationCount(Operation op) {
|
||||||
|
return schedulerOperationAggregateCounts.get(op);
|
||||||
|
}
|
||||||
|
|
||||||
|
public Long getAggregateAllocationCount() {
|
||||||
|
return getAggregateOperationCount(Operation.ALLOCATION);
|
||||||
|
}
|
||||||
|
|
||||||
|
public Long getAggregateReleaseCount() {
|
||||||
|
return getAggregateOperationCount(Operation.RELEASE);
|
||||||
|
}
|
||||||
|
|
||||||
|
public Long getAggregateReservationCount() {
|
||||||
|
return getAggregateOperationCount(Operation.RESERVATION);
|
||||||
|
}
|
||||||
|
|
||||||
|
public Long getAggregatePreemptionCount() {
|
||||||
|
return getAggregateOperationCount(Operation.PREEMPTION);
|
||||||
|
}
|
||||||
|
|
||||||
|
public Long getAggregateFulFilledReservationsCount() {
|
||||||
|
return getAggregateOperationCount(Operation.FULFILLED_RESERVATION);
|
||||||
|
}
|
||||||
|
}
|
|
@ -22,40 +22,46 @@ import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
||||||
import org.apache.hadoop.yarn.api.records.Resource;
|
import org.apache.hadoop.yarn.api.records.Resource;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.AssignmentInformation;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
|
||||||
import org.apache.hadoop.yarn.util.resource.Resources;
|
|
||||||
|
|
||||||
@Private
|
@Private
|
||||||
@Unstable
|
@Unstable
|
||||||
public class CSAssignment {
|
public class CSAssignment {
|
||||||
|
|
||||||
final private Resource resource;
|
final private Resource resource;
|
||||||
private NodeType type;
|
private NodeType type;
|
||||||
private final RMContainer excessReservation;
|
private final RMContainer excessReservation;
|
||||||
private final FiCaSchedulerApp application;
|
private final FiCaSchedulerApp application;
|
||||||
private final boolean skipped;
|
private final boolean skipped;
|
||||||
|
private boolean fulfilledReservation;
|
||||||
|
private final AssignmentInformation assignmentInformation;
|
||||||
|
|
||||||
public CSAssignment(Resource resource, NodeType type) {
|
public CSAssignment(Resource resource, NodeType type) {
|
||||||
this.resource = resource;
|
this(resource, type, null, null, false, false);
|
||||||
this.type = type;
|
|
||||||
this.application = null;
|
|
||||||
this.excessReservation = null;
|
|
||||||
this.skipped = false;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public CSAssignment(FiCaSchedulerApp application, RMContainer excessReservation) {
|
public CSAssignment(FiCaSchedulerApp application,
|
||||||
this.resource = excessReservation.getContainer().getResource();
|
RMContainer excessReservation) {
|
||||||
this.type = NodeType.NODE_LOCAL;
|
this(excessReservation.getContainer().getResource(), NodeType.NODE_LOCAL,
|
||||||
this.application = application;
|
excessReservation, application, false, false);
|
||||||
this.excessReservation = excessReservation;
|
|
||||||
this.skipped = false;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public CSAssignment(boolean skipped) {
|
public CSAssignment(boolean skipped) {
|
||||||
this.resource = Resources.createResource(0, 0);
|
this(Resource.newInstance(0, 0), NodeType.NODE_LOCAL, null, null, skipped,
|
||||||
this.type = NodeType.NODE_LOCAL;
|
false);
|
||||||
this.application = null;
|
}
|
||||||
this.excessReservation = null;
|
|
||||||
|
public CSAssignment(Resource resource, NodeType type,
|
||||||
|
RMContainer excessReservation, FiCaSchedulerApp application,
|
||||||
|
boolean skipped, boolean fulfilledReservation) {
|
||||||
|
this.resource = resource;
|
||||||
|
this.type = type;
|
||||||
|
this.excessReservation = excessReservation;
|
||||||
|
this.application = application;
|
||||||
this.skipped = skipped;
|
this.skipped = skipped;
|
||||||
|
this.fulfilledReservation = fulfilledReservation;
|
||||||
|
this.assignmentInformation = new AssignmentInformation();
|
||||||
}
|
}
|
||||||
|
|
||||||
public Resource getResource() {
|
public Resource getResource() {
|
||||||
|
@ -84,6 +90,35 @@ public class CSAssignment {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public String toString() {
|
public String toString() {
|
||||||
return resource.getMemory() + ":" + type;
|
String ret = "resource:" + resource.toString();
|
||||||
|
ret += "; type:" + type;
|
||||||
|
ret += "; excessReservation:" + excessReservation;
|
||||||
|
ret +=
|
||||||
|
"; applicationid:"
|
||||||
|
+ (application != null ? application.getApplicationId().toString()
|
||||||
|
: "null");
|
||||||
|
ret += "; skipped:" + skipped;
|
||||||
|
ret += "; fulfilled reservation:" + fulfilledReservation;
|
||||||
|
ret +=
|
||||||
|
"; allocations(count/resource):"
|
||||||
|
+ assignmentInformation.getNumAllocations() + "/"
|
||||||
|
+ assignmentInformation.getAllocated().toString();
|
||||||
|
ret +=
|
||||||
|
"; reservations(count/resource):"
|
||||||
|
+ assignmentInformation.getNumReservations() + "/"
|
||||||
|
+ assignmentInformation.getReserved().toString();
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setFulfilledReservation(boolean fulfilledReservation) {
|
||||||
|
this.fulfilledReservation = fulfilledReservation;
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean isFulfilledReservation() {
|
||||||
|
return this.fulfilledReservation;
|
||||||
|
}
|
||||||
|
|
||||||
|
public AssignmentInformation getAssignmentInformation() {
|
||||||
|
return this.assignmentInformation;
|
||||||
}
|
}
|
||||||
}
|
}
|
|
@ -45,6 +45,7 @@ import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.security.AccessControlException;
|
import org.apache.hadoop.security.AccessControlException;
|
||||||
import org.apache.hadoop.security.Groups;
|
import org.apache.hadoop.security.Groups;
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
|
import org.apache.hadoop.util.Time;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
import org.apache.hadoop.yarn.api.records.Container;
|
import org.apache.hadoop.yarn.api.records.Container;
|
||||||
|
@ -84,6 +85,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.PreemptableResourceScheduler;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.PreemptableResourceScheduler;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
|
||||||
|
@ -91,9 +93,11 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueNotFoundExce
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerDynamicEditException;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerDynamicEditException;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerHealth;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.QueueMapping;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.QueueMapping;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.QueueMapping.MappingType;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.QueueMapping.MappingType;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.AssignmentInformation;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.QueueEntitlement;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.QueueEntitlement;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
|
||||||
|
@ -213,7 +217,8 @@ public class CapacityScheduler extends
|
||||||
private boolean scheduleAsynchronously;
|
private boolean scheduleAsynchronously;
|
||||||
private AsyncScheduleThread asyncSchedulerThread;
|
private AsyncScheduleThread asyncSchedulerThread;
|
||||||
private RMNodeLabelsManager labelManager;
|
private RMNodeLabelsManager labelManager;
|
||||||
|
private SchedulerHealth schedulerHealth = new SchedulerHealth();
|
||||||
|
long lastNodeUpdateTime;
|
||||||
/**
|
/**
|
||||||
* EXPERT
|
* EXPERT
|
||||||
*/
|
*/
|
||||||
|
@ -955,6 +960,8 @@ public class CapacityScheduler extends
|
||||||
LOG.debug("nodeUpdate: " + nm + " clusterResources: " + clusterResource);
|
LOG.debug("nodeUpdate: " + nm + " clusterResources: " + clusterResource);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Resource releaseResources = Resource.newInstance(0, 0);
|
||||||
|
|
||||||
FiCaSchedulerNode node = getNode(nm.getNodeID());
|
FiCaSchedulerNode node = getNode(nm.getNodeID());
|
||||||
|
|
||||||
List<UpdatedContainerInfo> containerInfoList = nm.pullContainerUpdates();
|
List<UpdatedContainerInfo> containerInfoList = nm.pullContainerUpdates();
|
||||||
|
@ -971,12 +978,29 @@ public class CapacityScheduler extends
|
||||||
}
|
}
|
||||||
|
|
||||||
// Process completed containers
|
// Process completed containers
|
||||||
|
int releasedContainers = 0;
|
||||||
for (ContainerStatus completedContainer : completedContainers) {
|
for (ContainerStatus completedContainer : completedContainers) {
|
||||||
ContainerId containerId = completedContainer.getContainerId();
|
ContainerId containerId = completedContainer.getContainerId();
|
||||||
|
RMContainer container = getRMContainer(containerId);
|
||||||
LOG.debug("Container FINISHED: " + containerId);
|
LOG.debug("Container FINISHED: " + containerId);
|
||||||
completedContainer(getRMContainer(containerId),
|
completedContainer(container, completedContainer,
|
||||||
completedContainer, RMContainerEventType.FINISHED);
|
RMContainerEventType.FINISHED);
|
||||||
|
if (container != null) {
|
||||||
|
releasedContainers++;
|
||||||
|
Resource rs = container.getAllocatedResource();
|
||||||
|
if (rs != null) {
|
||||||
|
Resources.addTo(releaseResources, rs);
|
||||||
}
|
}
|
||||||
|
rs = container.getReservedResource();
|
||||||
|
if (rs != null) {
|
||||||
|
Resources.addTo(releaseResources, rs);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
schedulerHealth.updateSchedulerReleaseDetails(lastNodeUpdateTime,
|
||||||
|
releaseResources);
|
||||||
|
schedulerHealth.updateSchedulerReleaseCounts(releasedContainers);
|
||||||
|
|
||||||
// Now node data structures are upto date and ready for scheduling.
|
// Now node data structures are upto date and ready for scheduling.
|
||||||
if(LOG.isDebugEnabled()) {
|
if(LOG.isDebugEnabled()) {
|
||||||
|
@ -1040,11 +1064,47 @@ public class CapacityScheduler extends
|
||||||
node.updateLabels(newLabels);
|
node.updateLabels(newLabels);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void updateSchedulerHealth(long now, FiCaSchedulerNode node,
|
||||||
|
CSAssignment assignment) {
|
||||||
|
|
||||||
|
NodeId nodeId = node.getNodeID();
|
||||||
|
List<AssignmentInformation.AssignmentDetails> allocations =
|
||||||
|
assignment.getAssignmentInformation().getAllocationDetails();
|
||||||
|
List<AssignmentInformation.AssignmentDetails> reservations =
|
||||||
|
assignment.getAssignmentInformation().getReservationDetails();
|
||||||
|
if (!allocations.isEmpty()) {
|
||||||
|
ContainerId allocatedContainerId =
|
||||||
|
allocations.get(allocations.size() - 1).containerId;
|
||||||
|
String allocatedQueue = allocations.get(allocations.size() - 1).queue;
|
||||||
|
schedulerHealth.updateAllocation(now, nodeId, allocatedContainerId,
|
||||||
|
allocatedQueue);
|
||||||
|
}
|
||||||
|
if (!reservations.isEmpty()) {
|
||||||
|
ContainerId reservedContainerId =
|
||||||
|
reservations.get(reservations.size() - 1).containerId;
|
||||||
|
String reservedQueue = reservations.get(reservations.size() - 1).queue;
|
||||||
|
schedulerHealth.updateReservation(now, nodeId, reservedContainerId,
|
||||||
|
reservedQueue);
|
||||||
|
}
|
||||||
|
schedulerHealth.updateSchedulerReservationCounts(assignment
|
||||||
|
.getAssignmentInformation().getNumReservations());
|
||||||
|
schedulerHealth.updateSchedulerAllocationCounts(assignment
|
||||||
|
.getAssignmentInformation().getNumAllocations());
|
||||||
|
schedulerHealth.updateSchedulerRunDetails(now, assignment
|
||||||
|
.getAssignmentInformation().getAllocated(), assignment
|
||||||
|
.getAssignmentInformation().getReserved());
|
||||||
|
}
|
||||||
|
|
||||||
private synchronized void allocateContainersToNode(FiCaSchedulerNode node) {
|
private synchronized void allocateContainersToNode(FiCaSchedulerNode node) {
|
||||||
if (rmContext.isWorkPreservingRecoveryEnabled()
|
if (rmContext.isWorkPreservingRecoveryEnabled()
|
||||||
&& !rmContext.isSchedulerReadyForAllocatingContainers()) {
|
&& !rmContext.isSchedulerReadyForAllocatingContainers()) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
// reset allocation and reservation stats before we start doing any work
|
||||||
|
updateSchedulerHealth(lastNodeUpdateTime, node,
|
||||||
|
new CSAssignment(Resources.none(), NodeType.NODE_LOCAL));
|
||||||
|
|
||||||
|
CSAssignment assignment;
|
||||||
|
|
||||||
// Assign new containers...
|
// Assign new containers...
|
||||||
// 1. Check for reserved applications
|
// 1. Check for reserved applications
|
||||||
|
@ -1061,14 +1121,25 @@ public class CapacityScheduler extends
|
||||||
node.getNodeID());
|
node.getNodeID());
|
||||||
|
|
||||||
LeafQueue queue = ((LeafQueue)reservedApplication.getQueue());
|
LeafQueue queue = ((LeafQueue)reservedApplication.getQueue());
|
||||||
CSAssignment assignment =
|
assignment = queue.assignContainers(
|
||||||
queue.assignContainers(
|
|
||||||
clusterResource,
|
clusterResource,
|
||||||
node,
|
node,
|
||||||
// TODO, now we only consider limits for parent for non-labeled
|
// TODO, now we only consider limits for parent for non-labeled
|
||||||
// resources, should consider labeled resources as well.
|
// resources, should consider labeled resources as well.
|
||||||
new ResourceLimits(labelManager.getResourceByLabel(
|
new ResourceLimits(labelManager.getResourceByLabel(
|
||||||
RMNodeLabelsManager.NO_LABEL, clusterResource)));
|
RMNodeLabelsManager.NO_LABEL, clusterResource)));
|
||||||
|
if (assignment.isFulfilledReservation()) {
|
||||||
|
CSAssignment tmp =
|
||||||
|
new CSAssignment(reservedContainer.getReservedResource(),
|
||||||
|
assignment.getType());
|
||||||
|
Resources.addTo(assignment.getAssignmentInformation().getAllocated(),
|
||||||
|
reservedContainer.getReservedResource());
|
||||||
|
tmp.getAssignmentInformation().addAllocationDetails(
|
||||||
|
reservedContainer.getContainerId(), queue.getQueuePath());
|
||||||
|
tmp.getAssignmentInformation().incrAllocations();
|
||||||
|
updateSchedulerHealth(lastNodeUpdateTime, node, tmp);
|
||||||
|
schedulerHealth.updateSchedulerFulfilledReservationCounts(1);
|
||||||
|
}
|
||||||
|
|
||||||
RMContainer excessReservation = assignment.getExcessReservation();
|
RMContainer excessReservation = assignment.getExcessReservation();
|
||||||
if (excessReservation != null) {
|
if (excessReservation != null) {
|
||||||
|
@ -1092,13 +1163,14 @@ public class CapacityScheduler extends
|
||||||
LOG.debug("Trying to schedule on node: " + node.getNodeName() +
|
LOG.debug("Trying to schedule on node: " + node.getNodeName() +
|
||||||
", available: " + node.getAvailableResource());
|
", available: " + node.getAvailableResource());
|
||||||
}
|
}
|
||||||
root.assignContainers(
|
assignment = root.assignContainers(
|
||||||
clusterResource,
|
clusterResource,
|
||||||
node,
|
node,
|
||||||
// TODO, now we only consider limits for parent for non-labeled
|
// TODO, now we only consider limits for parent for non-labeled
|
||||||
// resources, should consider labeled resources as well.
|
// resources, should consider labeled resources as well.
|
||||||
new ResourceLimits(labelManager.getResourceByLabel(
|
new ResourceLimits(labelManager.getResourceByLabel(
|
||||||
RMNodeLabelsManager.NO_LABEL, clusterResource)));
|
RMNodeLabelsManager.NO_LABEL, clusterResource)));
|
||||||
|
updateSchedulerHealth(lastNodeUpdateTime, node, assignment);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
LOG.info("Skipping scheduling since node " + node.getNodeID() +
|
LOG.info("Skipping scheduling since node " + node.getNodeID() +
|
||||||
|
@ -1151,6 +1223,7 @@ public class CapacityScheduler extends
|
||||||
{
|
{
|
||||||
NodeUpdateSchedulerEvent nodeUpdatedEvent = (NodeUpdateSchedulerEvent)event;
|
NodeUpdateSchedulerEvent nodeUpdatedEvent = (NodeUpdateSchedulerEvent)event;
|
||||||
RMNode node = nodeUpdatedEvent.getRMNode();
|
RMNode node = nodeUpdatedEvent.getRMNode();
|
||||||
|
setLastNodeUpdateTime(Time.now());
|
||||||
nodeUpdate(node);
|
nodeUpdate(node);
|
||||||
if (!scheduleAsynchronously) {
|
if (!scheduleAsynchronously) {
|
||||||
allocateContainersToNode(getNode(node.getNodeID()));
|
allocateContainersToNode(getNode(node.getNodeID()));
|
||||||
|
@ -1319,6 +1392,14 @@ public class CapacityScheduler extends
|
||||||
LOG.info("Application attempt " + application.getApplicationAttemptId()
|
LOG.info("Application attempt " + application.getApplicationAttemptId()
|
||||||
+ " released container " + container.getId() + " on node: " + node
|
+ " released container " + container.getId() + " on node: " + node
|
||||||
+ " with event: " + event);
|
+ " with event: " + event);
|
||||||
|
if (containerStatus.getExitStatus() == ContainerExitStatus.PREEMPTED) {
|
||||||
|
schedulerHealth.updatePreemption(Time.now(), container.getNodeId(),
|
||||||
|
container.getId(), queue.getQueuePath());
|
||||||
|
schedulerHealth.updateSchedulerPreemptionCounts(1);
|
||||||
|
} else {
|
||||||
|
schedulerHealth.updateRelease(lastNodeUpdateTime, container.getNodeId(),
|
||||||
|
container.getId(), queue.getQueuePath());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Lock(Lock.NoLock.class)
|
@Lock(Lock.NoLock.class)
|
||||||
|
@ -1648,4 +1729,12 @@ public class CapacityScheduler extends
|
||||||
}
|
}
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public SchedulerHealth getSchedulerHealth() {
|
||||||
|
return this.schedulerHealth;
|
||||||
|
}
|
||||||
|
|
||||||
|
private synchronized void setLastNodeUpdateTime(long time) {
|
||||||
|
this.lastNodeUpdateTime = time;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -913,12 +913,17 @@ public class LeafQueue extends AbstractCSQueue {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Try to assign if we have sufficient resources
|
// Try to assign if we have sufficient resources
|
||||||
|
CSAssignment tmp =
|
||||||
assignContainersOnNode(clusterResource, node, application, priority,
|
assignContainersOnNode(clusterResource, node, application, priority,
|
||||||
rmContainer);
|
rmContainer);
|
||||||
|
|
||||||
// Doesn't matter... since it's already charged for at time of reservation
|
// Doesn't matter... since it's already charged for at time of reservation
|
||||||
// "re-reservation" is *free*
|
// "re-reservation" is *free*
|
||||||
return new CSAssignment(Resources.none(), NodeType.NODE_LOCAL);
|
CSAssignment ret = new CSAssignment(Resources.none(), NodeType.NODE_LOCAL);
|
||||||
|
if (tmp.getAssignmentInformation().getNumAllocations() > 0) {
|
||||||
|
ret.setFulfilledReservation(true);
|
||||||
|
}
|
||||||
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
protected Resource getHeadroom(User user, Resource queueCurrentLimit,
|
protected Resource getHeadroom(User user, Resource queueCurrentLimit,
|
||||||
|
@ -1172,7 +1177,8 @@ public class LeafQueue extends AbstractCSQueue {
|
||||||
private CSAssignment assignContainersOnNode(Resource clusterResource,
|
private CSAssignment assignContainersOnNode(Resource clusterResource,
|
||||||
FiCaSchedulerNode node, FiCaSchedulerApp application, Priority priority,
|
FiCaSchedulerNode node, FiCaSchedulerApp application, Priority priority,
|
||||||
RMContainer reservedContainer) {
|
RMContainer reservedContainer) {
|
||||||
Resource assigned = Resources.none();
|
|
||||||
|
CSAssignment assigned;
|
||||||
|
|
||||||
NodeType requestType = null;
|
NodeType requestType = null;
|
||||||
MutableObject allocatedContainer = new MutableObject();
|
MutableObject allocatedContainer = new MutableObject();
|
||||||
|
@ -1186,14 +1192,15 @@ public class LeafQueue extends AbstractCSQueue {
|
||||||
node, application, priority, reservedContainer,
|
node, application, priority, reservedContainer,
|
||||||
allocatedContainer);
|
allocatedContainer);
|
||||||
if (Resources.greaterThan(resourceCalculator, clusterResource,
|
if (Resources.greaterThan(resourceCalculator, clusterResource,
|
||||||
assigned, Resources.none())) {
|
assigned.getResource(), Resources.none())) {
|
||||||
|
|
||||||
//update locality statistics
|
//update locality statistics
|
||||||
if (allocatedContainer.getValue() != null) {
|
if (allocatedContainer.getValue() != null) {
|
||||||
application.incNumAllocatedContainers(NodeType.NODE_LOCAL,
|
application.incNumAllocatedContainers(NodeType.NODE_LOCAL,
|
||||||
requestType);
|
requestType);
|
||||||
}
|
}
|
||||||
return new CSAssignment(assigned, NodeType.NODE_LOCAL);
|
assigned.setType(NodeType.NODE_LOCAL);
|
||||||
|
return assigned;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1214,14 +1221,15 @@ public class LeafQueue extends AbstractCSQueue {
|
||||||
node, application, priority, reservedContainer,
|
node, application, priority, reservedContainer,
|
||||||
allocatedContainer);
|
allocatedContainer);
|
||||||
if (Resources.greaterThan(resourceCalculator, clusterResource,
|
if (Resources.greaterThan(resourceCalculator, clusterResource,
|
||||||
assigned, Resources.none())) {
|
assigned.getResource(), Resources.none())) {
|
||||||
|
|
||||||
//update locality statistics
|
//update locality statistics
|
||||||
if (allocatedContainer.getValue() != null) {
|
if (allocatedContainer.getValue() != null) {
|
||||||
application.incNumAllocatedContainers(NodeType.RACK_LOCAL,
|
application.incNumAllocatedContainers(NodeType.RACK_LOCAL,
|
||||||
requestType);
|
requestType);
|
||||||
}
|
}
|
||||||
return new CSAssignment(assigned, NodeType.RACK_LOCAL);
|
assigned.setType(NodeType.RACK_LOCAL);
|
||||||
|
return assigned;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1246,7 +1254,8 @@ public class LeafQueue extends AbstractCSQueue {
|
||||||
if (allocatedContainer.getValue() != null) {
|
if (allocatedContainer.getValue() != null) {
|
||||||
application.incNumAllocatedContainers(NodeType.OFF_SWITCH, requestType);
|
application.incNumAllocatedContainers(NodeType.OFF_SWITCH, requestType);
|
||||||
}
|
}
|
||||||
return new CSAssignment(assigned, NodeType.OFF_SWITCH);
|
assigned.setType(NodeType.OFF_SWITCH);
|
||||||
|
return assigned;
|
||||||
}
|
}
|
||||||
|
|
||||||
return SKIP_ASSIGNMENT;
|
return SKIP_ASSIGNMENT;
|
||||||
|
@ -1255,10 +1264,9 @@ public class LeafQueue extends AbstractCSQueue {
|
||||||
private Resource getMinimumResourceNeedUnreserved(Resource askedResource) {
|
private Resource getMinimumResourceNeedUnreserved(Resource askedResource) {
|
||||||
// First we need to get minimum resource we need unreserve
|
// First we need to get minimum resource we need unreserve
|
||||||
// minimum-resource-need-unreserve = used + asked - limit
|
// minimum-resource-need-unreserve = used + asked - limit
|
||||||
Resource minimumUnreservedResource =
|
return Resources.subtract(
|
||||||
Resources.subtract(Resources.add(queueUsage.getUsed(), askedResource),
|
Resources.add(queueUsage.getUsed(), askedResource),
|
||||||
currentResourceLimits.getLimit());
|
currentResourceLimits.getLimit());
|
||||||
return minimumUnreservedResource;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Private
|
@Private
|
||||||
|
@ -1334,7 +1342,7 @@ public class LeafQueue extends AbstractCSQueue {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
private Resource assignNodeLocalContainers(Resource clusterResource,
|
private CSAssignment assignNodeLocalContainers(Resource clusterResource,
|
||||||
ResourceRequest nodeLocalResourceRequest, FiCaSchedulerNode node,
|
ResourceRequest nodeLocalResourceRequest, FiCaSchedulerNode node,
|
||||||
FiCaSchedulerApp application, Priority priority,
|
FiCaSchedulerApp application, Priority priority,
|
||||||
RMContainer reservedContainer, MutableObject allocatedContainer) {
|
RMContainer reservedContainer, MutableObject allocatedContainer) {
|
||||||
|
@ -1345,10 +1353,10 @@ public class LeafQueue extends AbstractCSQueue {
|
||||||
allocatedContainer);
|
allocatedContainer);
|
||||||
}
|
}
|
||||||
|
|
||||||
return Resources.none();
|
return new CSAssignment(Resources.none(), NodeType.NODE_LOCAL);
|
||||||
}
|
}
|
||||||
|
|
||||||
private Resource assignRackLocalContainers(Resource clusterResource,
|
private CSAssignment assignRackLocalContainers(Resource clusterResource,
|
||||||
ResourceRequest rackLocalResourceRequest, FiCaSchedulerNode node,
|
ResourceRequest rackLocalResourceRequest, FiCaSchedulerNode node,
|
||||||
FiCaSchedulerApp application, Priority priority,
|
FiCaSchedulerApp application, Priority priority,
|
||||||
RMContainer reservedContainer, MutableObject allocatedContainer) {
|
RMContainer reservedContainer, MutableObject allocatedContainer) {
|
||||||
|
@ -1359,10 +1367,10 @@ public class LeafQueue extends AbstractCSQueue {
|
||||||
allocatedContainer);
|
allocatedContainer);
|
||||||
}
|
}
|
||||||
|
|
||||||
return Resources.none();
|
return new CSAssignment(Resources.none(), NodeType.RACK_LOCAL);
|
||||||
}
|
}
|
||||||
|
|
||||||
private Resource assignOffSwitchContainers(Resource clusterResource,
|
private CSAssignment assignOffSwitchContainers(Resource clusterResource,
|
||||||
ResourceRequest offSwitchResourceRequest, FiCaSchedulerNode node,
|
ResourceRequest offSwitchResourceRequest, FiCaSchedulerNode node,
|
||||||
FiCaSchedulerApp application, Priority priority,
|
FiCaSchedulerApp application, Priority priority,
|
||||||
RMContainer reservedContainer, MutableObject allocatedContainer) {
|
RMContainer reservedContainer, MutableObject allocatedContainer) {
|
||||||
|
@ -1373,7 +1381,7 @@ public class LeafQueue extends AbstractCSQueue {
|
||||||
allocatedContainer);
|
allocatedContainer);
|
||||||
}
|
}
|
||||||
|
|
||||||
return Resources.none();
|
return new CSAssignment(Resources.none(), NodeType.OFF_SWITCH);
|
||||||
}
|
}
|
||||||
|
|
||||||
boolean canAssign(FiCaSchedulerApp application, Priority priority,
|
boolean canAssign(FiCaSchedulerApp application, Priority priority,
|
||||||
|
@ -1443,15 +1451,13 @@ public class LeafQueue extends AbstractCSQueue {
|
||||||
.getApplicationAttemptId(), application.getNewContainerId());
|
.getApplicationAttemptId(), application.getNewContainerId());
|
||||||
|
|
||||||
// Create the container
|
// Create the container
|
||||||
Container container =
|
return BuilderUtils.newContainer(containerId, nodeId, node.getRMNode()
|
||||||
BuilderUtils.newContainer(containerId, nodeId, node.getRMNode()
|
|
||||||
.getHttpAddress(), capability, priority, null);
|
.getHttpAddress(), capability, priority, null);
|
||||||
|
|
||||||
return container;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
private Resource assignContainer(Resource clusterResource, FiCaSchedulerNode node,
|
private CSAssignment assignContainer(Resource clusterResource, FiCaSchedulerNode node,
|
||||||
FiCaSchedulerApp application, Priority priority,
|
FiCaSchedulerApp application, Priority priority,
|
||||||
ResourceRequest request, NodeType type, RMContainer rmContainer,
|
ResourceRequest request, NodeType type, RMContainer rmContainer,
|
||||||
MutableObject createdContainer) {
|
MutableObject createdContainer) {
|
||||||
|
@ -1472,7 +1478,7 @@ public class LeafQueue extends AbstractCSQueue {
|
||||||
if (rmContainer != null) {
|
if (rmContainer != null) {
|
||||||
unreserve(application, priority, node, rmContainer);
|
unreserve(application, priority, node, rmContainer);
|
||||||
}
|
}
|
||||||
return Resources.none();
|
return new CSAssignment(Resources.none(), type);
|
||||||
}
|
}
|
||||||
|
|
||||||
Resource capability = request.getCapability();
|
Resource capability = request.getCapability();
|
||||||
|
@ -1484,7 +1490,7 @@ public class LeafQueue extends AbstractCSQueue {
|
||||||
LOG.warn("Node : " + node.getNodeID()
|
LOG.warn("Node : " + node.getNodeID()
|
||||||
+ " does not have sufficient resource for request : " + request
|
+ " does not have sufficient resource for request : " + request
|
||||||
+ " node total capability : " + node.getTotalResource());
|
+ " node total capability : " + node.getTotalResource());
|
||||||
return Resources.none();
|
return new CSAssignment(Resources.none(), type);
|
||||||
}
|
}
|
||||||
|
|
||||||
assert Resources.greaterThan(
|
assert Resources.greaterThan(
|
||||||
|
@ -1497,7 +1503,7 @@ public class LeafQueue extends AbstractCSQueue {
|
||||||
// something went wrong getting/creating the container
|
// something went wrong getting/creating the container
|
||||||
if (container == null) {
|
if (container == null) {
|
||||||
LOG.warn("Couldn't get container for allocation!");
|
LOG.warn("Couldn't get container for allocation!");
|
||||||
return Resources.none();
|
return new CSAssignment(Resources.none(), type);
|
||||||
}
|
}
|
||||||
|
|
||||||
boolean shouldAllocOrReserveNewContainer = shouldAllocOrReserveNewContainer(
|
boolean shouldAllocOrReserveNewContainer = shouldAllocOrReserveNewContainer(
|
||||||
|
@ -1529,7 +1535,7 @@ public class LeafQueue extends AbstractCSQueue {
|
||||||
// container (That means we *have to* unreserve some resource to
|
// container (That means we *have to* unreserve some resource to
|
||||||
// continue)). If we failed to unreserve some resource,
|
// continue)). If we failed to unreserve some resource,
|
||||||
if (!containerUnreserved) {
|
if (!containerUnreserved) {
|
||||||
return Resources.none();
|
return new CSAssignment(Resources.none(), type);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1540,7 +1546,7 @@ public class LeafQueue extends AbstractCSQueue {
|
||||||
|
|
||||||
// Does the application need this resource?
|
// Does the application need this resource?
|
||||||
if (allocatedContainer == null) {
|
if (allocatedContainer == null) {
|
||||||
return Resources.none();
|
return new CSAssignment(Resources.none(), type);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Inform the node
|
// Inform the node
|
||||||
|
@ -1552,7 +1558,13 @@ public class LeafQueue extends AbstractCSQueue {
|
||||||
" queue=" + this +
|
" queue=" + this +
|
||||||
" clusterResource=" + clusterResource);
|
" clusterResource=" + clusterResource);
|
||||||
createdContainer.setValue(allocatedContainer);
|
createdContainer.setValue(allocatedContainer);
|
||||||
return container.getResource();
|
CSAssignment assignment = new CSAssignment(container.getResource(), type);
|
||||||
|
assignment.getAssignmentInformation().addAllocationDetails(
|
||||||
|
container.getId(), getQueuePath());
|
||||||
|
assignment.getAssignmentInformation().incrAllocations();
|
||||||
|
Resources.addTo(assignment.getAssignmentInformation().getAllocated(),
|
||||||
|
container.getResource());
|
||||||
|
return assignment;
|
||||||
} else {
|
} else {
|
||||||
// if we are allowed to allocate but this node doesn't have space, reserve it or
|
// if we are allowed to allocate but this node doesn't have space, reserve it or
|
||||||
// if this was an already a reserved container, reserve it again
|
// if this was an already a reserved container, reserve it again
|
||||||
|
@ -1566,7 +1578,7 @@ public class LeafQueue extends AbstractCSQueue {
|
||||||
// reserve the new container
|
// reserve the new container
|
||||||
if (!checkLimitsToReserve(clusterResource,
|
if (!checkLimitsToReserve(clusterResource,
|
||||||
application, capability)) {
|
application, capability)) {
|
||||||
return Resources.none();
|
return new CSAssignment(Resources.none(), type);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1581,10 +1593,16 @@ public class LeafQueue extends AbstractCSQueue {
|
||||||
" absoluteUsedCapacity=" + getAbsoluteUsedCapacity() +
|
" absoluteUsedCapacity=" + getAbsoluteUsedCapacity() +
|
||||||
" used=" + queueUsage.getUsed() +
|
" used=" + queueUsage.getUsed() +
|
||||||
" cluster=" + clusterResource);
|
" cluster=" + clusterResource);
|
||||||
|
CSAssignment assignment =
|
||||||
return request.getCapability();
|
new CSAssignment(request.getCapability(), type);
|
||||||
|
assignment.getAssignmentInformation().addReservationDetails(
|
||||||
|
container.getId(), getQueuePath());
|
||||||
|
assignment.getAssignmentInformation().incrReservations();
|
||||||
|
Resources.addTo(assignment.getAssignmentInformation().getReserved(),
|
||||||
|
request.getCapability());
|
||||||
|
return assignment;
|
||||||
}
|
}
|
||||||
return Resources.none();
|
return new CSAssignment(Resources.none(), type);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -57,6 +57,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.AssignmentInformation;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
|
||||||
import org.apache.hadoop.yarn.util.resource.Resources;
|
import org.apache.hadoop.yarn.util.resource.Resources;
|
||||||
|
@ -415,7 +416,27 @@ public class ParentQueue extends AbstractCSQueue {
|
||||||
nodeLabels);
|
nodeLabels);
|
||||||
|
|
||||||
// Track resource utilization in this pass of the scheduler
|
// Track resource utilization in this pass of the scheduler
|
||||||
Resources.addTo(assignment.getResource(), assignedToChild.getResource());
|
Resources
|
||||||
|
.addTo(assignment.getResource(), assignedToChild.getResource());
|
||||||
|
Resources.addTo(assignment.getAssignmentInformation().getAllocated(),
|
||||||
|
assignedToChild.getAssignmentInformation().getAllocated());
|
||||||
|
Resources.addTo(assignment.getAssignmentInformation().getReserved(),
|
||||||
|
assignedToChild.getAssignmentInformation().getReserved());
|
||||||
|
assignment.getAssignmentInformation().incrAllocations(
|
||||||
|
assignedToChild.getAssignmentInformation().getNumAllocations());
|
||||||
|
assignment.getAssignmentInformation().incrReservations(
|
||||||
|
assignedToChild.getAssignmentInformation().getNumReservations());
|
||||||
|
assignment
|
||||||
|
.getAssignmentInformation()
|
||||||
|
.getAllocationDetails()
|
||||||
|
.addAll(
|
||||||
|
assignedToChild.getAssignmentInformation().getAllocationDetails());
|
||||||
|
assignment
|
||||||
|
.getAssignmentInformation()
|
||||||
|
.getReservationDetails()
|
||||||
|
.addAll(
|
||||||
|
assignedToChild.getAssignmentInformation()
|
||||||
|
.getReservationDetails());
|
||||||
|
|
||||||
LOG.info("assignedContainer" +
|
LOG.info("assignedContainer" +
|
||||||
" queue=" + getQueueName() +
|
" queue=" + getQueueName() +
|
||||||
|
|
|
@ -0,0 +1,120 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.common;
|
||||||
|
|
||||||
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
|
import org.apache.hadoop.classification.InterfaceStability;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||||
|
import org.apache.hadoop.yarn.api.records.Resource;
|
||||||
|
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
@InterfaceAudience.Private
|
||||||
|
@InterfaceStability.Unstable
|
||||||
|
public class AssignmentInformation {
|
||||||
|
|
||||||
|
public enum Operation {
|
||||||
|
ALLOCATION, RESERVATION
|
||||||
|
}
|
||||||
|
|
||||||
|
public static class AssignmentDetails {
|
||||||
|
public ContainerId containerId;
|
||||||
|
public String queue;
|
||||||
|
|
||||||
|
public AssignmentDetails(ContainerId containerId, String queue) {
|
||||||
|
this.containerId = containerId;
|
||||||
|
this.queue = queue;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private final Map<Operation, Integer> operationCounts;
|
||||||
|
private final Map<Operation, Resource> operationResources;
|
||||||
|
private final Map<Operation, List<AssignmentDetails>> operationDetails;
|
||||||
|
|
||||||
|
public AssignmentInformation() {
|
||||||
|
this.operationCounts = new HashMap<>();
|
||||||
|
this.operationResources = new HashMap<>();
|
||||||
|
this.operationDetails = new HashMap<>();
|
||||||
|
for (Operation op : Operation.values()) {
|
||||||
|
operationCounts.put(op, 0);
|
||||||
|
operationResources.put(op, Resource.newInstance(0, 0));
|
||||||
|
operationDetails.put(op, new ArrayList<AssignmentDetails>());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public int getNumAllocations() {
|
||||||
|
return operationCounts.get(Operation.ALLOCATION);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void incrAllocations() {
|
||||||
|
increment(Operation.ALLOCATION, 1);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void incrAllocations(int by) {
|
||||||
|
increment(Operation.ALLOCATION, by);
|
||||||
|
}
|
||||||
|
|
||||||
|
public int getNumReservations() {
|
||||||
|
return operationCounts.get(Operation.RESERVATION);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void incrReservations() {
|
||||||
|
increment(Operation.RESERVATION, 1);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void incrReservations(int by) {
|
||||||
|
increment(Operation.RESERVATION, by);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void increment(Operation op, int by) {
|
||||||
|
operationCounts.put(op, operationCounts.get(op) + by);
|
||||||
|
}
|
||||||
|
|
||||||
|
public Resource getAllocated() {
|
||||||
|
return operationResources.get(Operation.ALLOCATION);
|
||||||
|
}
|
||||||
|
|
||||||
|
public Resource getReserved() {
|
||||||
|
return operationResources.get(Operation.RESERVATION);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void addAssignmentDetails(Operation op, ContainerId containerId,
|
||||||
|
String queue) {
|
||||||
|
operationDetails.get(op).add(new AssignmentDetails(containerId, queue));
|
||||||
|
}
|
||||||
|
|
||||||
|
public void addAllocationDetails(ContainerId containerId, String queue) {
|
||||||
|
addAssignmentDetails(Operation.ALLOCATION, containerId, queue);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void addReservationDetails(ContainerId containerId, String queue) {
|
||||||
|
addAssignmentDetails(Operation.RESERVATION, containerId, queue);
|
||||||
|
}
|
||||||
|
|
||||||
|
public List<AssignmentDetails> getAllocationDetails() {
|
||||||
|
return operationDetails.get(Operation.ALLOCATION);
|
||||||
|
}
|
||||||
|
|
||||||
|
public List<AssignmentDetails> getReservationDetails() {
|
||||||
|
return operationDetails.get(Operation.RESERVATION);
|
||||||
|
}
|
||||||
|
}
|
|
@ -21,17 +21,19 @@ package org.apache.hadoop.yarn.server.resourcemanager.webapp;
|
||||||
import static org.apache.hadoop.yarn.util.StringHelper.join;
|
import static org.apache.hadoop.yarn.util.StringHelper.join;
|
||||||
|
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
import org.apache.hadoop.util.StringUtils;
|
import org.apache.hadoop.util.StringUtils;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
|
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerHealth;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.UserInfo;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.UserInfo;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.CapacitySchedulerInfo;
|
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.CapacitySchedulerInfo;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.CapacitySchedulerLeafQueueInfo;
|
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.CapacitySchedulerLeafQueueInfo;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.CapacitySchedulerQueueInfo;
|
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.CapacitySchedulerQueueInfo;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ResourceInfo;
|
import org.apache.hadoop.yarn.util.Times;
|
||||||
import org.apache.hadoop.yarn.server.webapp.AppsBlock;
|
|
||||||
import org.apache.hadoop.yarn.webapp.ResponseInfo;
|
import org.apache.hadoop.yarn.webapp.ResponseInfo;
|
||||||
import org.apache.hadoop.yarn.webapp.SubView;
|
import org.apache.hadoop.yarn.webapp.SubView;
|
||||||
import org.apache.hadoop.yarn.webapp.hamlet.Hamlet;
|
import org.apache.hadoop.yarn.webapp.hamlet.Hamlet;
|
||||||
|
@ -244,7 +246,7 @@ class CapacitySchedulerPage extends RmView {
|
||||||
span(".q", "default")._()._();
|
span(".q", "default")._()._();
|
||||||
} else {
|
} else {
|
||||||
CSQueue root = cs.getRootQueue();
|
CSQueue root = cs.getRootQueue();
|
||||||
CapacitySchedulerInfo sinfo = new CapacitySchedulerInfo(root);
|
CapacitySchedulerInfo sinfo = new CapacitySchedulerInfo(root, cs);
|
||||||
csqinfo.csinfo = sinfo;
|
csqinfo.csinfo = sinfo;
|
||||||
csqinfo.qinfo = null;
|
csqinfo.qinfo = null;
|
||||||
|
|
||||||
|
@ -274,6 +276,95 @@ class CapacitySchedulerPage extends RmView {
|
||||||
script().$type("text/javascript").
|
script().$type("text/javascript").
|
||||||
_("$('#cs').hide();")._()._().
|
_("$('#cs').hide();")._()._().
|
||||||
_(RMAppsBlock.class);
|
_(RMAppsBlock.class);
|
||||||
|
html._(HealthBlock.class);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public static class HealthBlock extends HtmlBlock {
|
||||||
|
|
||||||
|
final CapacityScheduler cs;
|
||||||
|
|
||||||
|
@Inject
|
||||||
|
HealthBlock(ResourceManager rm) {
|
||||||
|
cs = (CapacityScheduler) rm.getResourceScheduler();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void render(HtmlBlock.Block html) {
|
||||||
|
SchedulerHealth healthInfo = cs.getSchedulerHealth();
|
||||||
|
DIV<Hamlet> div = html.div("#health");
|
||||||
|
div.h4("Aggregate scheduler counts");
|
||||||
|
TBODY<TABLE<DIV<Hamlet>>> tbody =
|
||||||
|
div.table("#lastrun").thead().$class("ui-widget-header").tr().th()
|
||||||
|
.$class("ui-state-default")._("Total Container Allocations(count)")
|
||||||
|
._().th().$class("ui-state-default")
|
||||||
|
._("Total Container Releases(count)")._().th()
|
||||||
|
.$class("ui-state-default")
|
||||||
|
._("Total Fulfilled Reservations(count)")._().th()
|
||||||
|
.$class("ui-state-default")._("Total Container Preemptions(count)")
|
||||||
|
._()._()._().tbody();
|
||||||
|
tbody
|
||||||
|
.$class("ui-widget-content")
|
||||||
|
.tr()
|
||||||
|
.td(
|
||||||
|
String.valueOf(cs.getRootQueueMetrics()
|
||||||
|
.getAggregateAllocatedContainers()))
|
||||||
|
.td(
|
||||||
|
String.valueOf(cs.getRootQueueMetrics()
|
||||||
|
.getAggegatedReleasedContainers()))
|
||||||
|
.td(healthInfo.getAggregateFulFilledReservationsCount().toString())
|
||||||
|
.td(healthInfo.getAggregatePreemptionCount().toString())._()._()._();
|
||||||
|
div.h4("Last scheduler run");
|
||||||
|
tbody =
|
||||||
|
div.table("#lastrun").thead().$class("ui-widget-header").tr().th()
|
||||||
|
.$class("ui-state-default")._("Time")._().th()
|
||||||
|
.$class("ui-state-default")._("Allocations(count - resources)")._()
|
||||||
|
.th().$class("ui-state-default")._("Reservations(count - resources)")
|
||||||
|
._().th().$class("ui-state-default")._("Releases(count - resources)")
|
||||||
|
._()._()._().tbody();
|
||||||
|
tbody
|
||||||
|
.$class("ui-widget-content")
|
||||||
|
.tr()
|
||||||
|
.td(Times.format(healthInfo.getLastSchedulerRunTime()))
|
||||||
|
.td(
|
||||||
|
healthInfo.getAllocationCount().toString() + " - "
|
||||||
|
+ healthInfo.getResourcesAllocated().toString())
|
||||||
|
.td(
|
||||||
|
healthInfo.getReservationCount().toString() + " - "
|
||||||
|
+ healthInfo.getResourcesReserved().toString())
|
||||||
|
.td(
|
||||||
|
healthInfo.getReleaseCount().toString() + " - "
|
||||||
|
+ healthInfo.getResourcesReleased().toString())._()._()._();
|
||||||
|
Map<String, SchedulerHealth.DetailedInformation> info = new HashMap<>();
|
||||||
|
info.put("Allocation", healthInfo.getLastAllocationDetails());
|
||||||
|
info.put("Reservation", healthInfo.getLastReservationDetails());
|
||||||
|
info.put("Release", healthInfo.getLastReleaseDetails());
|
||||||
|
info.put("Preemption", healthInfo.getLastPreemptionDetails());
|
||||||
|
|
||||||
|
for (Map.Entry<String, SchedulerHealth.DetailedInformation> entry : info
|
||||||
|
.entrySet()) {
|
||||||
|
String containerId = "N/A";
|
||||||
|
String nodeId = "N/A";
|
||||||
|
String queue = "N/A";
|
||||||
|
String table = "#" + entry.getKey();
|
||||||
|
div.h4("Last " + entry.getKey());
|
||||||
|
tbody =
|
||||||
|
div.table(table).thead().$class("ui-widget-header").tr().th()
|
||||||
|
.$class("ui-state-default")._("Time")._().th()
|
||||||
|
.$class("ui-state-default")._("Container Id")._().th()
|
||||||
|
.$class("ui-state-default")._("Node Id")._().th()
|
||||||
|
.$class("ui-state-default")._("Queue")._()._()._().tbody();
|
||||||
|
SchedulerHealth.DetailedInformation di = entry.getValue();
|
||||||
|
if (di.getTimestamp() != 0) {
|
||||||
|
containerId = di.getContainerId().toString();
|
||||||
|
nodeId = di.getNodeId().toString();
|
||||||
|
queue = di.getQueue();
|
||||||
|
}
|
||||||
|
tbody.$class("ui-widget-content").tr()
|
||||||
|
.td(Times.format(di.getTimestamp())).td(containerId).td(nodeId)
|
||||||
|
.td(queue)._()._()._();
|
||||||
|
}
|
||||||
|
div._();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -53,7 +53,7 @@ public class JAXBContextResolver implements ContextResolver<JAXBContext> {
|
||||||
NodesInfo.class, RemoteExceptionData.class,
|
NodesInfo.class, RemoteExceptionData.class,
|
||||||
CapacitySchedulerQueueInfoList.class, ResourceInfo.class,
|
CapacitySchedulerQueueInfoList.class, ResourceInfo.class,
|
||||||
UsersInfo.class, UserInfo.class, ApplicationStatisticsInfo.class,
|
UsersInfo.class, UserInfo.class, ApplicationStatisticsInfo.class,
|
||||||
StatisticsItemInfo.class };
|
StatisticsItemInfo.class, CapacitySchedulerHealthInfo.class };
|
||||||
// these dao classes need root unwrapping
|
// these dao classes need root unwrapping
|
||||||
final Class[] rootUnwrappedTypes =
|
final Class[] rootUnwrappedTypes =
|
||||||
{ NewApplication.class, ApplicationSubmissionContextInfo.class,
|
{ NewApplication.class, ApplicationSubmissionContextInfo.class,
|
||||||
|
|
|
@ -228,7 +228,7 @@ public class RMWebServices {
|
||||||
if (rs instanceof CapacityScheduler) {
|
if (rs instanceof CapacityScheduler) {
|
||||||
CapacityScheduler cs = (CapacityScheduler) rs;
|
CapacityScheduler cs = (CapacityScheduler) rs;
|
||||||
CSQueue root = cs.getRootQueue();
|
CSQueue root = cs.getRootQueue();
|
||||||
sinfo = new CapacitySchedulerInfo(root);
|
sinfo = new CapacitySchedulerInfo(root, cs);
|
||||||
} else if (rs instanceof FairScheduler) {
|
} else if (rs instanceof FairScheduler) {
|
||||||
FairScheduler fs = (FairScheduler) rs;
|
FairScheduler fs = (FairScheduler) rs;
|
||||||
sinfo = new FairSchedulerInfo(fs);
|
sinfo = new FairSchedulerInfo(fs);
|
||||||
|
|
|
@ -0,0 +1,125 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.yarn.server.resourcemanager.webapp.dao;
|
||||||
|
|
||||||
|
import org.apache.hadoop.yarn.api.records.Resource;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerHealth;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
|
||||||
|
|
||||||
|
import javax.xml.bind.annotation.XmlAccessType;
|
||||||
|
import javax.xml.bind.annotation.XmlAccessorType;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
@XmlAccessorType(XmlAccessType.FIELD)
|
||||||
|
public class CapacitySchedulerHealthInfo {
|
||||||
|
|
||||||
|
@XmlAccessorType(XmlAccessType.FIELD)
|
||||||
|
public static class OperationInformation {
|
||||||
|
String nodeId;
|
||||||
|
String containerId;
|
||||||
|
String queue;
|
||||||
|
|
||||||
|
OperationInformation() {
|
||||||
|
}
|
||||||
|
|
||||||
|
OperationInformation(SchedulerHealth.DetailedInformation di) {
|
||||||
|
this.nodeId = di.getNodeId() == null ? "N/A" : di.getNodeId().toString();
|
||||||
|
this.containerId =
|
||||||
|
di.getContainerId() == null ? "N/A" : di.getContainerId().toString();
|
||||||
|
this.queue = di.getQueue() == null ? "N/A" : di.getQueue();
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getNodeId() {
|
||||||
|
return nodeId;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getContainerId() {
|
||||||
|
return containerId;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getQueue() {
|
||||||
|
return queue;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@XmlAccessorType(XmlAccessType.FIELD)
|
||||||
|
public static class LastRunDetails {
|
||||||
|
String operation;
|
||||||
|
long count;
|
||||||
|
ResourceInfo resources;
|
||||||
|
|
||||||
|
LastRunDetails() {
|
||||||
|
}
|
||||||
|
|
||||||
|
LastRunDetails(String operation, long count, Resource resource) {
|
||||||
|
this.operation = operation;
|
||||||
|
this.count = count;
|
||||||
|
this.resources = new ResourceInfo(resource);
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getOperation() {
|
||||||
|
return operation;
|
||||||
|
}
|
||||||
|
|
||||||
|
public long getCount() {
|
||||||
|
return count;
|
||||||
|
}
|
||||||
|
|
||||||
|
public ResourceInfo getResources() {
|
||||||
|
return resources;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
long lastrun;
|
||||||
|
Map<String, OperationInformation> operationsInfo;
|
||||||
|
List<LastRunDetails> lastRunDetails;
|
||||||
|
|
||||||
|
CapacitySchedulerHealthInfo() {
|
||||||
|
}
|
||||||
|
|
||||||
|
public long getLastrun() {
|
||||||
|
return lastrun;
|
||||||
|
}
|
||||||
|
|
||||||
|
CapacitySchedulerHealthInfo(CapacityScheduler cs) {
|
||||||
|
SchedulerHealth ht = cs.getSchedulerHealth();
|
||||||
|
lastrun = ht.getLastSchedulerRunTime();
|
||||||
|
operationsInfo = new HashMap<>();
|
||||||
|
operationsInfo.put("last-allocation",
|
||||||
|
new OperationInformation(ht.getLastAllocationDetails()));
|
||||||
|
operationsInfo.put("last-release",
|
||||||
|
new OperationInformation(ht.getLastReleaseDetails()));
|
||||||
|
operationsInfo.put("last-preemption",
|
||||||
|
new OperationInformation(ht.getLastPreemptionDetails()));
|
||||||
|
operationsInfo.put("last-reservation",
|
||||||
|
new OperationInformation(ht.getLastReservationDetails()));
|
||||||
|
|
||||||
|
lastRunDetails = new ArrayList<>();
|
||||||
|
lastRunDetails.add(new LastRunDetails("releases", ht.getReleaseCount(), ht
|
||||||
|
.getResourcesReleased()));
|
||||||
|
lastRunDetails.add(new LastRunDetails("allocations", ht
|
||||||
|
.getAllocationCount(), ht.getResourcesAllocated()));
|
||||||
|
lastRunDetails.add(new LastRunDetails("reservations", ht
|
||||||
|
.getReservationCount(), ht.getResourcesReserved()));
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
|
@ -25,6 +25,7 @@ import javax.xml.bind.annotation.XmlTransient;
|
||||||
import javax.xml.bind.annotation.XmlType;
|
import javax.xml.bind.annotation.XmlType;
|
||||||
|
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
|
||||||
|
|
||||||
@XmlRootElement(name = "capacityScheduler")
|
@XmlRootElement(name = "capacityScheduler")
|
||||||
|
@ -37,6 +38,7 @@ public class CapacitySchedulerInfo extends SchedulerInfo {
|
||||||
protected float maxCapacity;
|
protected float maxCapacity;
|
||||||
protected String queueName;
|
protected String queueName;
|
||||||
protected CapacitySchedulerQueueInfoList queues;
|
protected CapacitySchedulerQueueInfoList queues;
|
||||||
|
protected CapacitySchedulerHealthInfo health;
|
||||||
|
|
||||||
@XmlTransient
|
@XmlTransient
|
||||||
static final float EPSILON = 1e-8f;
|
static final float EPSILON = 1e-8f;
|
||||||
|
@ -44,7 +46,7 @@ public class CapacitySchedulerInfo extends SchedulerInfo {
|
||||||
public CapacitySchedulerInfo() {
|
public CapacitySchedulerInfo() {
|
||||||
} // JAXB needs this
|
} // JAXB needs this
|
||||||
|
|
||||||
public CapacitySchedulerInfo(CSQueue parent) {
|
public CapacitySchedulerInfo(CSQueue parent, CapacityScheduler cs) {
|
||||||
this.queueName = parent.getQueueName();
|
this.queueName = parent.getQueueName();
|
||||||
this.usedCapacity = parent.getUsedCapacity() * 100;
|
this.usedCapacity = parent.getUsedCapacity() * 100;
|
||||||
this.capacity = parent.getCapacity() * 100;
|
this.capacity = parent.getCapacity() * 100;
|
||||||
|
@ -54,6 +56,7 @@ public class CapacitySchedulerInfo extends SchedulerInfo {
|
||||||
this.maxCapacity = max * 100;
|
this.maxCapacity = max * 100;
|
||||||
|
|
||||||
queues = getQueues(parent);
|
queues = getQueues(parent);
|
||||||
|
health = new CapacitySchedulerHealthInfo(cs);
|
||||||
}
|
}
|
||||||
|
|
||||||
public float getCapacity() {
|
public float getCapacity() {
|
||||||
|
|
|
@ -0,0 +1,351 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
|
||||||
|
|
||||||
|
import org.apache.hadoop.net.NetworkTopology;
|
||||||
|
import org.apache.hadoop.util.Time;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||||
|
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||||
|
import org.apache.hadoop.yarn.api.records.Priority;
|
||||||
|
import org.apache.hadoop.yarn.api.records.Resource;
|
||||||
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
|
import org.apache.hadoop.yarn.event.AsyncDispatcher;
|
||||||
|
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.NodeManager;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.Application;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.Task;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
|
||||||
|
import org.apache.hadoop.yarn.util.resource.Resources;
|
||||||
|
import org.junit.Assert;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
|
||||||
|
import static org.junit.Assume.assumeTrue;
|
||||||
|
|
||||||
|
public class TestSchedulerHealth {
|
||||||
|
|
||||||
|
private ResourceManager resourceManager;
|
||||||
|
|
||||||
|
public void setup() {
|
||||||
|
resourceManager = new ResourceManager() {
|
||||||
|
@Override
|
||||||
|
protected RMNodeLabelsManager createNodeLabelManager() {
|
||||||
|
RMNodeLabelsManager mgr = new NullRMNodeLabelsManager();
|
||||||
|
mgr.init(getConfig());
|
||||||
|
return mgr;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
YarnConfiguration conf = new YarnConfiguration();
|
||||||
|
conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
|
||||||
|
ResourceScheduler.class);
|
||||||
|
resourceManager.init(conf);
|
||||||
|
resourceManager.getRMContext().getContainerTokenSecretManager()
|
||||||
|
.rollMasterKey();
|
||||||
|
resourceManager.getRMContext().getNMTokenSecretManager().rollMasterKey();
|
||||||
|
((AsyncDispatcher) resourceManager.getRMContext().getDispatcher()).start();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testCounts() {
|
||||||
|
|
||||||
|
SchedulerHealth sh = new SchedulerHealth();
|
||||||
|
int value = 1;
|
||||||
|
for (int i = 0; i < 2; ++i) {
|
||||||
|
sh.updateSchedulerPreemptionCounts(value);
|
||||||
|
sh.updateSchedulerAllocationCounts(value);
|
||||||
|
sh.updateSchedulerReservationCounts(value);
|
||||||
|
sh.updateSchedulerReleaseCounts(value);
|
||||||
|
|
||||||
|
Assert.assertEquals(value, sh.getAllocationCount().longValue());
|
||||||
|
Assert.assertEquals(value, sh.getReleaseCount().longValue());
|
||||||
|
Assert.assertEquals(value, sh.getReservationCount().longValue());
|
||||||
|
Assert.assertEquals(value, sh.getPreemptionCount().longValue());
|
||||||
|
|
||||||
|
Assert.assertEquals(value * (i + 1), sh.getAggregateAllocationCount()
|
||||||
|
.longValue());
|
||||||
|
Assert.assertEquals(value * (i + 1), sh.getAggregateReleaseCount()
|
||||||
|
.longValue());
|
||||||
|
Assert.assertEquals(value * (i + 1), sh.getAggregateReservationCount()
|
||||||
|
.longValue());
|
||||||
|
Assert.assertEquals(value * (i + 1), sh.getAggregatePreemptionCount()
|
||||||
|
.longValue());
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testOperationDetails() {
|
||||||
|
|
||||||
|
SchedulerHealth sh = new SchedulerHealth();
|
||||||
|
long now = Time.now();
|
||||||
|
sh.updateRelease(now, NodeId.newInstance("testhost", 1234),
|
||||||
|
ContainerId.fromString("container_1427562107907_0002_01_000001"),
|
||||||
|
"testqueue");
|
||||||
|
Assert.assertEquals("container_1427562107907_0002_01_000001", sh
|
||||||
|
.getLastReleaseDetails().getContainerId().toString());
|
||||||
|
Assert.assertEquals("testhost:1234", sh.getLastReleaseDetails().getNodeId()
|
||||||
|
.toString());
|
||||||
|
Assert.assertEquals("testqueue", sh.getLastReleaseDetails().getQueue());
|
||||||
|
Assert.assertEquals(now, sh.getLastReleaseDetails().getTimestamp());
|
||||||
|
Assert.assertEquals(0, sh.getLastSchedulerRunTime());
|
||||||
|
|
||||||
|
now = Time.now();
|
||||||
|
sh.updateReservation(now, NodeId.newInstance("testhost1", 1234),
|
||||||
|
ContainerId.fromString("container_1427562107907_0003_01_000001"),
|
||||||
|
"testqueue1");
|
||||||
|
Assert.assertEquals("container_1427562107907_0003_01_000001", sh
|
||||||
|
.getLastReservationDetails().getContainerId().toString());
|
||||||
|
Assert.assertEquals("testhost1:1234", sh.getLastReservationDetails()
|
||||||
|
.getNodeId().toString());
|
||||||
|
Assert
|
||||||
|
.assertEquals("testqueue1", sh.getLastReservationDetails().getQueue());
|
||||||
|
Assert.assertEquals(now, sh.getLastReservationDetails().getTimestamp());
|
||||||
|
Assert.assertEquals(0, sh.getLastSchedulerRunTime());
|
||||||
|
|
||||||
|
now = Time.now();
|
||||||
|
sh.updateAllocation(now, NodeId.newInstance("testhost2", 1234),
|
||||||
|
ContainerId.fromString("container_1427562107907_0004_01_000001"),
|
||||||
|
"testqueue2");
|
||||||
|
Assert.assertEquals("container_1427562107907_0004_01_000001", sh
|
||||||
|
.getLastAllocationDetails().getContainerId().toString());
|
||||||
|
Assert.assertEquals("testhost2:1234", sh.getLastAllocationDetails()
|
||||||
|
.getNodeId().toString());
|
||||||
|
Assert.assertEquals("testqueue2", sh.getLastAllocationDetails().getQueue());
|
||||||
|
Assert.assertEquals(now, sh.getLastAllocationDetails().getTimestamp());
|
||||||
|
Assert.assertEquals(0, sh.getLastSchedulerRunTime());
|
||||||
|
|
||||||
|
now = Time.now();
|
||||||
|
sh.updatePreemption(now, NodeId.newInstance("testhost3", 1234),
|
||||||
|
ContainerId.fromString("container_1427562107907_0005_01_000001"),
|
||||||
|
"testqueue3");
|
||||||
|
Assert.assertEquals("container_1427562107907_0005_01_000001", sh
|
||||||
|
.getLastPreemptionDetails().getContainerId().toString());
|
||||||
|
Assert.assertEquals("testhost3:1234", sh.getLastPreemptionDetails()
|
||||||
|
.getNodeId().toString());
|
||||||
|
Assert.assertEquals("testqueue3", sh.getLastPreemptionDetails().getQueue());
|
||||||
|
Assert.assertEquals(now, sh.getLastPreemptionDetails().getTimestamp());
|
||||||
|
Assert.assertEquals(0, sh.getLastSchedulerRunTime());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testResourceUpdate() {
|
||||||
|
SchedulerHealth sh = new SchedulerHealth();
|
||||||
|
long now = Time.now();
|
||||||
|
sh.updateSchedulerRunDetails(now, Resource.newInstance(1024, 1),
|
||||||
|
Resource.newInstance(2048, 1));
|
||||||
|
Assert.assertEquals(now, sh.getLastSchedulerRunTime());
|
||||||
|
Assert.assertEquals(Resource.newInstance(1024, 1),
|
||||||
|
sh.getResourcesAllocated());
|
||||||
|
Assert.assertEquals(Resource.newInstance(2048, 1),
|
||||||
|
sh.getResourcesReserved());
|
||||||
|
now = Time.now();
|
||||||
|
sh.updateSchedulerReleaseDetails(now, Resource.newInstance(3072, 1));
|
||||||
|
Assert.assertEquals(now, sh.getLastSchedulerRunTime());
|
||||||
|
Assert.assertEquals(Resource.newInstance(3072, 1),
|
||||||
|
sh.getResourcesReleased());
|
||||||
|
}
|
||||||
|
|
||||||
|
private NodeManager registerNode(String hostName, int containerManagerPort,
|
||||||
|
int httpPort, String rackName, Resource capability) throws IOException,
|
||||||
|
YarnException {
|
||||||
|
NodeManager nm =
|
||||||
|
new NodeManager(hostName, containerManagerPort, httpPort, rackName,
|
||||||
|
capability, resourceManager);
|
||||||
|
NodeAddedSchedulerEvent nodeAddEvent1 =
|
||||||
|
new NodeAddedSchedulerEvent(resourceManager.getRMContext().getRMNodes()
|
||||||
|
.get(nm.getNodeId()));
|
||||||
|
resourceManager.getResourceScheduler().handle(nodeAddEvent1);
|
||||||
|
return nm;
|
||||||
|
}
|
||||||
|
|
||||||
|
private void nodeUpdate(NodeManager nm) {
|
||||||
|
RMNode node =
|
||||||
|
resourceManager.getRMContext().getRMNodes().get(nm.getNodeId());
|
||||||
|
// Send a heartbeat to kick the tires on the Scheduler
|
||||||
|
NodeUpdateSchedulerEvent nodeUpdate = new NodeUpdateSchedulerEvent(node);
|
||||||
|
resourceManager.getResourceScheduler().handle(nodeUpdate);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testCapacitySchedulerAllocation() throws Exception {
|
||||||
|
|
||||||
|
setup();
|
||||||
|
|
||||||
|
boolean isCapacityScheduler =
|
||||||
|
resourceManager.getResourceScheduler() instanceof CapacityScheduler;
|
||||||
|
assumeTrue("This test is only supported on Capacity Scheduler",
|
||||||
|
isCapacityScheduler);
|
||||||
|
|
||||||
|
// Register node1
|
||||||
|
String host_0 = "host_0";
|
||||||
|
NodeManager nm_0 =
|
||||||
|
registerNode(host_0, 1234, 2345, NetworkTopology.DEFAULT_RACK,
|
||||||
|
Resources.createResource(5 * 1024, 1));
|
||||||
|
|
||||||
|
// ResourceRequest priorities
|
||||||
|
Priority priority_0 =
|
||||||
|
org.apache.hadoop.yarn.server.resourcemanager.resource.Priority
|
||||||
|
.create(0);
|
||||||
|
Priority priority_1 =
|
||||||
|
org.apache.hadoop.yarn.server.resourcemanager.resource.Priority
|
||||||
|
.create(1);
|
||||||
|
|
||||||
|
// Submit an application
|
||||||
|
Application application_0 =
|
||||||
|
new Application("user_0", "default", resourceManager);
|
||||||
|
application_0.submit();
|
||||||
|
|
||||||
|
application_0.addNodeManager(host_0, 1234, nm_0);
|
||||||
|
|
||||||
|
Resource capability_0_0 = Resources.createResource(1024, 1);
|
||||||
|
application_0.addResourceRequestSpec(priority_1, capability_0_0);
|
||||||
|
|
||||||
|
Resource capability_0_1 = Resources.createResource(2 * 1024, 1);
|
||||||
|
application_0.addResourceRequestSpec(priority_0, capability_0_1);
|
||||||
|
|
||||||
|
Task task_0_0 =
|
||||||
|
new Task(application_0, priority_1, new String[] { host_0 });
|
||||||
|
application_0.addTask(task_0_0);
|
||||||
|
Task task_0_1 =
|
||||||
|
new Task(application_0, priority_0, new String[] { host_0 });
|
||||||
|
application_0.addTask(task_0_1);
|
||||||
|
|
||||||
|
// Send resource requests to the scheduler
|
||||||
|
application_0.schedule();
|
||||||
|
|
||||||
|
// Send a heartbeat to kick the tires on the Scheduler
|
||||||
|
nodeUpdate(nm_0);
|
||||||
|
SchedulerHealth sh =
|
||||||
|
((CapacityScheduler) resourceManager.getResourceScheduler())
|
||||||
|
.getSchedulerHealth();
|
||||||
|
Assert.assertEquals(2, sh.getAllocationCount().longValue());
|
||||||
|
Assert.assertEquals(Resource.newInstance(3 * 1024, 2),
|
||||||
|
sh.getResourcesAllocated());
|
||||||
|
Assert.assertEquals(2, sh.getAggregateAllocationCount().longValue());
|
||||||
|
Assert.assertEquals("host_0:1234", sh.getLastAllocationDetails()
|
||||||
|
.getNodeId().toString());
|
||||||
|
Assert.assertEquals("root.default", sh.getLastAllocationDetails()
|
||||||
|
.getQueue());
|
||||||
|
|
||||||
|
Task task_0_2 =
|
||||||
|
new Task(application_0, priority_0, new String[] { host_0 });
|
||||||
|
application_0.addTask(task_0_2);
|
||||||
|
application_0.schedule();
|
||||||
|
|
||||||
|
nodeUpdate(nm_0);
|
||||||
|
Assert.assertEquals(1, sh.getAllocationCount().longValue());
|
||||||
|
Assert.assertEquals(Resource.newInstance(2 * 1024, 1),
|
||||||
|
sh.getResourcesAllocated());
|
||||||
|
Assert.assertEquals(3, sh.getAggregateAllocationCount().longValue());
|
||||||
|
Assert.assertEquals("host_0:1234", sh.getLastAllocationDetails()
|
||||||
|
.getNodeId().toString());
|
||||||
|
Assert.assertEquals("root.default", sh.getLastAllocationDetails()
|
||||||
|
.getQueue());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testCapacitySchedulerReservation() throws Exception {
|
||||||
|
|
||||||
|
setup();
|
||||||
|
|
||||||
|
boolean isCapacityScheduler =
|
||||||
|
resourceManager.getResourceScheduler() instanceof CapacityScheduler;
|
||||||
|
assumeTrue("This test is only supported on Capacity Scheduler",
|
||||||
|
isCapacityScheduler);
|
||||||
|
|
||||||
|
// Register nodes
|
||||||
|
String host_0 = "host_0";
|
||||||
|
NodeManager nm_0 =
|
||||||
|
registerNode(host_0, 1234, 2345, NetworkTopology.DEFAULT_RACK,
|
||||||
|
Resources.createResource(2 * 1024, 1));
|
||||||
|
String host_1 = "host_1";
|
||||||
|
NodeManager nm_1 =
|
||||||
|
registerNode(host_1, 1234, 2345, NetworkTopology.DEFAULT_RACK,
|
||||||
|
Resources.createResource(5 * 1024, 1));
|
||||||
|
nodeUpdate(nm_0);
|
||||||
|
nodeUpdate(nm_1);
|
||||||
|
|
||||||
|
// ResourceRequest priorities
|
||||||
|
Priority priority_0 =
|
||||||
|
org.apache.hadoop.yarn.server.resourcemanager.resource.Priority
|
||||||
|
.create(0);
|
||||||
|
Priority priority_1 =
|
||||||
|
org.apache.hadoop.yarn.server.resourcemanager.resource.Priority
|
||||||
|
.create(1);
|
||||||
|
|
||||||
|
// Submit an application
|
||||||
|
Application application_0 =
|
||||||
|
new Application("user_0", "default", resourceManager);
|
||||||
|
application_0.submit();
|
||||||
|
|
||||||
|
application_0.addNodeManager(host_0, 1234, nm_0);
|
||||||
|
application_0.addNodeManager(host_1, 1234, nm_1);
|
||||||
|
|
||||||
|
Resource capability_0_0 = Resources.createResource(1024, 1);
|
||||||
|
application_0.addResourceRequestSpec(priority_1, capability_0_0);
|
||||||
|
|
||||||
|
Resource capability_0_1 = Resources.createResource(2 * 1024, 1);
|
||||||
|
application_0.addResourceRequestSpec(priority_0, capability_0_1);
|
||||||
|
|
||||||
|
Task task_0_0 =
|
||||||
|
new Task(application_0, priority_1, new String[] { host_0 });
|
||||||
|
application_0.addTask(task_0_0);
|
||||||
|
|
||||||
|
// Send resource requests to the scheduler
|
||||||
|
application_0.schedule();
|
||||||
|
|
||||||
|
// Send a heartbeat to kick the tires on the Scheduler
|
||||||
|
nodeUpdate(nm_0);
|
||||||
|
SchedulerHealth sh =
|
||||||
|
((CapacityScheduler) resourceManager.getResourceScheduler())
|
||||||
|
.getSchedulerHealth();
|
||||||
|
Assert.assertEquals(1, sh.getAllocationCount().longValue());
|
||||||
|
Assert.assertEquals(Resource.newInstance(1024, 1),
|
||||||
|
sh.getResourcesAllocated());
|
||||||
|
Assert.assertEquals(1, sh.getAggregateAllocationCount().longValue());
|
||||||
|
Assert.assertEquals("host_0:1234", sh.getLastAllocationDetails()
|
||||||
|
.getNodeId().toString());
|
||||||
|
Assert.assertEquals("root.default", sh.getLastAllocationDetails()
|
||||||
|
.getQueue());
|
||||||
|
|
||||||
|
Task task_0_1 =
|
||||||
|
new Task(application_0, priority_0, new String[] { host_0 });
|
||||||
|
application_0.addTask(task_0_1);
|
||||||
|
application_0.schedule();
|
||||||
|
|
||||||
|
nodeUpdate(nm_0);
|
||||||
|
Assert.assertEquals(0, sh.getAllocationCount().longValue());
|
||||||
|
Assert.assertEquals(1, sh.getReservationCount().longValue());
|
||||||
|
Assert.assertEquals(Resource.newInstance(2 * 1024, 1),
|
||||||
|
sh.getResourcesReserved());
|
||||||
|
Assert.assertEquals(1, sh.getAggregateAllocationCount().longValue());
|
||||||
|
Assert.assertEquals("host_0:1234", sh.getLastAllocationDetails()
|
||||||
|
.getNodeId().toString());
|
||||||
|
Assert.assertEquals("root.default", sh.getLastAllocationDetails()
|
||||||
|
.getQueue());
|
||||||
|
}
|
||||||
|
}
|
|
@ -1660,7 +1660,7 @@ public class TestCapacityScheduler {
|
||||||
CapacityScheduler cs =
|
CapacityScheduler cs =
|
||||||
(CapacityScheduler) resourceManager.getResourceScheduler();
|
(CapacityScheduler) resourceManager.getResourceScheduler();
|
||||||
CSQueue origRootQ = cs.getRootQueue();
|
CSQueue origRootQ = cs.getRootQueue();
|
||||||
CapacitySchedulerInfo oldInfo = new CapacitySchedulerInfo(origRootQ);
|
CapacitySchedulerInfo oldInfo = new CapacitySchedulerInfo(origRootQ, cs);
|
||||||
int origNumAppsA = getNumAppsInQueue("a", origRootQ.getChildQueues());
|
int origNumAppsA = getNumAppsInQueue("a", origRootQ.getChildQueues());
|
||||||
int origNumAppsRoot = origRootQ.getNumApplications();
|
int origNumAppsRoot = origRootQ.getNumApplications();
|
||||||
|
|
||||||
|
@ -1669,7 +1669,7 @@ public class TestCapacityScheduler {
|
||||||
CSQueue newRootQ = cs.getRootQueue();
|
CSQueue newRootQ = cs.getRootQueue();
|
||||||
int newNumAppsA = getNumAppsInQueue("a", newRootQ.getChildQueues());
|
int newNumAppsA = getNumAppsInQueue("a", newRootQ.getChildQueues());
|
||||||
int newNumAppsRoot = newRootQ.getNumApplications();
|
int newNumAppsRoot = newRootQ.getNumApplications();
|
||||||
CapacitySchedulerInfo newInfo = new CapacitySchedulerInfo(newRootQ);
|
CapacitySchedulerInfo newInfo = new CapacitySchedulerInfo(newRootQ, cs);
|
||||||
CapacitySchedulerLeafQueueInfo origOldA1 =
|
CapacitySchedulerLeafQueueInfo origOldA1 =
|
||||||
(CapacitySchedulerLeafQueueInfo) getQueueInfo("a1", oldInfo.getQueues());
|
(CapacitySchedulerLeafQueueInfo) getQueueInfo("a1", oldInfo.getQueues());
|
||||||
CapacitySchedulerLeafQueueInfo origNewA1 =
|
CapacitySchedulerLeafQueueInfo origNewA1 =
|
||||||
|
|
|
@ -19,6 +19,7 @@
|
||||||
package org.apache.hadoop.yarn.server.resourcemanager.webapp;
|
package org.apache.hadoop.yarn.server.resourcemanager.webapp;
|
||||||
|
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.assertNotNull;
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
import java.io.StringReader;
|
import java.io.StringReader;
|
||||||
|
@ -314,11 +315,14 @@ public class TestRMWebServicesCapacitySched extends JerseyTestBase {
|
||||||
JSONObject info = json.getJSONObject("scheduler");
|
JSONObject info = json.getJSONObject("scheduler");
|
||||||
assertEquals("incorrect number of elements", 1, info.length());
|
assertEquals("incorrect number of elements", 1, info.length());
|
||||||
info = info.getJSONObject("schedulerInfo");
|
info = info.getJSONObject("schedulerInfo");
|
||||||
assertEquals("incorrect number of elements", 6, info.length());
|
assertEquals("incorrect number of elements", 7, info.length());
|
||||||
verifyClusterSchedulerGeneric(info.getString("type"),
|
verifyClusterSchedulerGeneric(info.getString("type"),
|
||||||
(float) info.getDouble("usedCapacity"),
|
(float) info.getDouble("usedCapacity"),
|
||||||
(float) info.getDouble("capacity"),
|
(float) info.getDouble("capacity"),
|
||||||
(float) info.getDouble("maxCapacity"), info.getString("queueName"));
|
(float) info.getDouble("maxCapacity"), info.getString("queueName"));
|
||||||
|
JSONObject health = info.getJSONObject("health");
|
||||||
|
assertNotNull(health);
|
||||||
|
assertEquals("incorrect number of elements", 3, health.length());
|
||||||
|
|
||||||
JSONArray arr = info.getJSONObject("queues").getJSONArray("queue");
|
JSONArray arr = info.getJSONObject("queues").getJSONArray("queue");
|
||||||
assertEquals("incorrect number of elements", 2, arr.length());
|
assertEquals("incorrect number of elements", 2, arr.length());
|
||||||
|
|
Loading…
Reference in New Issue