Merge -r 1177858:1177859 from trunk to branch-0.23 to fix MAPREDUCE-3050.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23@1177861 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
1efe4e403d
commit
4854989c46
|
@ -1455,6 +1455,9 @@ Release 0.23.0 - Unreleased
|
|||
MAPREDUCE-2996. Add uber-ness information to JobHistory. (Jonathan Eagles
|
||||
via acmurthy)
|
||||
|
||||
MAPREDUCE-3050. Add ability to get resource usage information for
|
||||
applications and nodes. (Robert Evans via acmurthy)
|
||||
|
||||
Release 0.22.0 - Unreleased
|
||||
|
||||
INCOMPATIBLE CHANGES
|
||||
|
|
|
@ -351,7 +351,7 @@ public class ClientRMService extends AbstractService implements
|
|||
report.setNodeHealthStatus(rmNode.getNodeHealthStatus());
|
||||
org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport schedulerNodeReport = scheduler
|
||||
.getNodeReport(rmNode.getNodeID());
|
||||
report.setUsed(schedulerNodeReport.getUsedResources());
|
||||
report.setUsed(schedulerNodeReport.getUsedResource());
|
||||
report.setNumContainers(schedulerNodeReport.getNumContainers());
|
||||
return report;
|
||||
}
|
||||
|
|
|
@ -136,6 +136,10 @@ public class SchedulerApp {
|
|||
return this.appSchedulingInfo.getResource(priority);
|
||||
}
|
||||
|
||||
/**
|
||||
* Is this application pending?
|
||||
* @return true if it is else false.
|
||||
*/
|
||||
public boolean isPending() {
|
||||
return this.appSchedulingInfo.isPending();
|
||||
}
|
||||
|
@ -144,6 +148,10 @@ public class SchedulerApp {
|
|||
return this.appSchedulingInfo.getQueueName();
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the list of live containers
|
||||
* @return All of the live containers
|
||||
*/
|
||||
public synchronized Collection<RMContainer> getLiveContainers() {
|
||||
return new ArrayList<RMContainer>(liveContainers.values());
|
||||
}
|
||||
|
@ -419,7 +427,11 @@ public class SchedulerApp {
|
|||
return Math.min(((float)requiredResources / clusterNodes), 1.0f);
|
||||
}
|
||||
|
||||
public synchronized List<RMContainer> getAllReservedContainers() {
|
||||
/**
|
||||
* Get the list of reserved containers
|
||||
* @return All of the reserved containers.
|
||||
*/
|
||||
public synchronized List<RMContainer> getReservedContainers() {
|
||||
List<RMContainer> reservedContainers = new ArrayList<RMContainer>();
|
||||
for (Map.Entry<Priority, Map<NodeId, RMContainer>> e :
|
||||
this.reservedContainers.entrySet()) {
|
||||
|
@ -447,5 +459,4 @@ public class SchedulerApp {
|
|||
public Queue getQueue() {
|
||||
return queue;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -0,0 +1,68 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
|
||||
|
||||
import java.util.Collection;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
|
||||
import org.apache.hadoop.classification.InterfaceStability.Evolving;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
|
||||
|
||||
/**
|
||||
* Represents an application attempt, and the resources that the attempt is
|
||||
* using.
|
||||
*/
|
||||
@Evolving
|
||||
@LimitedPrivate("yarn")
|
||||
public class SchedulerAppReport {
|
||||
|
||||
private final Collection<RMContainer> live;
|
||||
private final Collection<RMContainer> reserved;
|
||||
private final boolean pending;
|
||||
|
||||
public SchedulerAppReport(SchedulerApp app) {
|
||||
this.live = app.getLiveContainers();
|
||||
this.reserved = app.getReservedContainers();
|
||||
this.pending = app.isPending();
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the list of live containers
|
||||
* @return All of the live containers
|
||||
*/
|
||||
public Collection<RMContainer> getLiveContainers() {
|
||||
return live;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the list of reserved containers
|
||||
* @return All of the reserved containers.
|
||||
*/
|
||||
public Collection<RMContainer> getReservedContainers() {
|
||||
return reserved;
|
||||
}
|
||||
|
||||
/**
|
||||
* Is this application pending?
|
||||
* @return true if it is else false.
|
||||
*/
|
||||
public boolean isPending() {
|
||||
return pending;
|
||||
}
|
||||
}
|
|
@ -28,19 +28,34 @@ import org.apache.hadoop.yarn.api.records.Resource;
|
|||
@Private
|
||||
@Stable
|
||||
public class SchedulerNodeReport {
|
||||
private final Resource usedResources;
|
||||
private final int numContainers;
|
||||
private final Resource used;
|
||||
private final Resource avail;
|
||||
private final int num;
|
||||
|
||||
public SchedulerNodeReport(Resource used, int numContainers) {
|
||||
this.usedResources = used;
|
||||
this.numContainers = numContainers;
|
||||
public SchedulerNodeReport(SchedulerNode node) {
|
||||
this.used = node.getUsedResource();
|
||||
this.avail = node.getAvailableResource();
|
||||
this.num = node.getNumContainers();
|
||||
}
|
||||
|
||||
public Resource getUsedResources() {
|
||||
return usedResources;
|
||||
/**
|
||||
* @return the amount of resources currently used by the node.
|
||||
*/
|
||||
public Resource getUsedResource() {
|
||||
return used;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the amount of resources currently available on the node
|
||||
*/
|
||||
public Resource getAvailableResource() {
|
||||
return avail;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the number of containers currently running on this node.
|
||||
*/
|
||||
public int getNumContainers() {
|
||||
return numContainers;
|
||||
return num;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -21,8 +21,9 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
|
|||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||
import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
|
||||
import org.apache.hadoop.classification.InterfaceAudience.Public;
|
||||
import org.apache.hadoop.classification.InterfaceStability.Evolving;
|
||||
import org.apache.hadoop.classification.InterfaceStability.Stable;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
|
@ -57,7 +58,6 @@ public interface YarnScheduler extends EventHandler<SchedulerEvent> {
|
|||
/**
|
||||
* Get acls for queues for current user.
|
||||
* @return acls for queues for current user
|
||||
* @throws IOException
|
||||
*/
|
||||
@Public
|
||||
@Stable
|
||||
|
@ -101,26 +101,24 @@ public interface YarnScheduler extends EventHandler<SchedulerEvent> {
|
|||
* @param nodeId
|
||||
* @return the {@link SchedulerNodeReport} for the node
|
||||
*/
|
||||
@Private
|
||||
@LimitedPrivate("yarn")
|
||||
@Stable
|
||||
public SchedulerNodeReport getNodeReport(NodeId nodeId);
|
||||
|
||||
/**
|
||||
* Get used resources on the node
|
||||
* @param nodeId node
|
||||
* @return used resources on the node
|
||||
* Get the Scheduler app for a given app attempt Id.
|
||||
* @param appAttemptId the id of the application attempt
|
||||
* @return SchedulerApp for this given attempt.
|
||||
*/
|
||||
@Private
|
||||
@LimitedPrivate("yarn")
|
||||
@Stable
|
||||
Resource getUsedResource(NodeId nodeId);
|
||||
SchedulerAppReport getSchedulerAppInfo(ApplicationAttemptId appAttemptId);
|
||||
|
||||
/**
|
||||
* Get available resources on the node
|
||||
* @param nodeId node
|
||||
* @return available resources on the node
|
||||
* Get the root queue for the scheduler.
|
||||
* @return the root queue for the scheduler.
|
||||
*/
|
||||
@Private
|
||||
@Stable
|
||||
Resource getAvailableResource(NodeId nodeId);
|
||||
|
||||
@LimitedPrivate("yarn")
|
||||
@Evolving
|
||||
QueueMetrics getRootQueueMetrics();
|
||||
}
|
||||
|
|
|
@ -58,8 +58,10 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
|
|||
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApp;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
|
||||
|
@ -128,6 +130,11 @@ implements ResourceScheduler, CapacitySchedulerContext {
|
|||
|
||||
public CapacityScheduler() {}
|
||||
|
||||
@Override
|
||||
public QueueMetrics getRootQueueMetrics() {
|
||||
return root.getMetrics();
|
||||
}
|
||||
|
||||
public CSQueue getRootQueue() {
|
||||
return root;
|
||||
}
|
||||
|
@ -152,14 +159,6 @@ implements ResourceScheduler, CapacitySchedulerContext {
|
|||
return maximumAllocation;
|
||||
}
|
||||
|
||||
public synchronized Resource getUsedResource(NodeId nodeId) {
|
||||
return nodes.get(nodeId).getUsedResource();
|
||||
}
|
||||
|
||||
public synchronized Resource getAvailableResource(NodeId nodeId) {
|
||||
return nodes.get(nodeId).getAvailableResource();
|
||||
}
|
||||
|
||||
public synchronized int getNumClusterNodes() {
|
||||
return numNodeManagers;
|
||||
}
|
||||
|
@ -401,7 +400,7 @@ implements ResourceScheduler, CapacitySchedulerContext {
|
|||
}
|
||||
|
||||
// Release all reserved containers
|
||||
for (RMContainer rmContainer : application.getAllReservedContainers()) {
|
||||
for (RMContainer rmContainer : application.getReservedContainers()) {
|
||||
completedContainer(rmContainer,
|
||||
SchedulerUtils.createAbnormalContainerStatus(
|
||||
rmContainer.getContainerId(),
|
||||
|
@ -733,6 +732,13 @@ implements ResourceScheduler, CapacitySchedulerContext {
|
|||
return applications.get(applicationAttemptId);
|
||||
}
|
||||
|
||||
@Override
|
||||
public SchedulerAppReport getSchedulerAppInfo(
|
||||
ApplicationAttemptId applicationAttemptId) {
|
||||
SchedulerApp app = getApplication(applicationAttemptId);
|
||||
return app == null ? null : new SchedulerAppReport(app);
|
||||
}
|
||||
|
||||
@Lock(Lock.NoLock.class)
|
||||
SchedulerNode getNode(NodeId nodeId) {
|
||||
return nodes.get(nodeId);
|
||||
|
@ -764,8 +770,7 @@ implements ResourceScheduler, CapacitySchedulerContext {
|
|||
@Override
|
||||
public SchedulerNodeReport getNodeReport(NodeId nodeId) {
|
||||
SchedulerNode node = getNode(nodeId);
|
||||
return new SchedulerNodeReport(
|
||||
node.getUsedResource(), node.getNumContainers());
|
||||
return node == null ? null : new SchedulerNodeReport(node);
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -1052,6 +1052,7 @@ public class LeafQueue implements CSQueue {
|
|||
createContainer(application, node, capability, priority);
|
||||
}
|
||||
|
||||
|
||||
public Container createContainer(SchedulerApp application, SchedulerNode node,
|
||||
Resource capability, Priority priority) {
|
||||
Container container =
|
||||
|
|
|
@ -141,7 +141,7 @@ public class ParentQueue implements CSQueue {
|
|||
maximumCapacity, absoluteMaxCapacity, state, acls);
|
||||
|
||||
this.queueComparator = comparator;
|
||||
this.childQueues = new TreeSet<CSQueue>(comparator);
|
||||
this.childQueues = new TreeSet<CSQueue>(queueComparator);
|
||||
|
||||
LOG.info("Initialized parent-queue " + queueName +
|
||||
" name=" + queueName +
|
||||
|
|
|
@ -70,6 +70,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
|
|||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApp;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
|
||||
|
@ -285,6 +286,13 @@ public class FifoScheduler implements ResourceScheduler {
|
|||
return applications.get(applicationAttemptId);
|
||||
}
|
||||
|
||||
@Override
|
||||
public SchedulerAppReport getSchedulerAppInfo(
|
||||
ApplicationAttemptId applicationAttemptId) {
|
||||
SchedulerApp app = getApplication(applicationAttemptId);
|
||||
return app == null ? null : new SchedulerAppReport(app);
|
||||
}
|
||||
|
||||
private SchedulerNode getNode(NodeId nodeId) {
|
||||
return nodes.get(nodeId);
|
||||
}
|
||||
|
@ -762,8 +770,7 @@ public class FifoScheduler implements ResourceScheduler {
|
|||
@Override
|
||||
public synchronized SchedulerNodeReport getNodeReport(NodeId nodeId) {
|
||||
SchedulerNode node = getNode(nodeId);
|
||||
return new SchedulerNodeReport(
|
||||
node.getUsedResource(), node.getNumContainers());
|
||||
return node == null ? null : new SchedulerNodeReport(node);
|
||||
}
|
||||
|
||||
private RMContainer getRMContainer(ContainerId containerId) {
|
||||
|
@ -772,4 +779,9 @@ public class FifoScheduler implements ResourceScheduler {
|
|||
return (application == null) ? null : application.getRMContainer(containerId);
|
||||
}
|
||||
|
||||
@Override
|
||||
public QueueMetrics getRootQueueMetrics() {
|
||||
return DEFAULT_QUEUE.getMetrics();
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -28,10 +28,12 @@ import org.apache.hadoop.conf.Configuration;
|
|||
import org.apache.hadoop.yarn.api.records.AMResponse;
|
||||
import org.apache.hadoop.yarn.api.records.Container;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerState;
|
||||
import org.apache.hadoop.yarn.api.records.NodeReport;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.Store;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.StoreFactory;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;
|
||||
import org.apache.log4j.Level;
|
||||
import org.apache.log4j.LogManager;
|
||||
import org.apache.log4j.Logger;
|
||||
|
@ -71,8 +73,9 @@ public class TestFifoScheduler {
|
|||
RMAppAttempt attempt1 = app1.getCurrentAppAttempt();
|
||||
MockAM am1 = rm.sendAMLaunched(attempt1.getAppAttemptId());
|
||||
am1.registerAppAttempt();
|
||||
Assert.assertEquals(2 * GB, rm.getResourceScheduler().getUsedResource(
|
||||
nm1.getNodeId()).getMemory());
|
||||
SchedulerNodeReport report_nm1 = rm.getResourceScheduler().getNodeReport(
|
||||
nm1.getNodeId());
|
||||
Assert.assertEquals(2 * GB, report_nm1.getUsedResource().getMemory());
|
||||
|
||||
RMApp app2 = rm.submitApp(2048);
|
||||
// kick the scheduling, 2GB given to AM, remaining 2 GB on nm2
|
||||
|
@ -80,8 +83,9 @@ public class TestFifoScheduler {
|
|||
RMAppAttempt attempt2 = app2.getCurrentAppAttempt();
|
||||
MockAM am2 = rm.sendAMLaunched(attempt2.getAppAttemptId());
|
||||
am2.registerAppAttempt();
|
||||
Assert.assertEquals(2 * GB, rm.getResourceScheduler().getUsedResource(
|
||||
nm2.getNodeId()).getMemory());
|
||||
SchedulerNodeReport report_nm2 = rm.getResourceScheduler().getNodeReport(
|
||||
nm2.getNodeId());
|
||||
Assert.assertEquals(2 * GB, report_nm2.getUsedResource().getMemory());
|
||||
|
||||
// add request for containers
|
||||
am1.addRequests(new String[] { "h1", "h2" }, GB, 1, 1);
|
||||
|
@ -115,15 +119,13 @@ public class TestFifoScheduler {
|
|||
Assert.assertEquals(3 * GB, allocated2.get(0).getResource().getMemory());
|
||||
Assert.assertEquals(nm1.getNodeId(), allocated2.get(0).getNodeId());
|
||||
|
||||
Assert.assertEquals(0, rm.getResourceScheduler().getAvailableResource(
|
||||
nm1.getNodeId()).getMemory());
|
||||
Assert.assertEquals(2 * GB, rm.getResourceScheduler()
|
||||
.getAvailableResource(nm2.getNodeId()).getMemory());
|
||||
report_nm1 = rm.getResourceScheduler().getNodeReport(nm1.getNodeId());
|
||||
report_nm2 = rm.getResourceScheduler().getNodeReport(nm2.getNodeId());
|
||||
Assert.assertEquals(0, report_nm1.getAvailableResource().getMemory());
|
||||
Assert.assertEquals(2 * GB, report_nm2.getAvailableResource().getMemory());
|
||||
|
||||
Assert.assertEquals(6 * GB, rm.getResourceScheduler().getUsedResource(
|
||||
nm1.getNodeId()).getMemory());
|
||||
Assert.assertEquals(2 * GB, rm.getResourceScheduler().getUsedResource(
|
||||
nm2.getNodeId()).getMemory());
|
||||
Assert.assertEquals(6 * GB, report_nm1.getUsedResource().getMemory());
|
||||
Assert.assertEquals(2 * GB, report_nm2.getUsedResource().getMemory());
|
||||
|
||||
Container c1 = allocated1.get(0);
|
||||
Assert.assertEquals(GB, c1.getResource().getMemory());
|
||||
|
@ -138,8 +140,8 @@ public class TestFifoScheduler {
|
|||
}
|
||||
Assert.assertEquals(1, attempt1.getJustFinishedContainers().size());
|
||||
Assert.assertEquals(1, am1.schedule().getCompletedContainersStatuses().size());
|
||||
Assert.assertEquals(5 * GB, rm.getResourceScheduler().getUsedResource(
|
||||
nm1.getNodeId()).getMemory());
|
||||
report_nm1 = rm.getResourceScheduler().getNodeReport(nm1.getNodeId());
|
||||
Assert.assertEquals(5 * GB, report_nm1.getUsedResource().getMemory());
|
||||
|
||||
rm.stop();
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue