YARN-5047. Refactor nodeUpdate across schedulers. (Ray Chiang via kasha)

(cherry picked from commit 754cb4e30f)
Committed by Karthik Kambatla on 2016-10-20 21:17:48 -07:00
parent c2cabce2e8
commit 440a6326d1
5 changed files with 225 additions and 261 deletions

AbstractYarnScheduler.java

@@ -44,6 +44,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceOption;
@@ -73,7 +74,12 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerReco
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeResourceUpdateEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.QueueEntitlement;
import org.apache.hadoop.yarn.server.utils.Lock;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.SystemClock;
import org.apache.hadoop.yarn.util.resource.Resources;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.SettableFuture;
@@ -94,10 +100,14 @@ public abstract class AbstractYarnScheduler
protected Resource minimumAllocation;
protected volatile RMContext rmContext;
private volatile Priority maxClusterLevelAppPriority;
protected ActivitiesManager activitiesManager;
protected SchedulerHealth schedulerHealth = new SchedulerHealth();
protected volatile long lastNodeUpdateTime;
private volatile Clock clock;
/*
* All schedulers which are inheriting AbstractYarnScheduler should use
@@ -130,6 +140,7 @@ public abstract class AbstractYarnScheduler
*/
public AbstractYarnScheduler(String name) {
super(name);
clock = SystemClock.getInstance();
ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
readLock = lock.readLock();
writeLock = lock.writeLock();
@@ -228,13 +239,25 @@ public abstract class AbstractYarnScheduler
nodeTracker.setConfiguredMaxAllocation(maximumAllocation);
}
public SchedulerHealth getSchedulerHealth() {
return this.schedulerHealth;
}
protected void setLastNodeUpdateTime(long time) {
this.lastNodeUpdateTime = time;
}
public long getLastNodeUpdateTime() {
return lastNodeUpdateTime;
}
protected void containerLaunchedOnNode(
ContainerId containerId, SchedulerNode node) {
try {
readLock.lock();
// Get the application for the finished container
SchedulerApplicationAttempt application = getCurrentAttemptForContainer(
containerId);
SchedulerApplicationAttempt application =
getCurrentAttemptForContainer(containerId);
if (application == null) {
LOG.info("Unknown application " + containerId.getApplicationAttemptId()
.getApplicationId() + " launched container " + containerId
@@ -249,7 +272,7 @@ public abstract class AbstractYarnScheduler
readLock.unlock();
}
}
protected void containerIncreasedOnNode(ContainerId containerId,
SchedulerNode node, Container increasedContainerReportedByNM) {
/*
@@ -276,6 +299,7 @@
}
rmContainer.handle(new RMContainerNMDoneChangeResourceEvent(containerId,
increasedContainerReportedByNM.getResource()));
}
public T getApplicationAttempt(ApplicationAttemptId applicationAttemptId) {
@@ -360,7 +384,7 @@
}
}
public void recoverContainersOnNode(
public synchronized void recoverContainersOnNode(
List<NMContainerStatus> containerReports, RMNode nm) {
try {
writeLock.lock();
@@ -475,7 +499,7 @@
}
/**
* Recover resource request back from RMContainer when a container is
* preempted before AM pulled the same. If container is pulled by
* AM, then RMContainer will not have resource request to recover.
* @param rmContainer rmContainer
@@ -621,7 +645,7 @@
SchedulerApplicationAttempt attempt);
@Override
public SchedulerNode getSchedulerNode(NodeId nodeId) {
public N getSchedulerNode(NodeId nodeId) {
return nodeTracker.getNode(nodeId);
}
@@ -832,4 +856,152 @@ public abstract class AbstractYarnScheduler
return this.activitiesManager;
}
public Clock getClock() {
return clock;
}
@VisibleForTesting
public void setClock(Clock clock) {
this.clock = clock;
}
@Lock(Lock.NoLock.class)
public SchedulerNode getNode(NodeId nodeId) {
return nodeTracker.getNode(nodeId);
}
/**
* Get lists of new containers from NodeManager and process them.
* @param nm The RMNode corresponding to the NodeManager
* @return list of completed containers
*/
protected List<ContainerStatus> updateNewContainerInfo(RMNode nm) {
SchedulerNode node = getNode(nm.getNodeID());
List<UpdatedContainerInfo> containerInfoList = nm.pullContainerUpdates();
List<ContainerStatus> newlyLaunchedContainers =
new ArrayList<>();
List<ContainerStatus> completedContainers =
new ArrayList<>();
for (UpdatedContainerInfo containerInfo : containerInfoList) {
newlyLaunchedContainers
.addAll(containerInfo.getNewlyLaunchedContainers());
completedContainers.addAll(containerInfo.getCompletedContainers());
}
// Processing the newly launched containers
for (ContainerStatus launchedContainer : newlyLaunchedContainers) {
containerLaunchedOnNode(launchedContainer.getContainerId(), node);
}
// Processing the newly increased containers
List<Container> newlyIncreasedContainers =
nm.pullNewlyIncreasedContainers();
for (Container container : newlyIncreasedContainers) {
containerIncreasedOnNode(container.getId(), node, container);
}
return completedContainers;
}
/**
* Process completed container list.
* @param completedContainers Extracted list of completed containers
* @param releasedResources Reference resource object for completed containers
* @return The total number of released containers
*/
protected int updateCompletedContainers(List<ContainerStatus>
completedContainers, Resource releasedResources) {
int releasedContainers = 0;
for (ContainerStatus completedContainer : completedContainers) {
ContainerId containerId = completedContainer.getContainerId();
LOG.debug("Container FINISHED: " + containerId);
RMContainer container = getRMContainer(containerId);
completedContainer(container,
completedContainer, RMContainerEventType.FINISHED);
if (container != null) {
releasedContainers++;
Resource ars = container.getAllocatedResource();
if (ars != null) {
Resources.addTo(releasedResources, ars);
}
Resource rrs = container.getReservedResource();
if (rrs != null) {
Resources.addTo(releasedResources, rrs);
}
}
}
return releasedContainers;
}
/**
* Update schedulerHealth information.
* @param releasedResources Reference resource object for completed containers
* @param releasedContainers Count of released containers
*/
protected void updateSchedulerHealthInformation(Resource releasedResources,
int releasedContainers) {
schedulerHealth.updateSchedulerReleaseDetails(getLastNodeUpdateTime(),
releasedResources);
schedulerHealth.updateSchedulerReleaseCounts(releasedContainers);
}
/**
* Update container and utilization information on the NodeManager.
* @param nm The NodeManager to update
*/
protected void updateNodeResourceUtilization(RMNode nm) {
SchedulerNode node = getNode(nm.getNodeID());
// Updating node resource utilization
node.setAggregatedContainersUtilization(
nm.getAggregatedContainersUtilization());
node.setNodeUtilization(nm.getNodeUtilization());
}
/**
* Process a heartbeat update from a node.
* @param nm The RMNode corresponding to the NodeManager
*/
protected synchronized void nodeUpdate(RMNode nm) {
if (LOG.isDebugEnabled()) {
LOG.debug("nodeUpdate: " + nm +
" cluster capacity: " + getClusterResource());
}
// Process new container information
List<ContainerStatus> completedContainers = updateNewContainerInfo(nm);
// Process completed containers
Resource releasedResources = Resource.newInstance(0, 0);
int releasedContainers = updateCompletedContainers(completedContainers,
releasedResources);
// If the node is decommissioning, send an update to have the total
// resource equal to the used resource, so no available resource to
// schedule.
// TODO YARN-5128: Fix possible race-condition when request comes in before
// update is propagated
if (nm.getState() == NodeState.DECOMMISSIONING) {
this.rmContext
.getDispatcher()
.getEventHandler()
.handle(
new RMNodeResourceUpdateEvent(nm.getNodeID(), ResourceOption
.newInstance(getSchedulerNode(nm.getNodeID())
.getAllocatedResource(), 0)));
}
updateSchedulerHealthInformation(releasedResources, releasedContainers);
updateNodeResourceUtilization(nm);
// Now node data structures are up-to-date and ready for scheduling.
if (LOG.isDebugEnabled()) {
SchedulerNode node = getNode(nm.getNodeID());
LOG.debug("Node being looked for scheduling " + nm +
" availableResource: " + node.getUnallocatedResource());
}
}
}
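
Taken together, this file's additions give nodeUpdate() a template-method shape: the heartbeat bookkeeping that each scheduler previously copied (pulling launched/completed containers, releasing their resources, refreshing utilization) now lives in AbstractYarnScheduler, and the concrete schedulers override nodeUpdate() to add their own allocation step. A minimal, self-contained sketch of that shape follows; the Node class and print statements are stand-ins for the real RMNode plumbing, not code from the patch.

    // Sketch: shared heartbeat bookkeeping in the base class,
    // allocation in the subclass. Node is a stand-in for RMNode.
    abstract class AbstractSchedulerSketch {
      static class Node {
        final String id;
        Node(String id) { this.id = id; }
      }

      /** Common heartbeat processing, shared by every scheduler. */
      protected synchronized void nodeUpdate(Node nm) {
        // 1. pull newly launched / completed containers from the node
        // 2. release resources held by completed containers
        // 3. refresh the node's utilization snapshot
        System.out.println("common bookkeeping for " + nm.id);
      }
    }

    class FifoSchedulerSketch extends AbstractSchedulerSketch {
      @Override
      protected synchronized void nodeUpdate(Node nm) {
        super.nodeUpdate(nm);  // shared bookkeeping first
        System.out.println("scheduler-specific allocation on " + nm.id);
      }

      public static void main(String[] args) {
        new FifoSchedulerSketch().nodeUpdate(new Node("node-1"));
      }
    }

Calling super.nodeUpdate() first means the node's free-resource view is already refreshed when the subclass decides whether to allocate, which is the ordering all three schedulers adopt in the files below.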

CapacityScheduler.java

@@ -52,7 +52,6 @@ import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.QueueACL;
import org.apache.hadoop.yarn.api.records.QueueInfo;
@@ -89,8 +88,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEven
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeDecreaseContainerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeResourceUpdateEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AppSchedulingInfo;
@@ -105,7 +102,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedContainerCha
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerDynamicEditException;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerHealth;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.*;
@@ -236,8 +232,6 @@ public class CapacityScheduler extends
private boolean scheduleAsynchronously;
private AsyncScheduleThread asyncSchedulerThread;
private RMNodeLabelsManager labelManager;
private SchedulerHealth schedulerHealth = new SchedulerHealth();
volatile long lastNodeUpdateTime;
/**
* EXPERT
@@ -1100,93 +1094,24 @@ public class CapacityScheduler extends
return root.getQueueUserAclInfo(user);
}
private void nodeUpdate(RMNode nm) {
@Override
protected synchronized void nodeUpdate(RMNode nm) {
try {
writeLock.lock();
if (LOG.isDebugEnabled()) {
LOG.debug(
"nodeUpdate: " + nm + " clusterResources: " + getClusterResource());
}
Resource releaseResources = Resource.newInstance(0, 0);
FiCaSchedulerNode node = getNode(nm.getNodeID());
List<UpdatedContainerInfo> containerInfoList = nm.pullContainerUpdates();
List<ContainerStatus> newlyLaunchedContainers =
new ArrayList<ContainerStatus>();
List<ContainerStatus> completedContainers =
new ArrayList<ContainerStatus>();
for (UpdatedContainerInfo containerInfo : containerInfoList) {
newlyLaunchedContainers.addAll(
containerInfo.getNewlyLaunchedContainers());
completedContainers.addAll(containerInfo.getCompletedContainers());
}
// Processing the newly launched containers
for (ContainerStatus launchedContainer : newlyLaunchedContainers) {
containerLaunchedOnNode(launchedContainer.getContainerId(), node);
}
// Processing the newly increased containers
List<Container> newlyIncreasedContainers =
nm.pullNewlyIncreasedContainers();
for (Container container : newlyIncreasedContainers) {
containerIncreasedOnNode(container.getId(), node, container);
}
// Process completed containers
int releasedContainers = 0;
for (ContainerStatus completedContainer : completedContainers) {
ContainerId containerId = completedContainer.getContainerId();
RMContainer container = getRMContainer(containerId);
super.completedContainer(container, completedContainer,
RMContainerEventType.FINISHED);
if (container != null) {
releasedContainers++;
Resource rs = container.getAllocatedResource();
if (rs != null) {
Resources.addTo(releaseResources, rs);
}
rs = container.getReservedResource();
if (rs != null) {
Resources.addTo(releaseResources, rs);
}
}
}
// If the node is decommissioning, send an update to have the total
// resource equal to the used resource, so no available resource to
// schedule.
// TODO: Fix possible race-condition when request comes in before
// update is propagated
if (nm.getState() == NodeState.DECOMMISSIONING) {
this.rmContext.getDispatcher().getEventHandler().handle(
new RMNodeResourceUpdateEvent(nm.getNodeID(), ResourceOption
.newInstance(
getSchedulerNode(nm.getNodeID()).getAllocatedResource(),
0)));
}
schedulerHealth.updateSchedulerReleaseDetails(lastNodeUpdateTime,
releaseResources);
schedulerHealth.updateSchedulerReleaseCounts(releasedContainers);
// Updating node resource utilization
node.setAggregatedContainersUtilization(
nm.getAggregatedContainersUtilization());
node.setNodeUtilization(nm.getNodeUtilization());
// Now node data structures are up to date and ready for scheduling.
if (LOG.isDebugEnabled()) {
LOG.debug(
"Node being looked for scheduling " + nm + " availableResource: "
+ node.getUnallocatedResource());
setLastNodeUpdateTime(Time.now());
super.nodeUpdate(nm);
if (!scheduleAsynchronously) {
ActivitiesLogger.NODE.startNodeUpdateRecording(activitiesManager,
nm.getNodeID());
allocateContainersToNode(getNode(nm.getNodeID()));
ActivitiesLogger.NODE.finishNodeUpdateRecording(activitiesManager,
nm.getNodeID());
}
} finally {
writeLock.unlock();
}
}
/**
* Process resource update on a node.
*/
@@ -1459,16 +1384,7 @@ public class CapacityScheduler extends
case NODE_UPDATE:
{
NodeUpdateSchedulerEvent nodeUpdatedEvent = (NodeUpdateSchedulerEvent)event;
RMNode node = nodeUpdatedEvent.getRMNode();
setLastNodeUpdateTime(Time.now());
nodeUpdate(node);
if (!scheduleAsynchronously) {
ActivitiesLogger.NODE.startNodeUpdateRecording(activitiesManager,
node.getNodeID());
allocateContainersToNode(getNode(node.getNodeID()));
ActivitiesLogger.NODE.finishNodeUpdateRecording(activitiesManager,
node.getNodeID());
}
nodeUpdate(nodeUpdatedEvent.getRMNode());
}
break;
case APP_ADDED:
@@ -2194,20 +2110,6 @@ public class CapacityScheduler extends
return ret;
}
@Override
public SchedulerHealth getSchedulerHealth() {
return this.schedulerHealth;
}
private void setLastNodeUpdateTime(long time) {
this.lastNodeUpdateTime = time;
}
@Override
public long getLastNodeUpdateTime() {
return lastNodeUpdateTime;
}
@Override
public Priority checkAndGetApplicationPriority(Priority priorityFromContext,
String user, String queueName, ApplicationId applicationId)

FairScheduler.java

@@ -41,7 +41,6 @@ import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.QueueACL;
import org.apache.hadoop.yarn.api.records.QueueInfo;
import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
@@ -70,8 +69,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeResourceUpdateEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
@@ -92,8 +89,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeResourc
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.SystemClock;
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
@@ -130,7 +125,6 @@ public class FairScheduler extends
private Resource incrAllocation;
private QueueManager queueMgr;
private volatile Clock clock;
private boolean usePortForNodeName;
private static final Log LOG = LogFactory.getLog(FairScheduler.class);
@@ -217,7 +211,6 @@ public class FairScheduler extends
public FairScheduler() {
super(FairScheduler.class.getName());
clock = SystemClock.getInstance();
allocsLoader = new AllocationFileLoaderService();
queueMgr = new QueueManager(this);
maxRunningEnforcer = new MaxRunningAppsEnforcer(this);
@@ -383,7 +376,7 @@ public class FairScheduler extends
* threshold for each type of task.
*/
private void updateStarvationStats() {
lastPreemptionUpdateTime = clock.getTime();
lastPreemptionUpdateTime = getClock().getTime();
for (FSLeafQueue sched : queueMgr.getLeafQueues()) {
sched.updateStarvationStats();
}
@@ -616,15 +609,6 @@ public class FairScheduler extends
return continuousSchedulingSleepMs;
}
public Clock getClock() {
return clock;
}
@VisibleForTesting
void setClock(Clock clock) {
this.clock = clock;
}
public FairSchedulerEventLog getEventLog() {
return eventLog;
}
@@ -1053,67 +1037,17 @@ public class FairScheduler extends
preemptionContainerIds, null, null,
application.pullUpdatedNMTokens());
}
/**
* Process a heartbeat update from a node.
*/
private void nodeUpdate(RMNode nm) {
@Override
protected synchronized void nodeUpdate(RMNode nm) {
try {
writeLock.lock();
long start = getClock().getTime();
if (LOG.isDebugEnabled()) {
LOG.debug(
"nodeUpdate: " + nm + " cluster capacity: " + getClusterResource());
}
eventLog.log("HEARTBEAT", nm.getHostName());
FSSchedulerNode node = getFSSchedulerNode(nm.getNodeID());
super.nodeUpdate(nm);
List<UpdatedContainerInfo> containerInfoList = nm.pullContainerUpdates();
List<ContainerStatus> newlyLaunchedContainers =
new ArrayList<ContainerStatus>();
List<ContainerStatus> completedContainers =
new ArrayList<ContainerStatus>();
for (UpdatedContainerInfo containerInfo : containerInfoList) {
newlyLaunchedContainers.addAll(
containerInfo.getNewlyLaunchedContainers());
completedContainers.addAll(containerInfo.getCompletedContainers());
}
// Processing the newly launched containers
for (ContainerStatus launchedContainer : newlyLaunchedContainers) {
containerLaunchedOnNode(launchedContainer.getContainerId(), node);
}
// Process completed containers
for (ContainerStatus completedContainer : completedContainers) {
ContainerId containerId = completedContainer.getContainerId();
LOG.debug("Container FINISHED: " + containerId);
super.completedContainer(getRMContainer(containerId),
completedContainer, RMContainerEventType.FINISHED);
}
// If the node is decommissioning, send an update to have the total
// resource equal to the used resource, so no available resource to
// schedule.
if (nm.getState() == NodeState.DECOMMISSIONING) {
this.rmContext.getDispatcher().getEventHandler().handle(
new RMNodeResourceUpdateEvent(nm.getNodeID(), ResourceOption
.newInstance(
getSchedulerNode(nm.getNodeID()).getAllocatedResource(),
0)));
}
if (continuousSchedulingEnabled) {
if (!completedContainers.isEmpty()) {
attemptScheduling(node);
}
} else {
attemptScheduling(node);
}
// Updating node resource utilization
node.setAggregatedContainersUtilization(
nm.getAggregatedContainersUtilization());
node.setNodeUtilization(nm.getNodeUtilization());
FSSchedulerNode fsNode = getFSSchedulerNode(nm.getNodeID());
attemptScheduling(fsNode);
long duration = getClock().getTime() - start;
fsOpDurations.addNodeUpdateDuration(duration);
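
The FairScheduler hunks above also show the timing/clock plumbing moving up: the private Clock field, getClock(), and the @VisibleForTesting setter now live in AbstractYarnScheduler (first file), and FairScheduler times the heartbeat via the inherited getClock(). A small self-contained sketch of that arrangement, with a stand-in Clock interface in place of org.apache.hadoop.yarn.util.Clock:

    // Stand-in for org.apache.hadoop.yarn.util.Clock.
    interface Clock { long getTime(); }

    class ClockedSchedulerSketch {
      // Defaults to wall-clock time; tests may inject a fake clock.
      private volatile Clock clock = System::currentTimeMillis;

      public Clock getClock() { return clock; }

      // Test hook, mirroring the @VisibleForTesting setter in the patch.
      public void setClock(Clock clock) { this.clock = clock; }

      void nodeUpdate() throws InterruptedException {
        long start = getClock().getTime();
        Thread.sleep(5);  // pretend to process a heartbeat
        long duration = getClock().getTime() - start;
        System.out.println("nodeUpdate took " + duration + " ms");
      }

      public static void main(String[] args) throws InterruptedException {
        ClockedSchedulerSketch s = new ClockedSchedulerSketch();
        s.nodeUpdate();         // measured with the real clock
        s.setClock(() -> 42L);  // frozen clock, as a test might install
        s.nodeUpdate();         // now reports 0 ms
      }
    }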

FifoScheduler.java

@@ -42,14 +42,12 @@ import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.QueueACL;
import org.apache.hadoop.yarn.api.records.QueueInfo;
import org.apache.hadoop.yarn.api.records.QueueState;
import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceOption;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.UpdateContainerRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
@@ -69,8 +67,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeResourceUpdateEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
@@ -385,10 +381,6 @@ public class FifoScheduler extends
}
}
private FiCaSchedulerNode getNode(NodeId nodeId) {
return nodeTracker.getNode(nodeId);
}
@VisibleForTesting
public synchronized void addApplication(ApplicationId applicationId,
String queue, String user, boolean isAppRecovering) {
@@ -733,66 +725,6 @@ public class FifoScheduler extends
return assignedContainers;
}
private synchronized void nodeUpdate(RMNode rmNode) {
FiCaSchedulerNode node = getNode(rmNode.getNodeID());
List<UpdatedContainerInfo> containerInfoList = rmNode.pullContainerUpdates();
List<ContainerStatus> newlyLaunchedContainers = new ArrayList<ContainerStatus>();
List<ContainerStatus> completedContainers = new ArrayList<ContainerStatus>();
for(UpdatedContainerInfo containerInfo : containerInfoList) {
newlyLaunchedContainers.addAll(containerInfo.getNewlyLaunchedContainers());
completedContainers.addAll(containerInfo.getCompletedContainers());
}
// Processing the newly launched containers
for (ContainerStatus launchedContainer : newlyLaunchedContainers) {
containerLaunchedOnNode(launchedContainer.getContainerId(), node);
}
// Process completed containers
for (ContainerStatus completedContainer : completedContainers) {
ContainerId containerId = completedContainer.getContainerId();
LOG.debug("Container FINISHED: " + containerId);
super.completedContainer(getRMContainer(containerId),
completedContainer, RMContainerEventType.FINISHED);
}
// Updating node resource utilization
node.setAggregatedContainersUtilization(
rmNode.getAggregatedContainersUtilization());
node.setNodeUtilization(rmNode.getNodeUtilization());
// If the node is decommissioning, send an update to have the total
// resource equal to the used resource, so no available resource to
// schedule.
if (rmNode.getState() == NodeState.DECOMMISSIONING) {
this.rmContext
.getDispatcher()
.getEventHandler()
.handle(
new RMNodeResourceUpdateEvent(rmNode.getNodeID(), ResourceOption
.newInstance(getSchedulerNode(rmNode.getNodeID())
.getAllocatedResource(), 0)));
}
if (rmContext.isWorkPreservingRecoveryEnabled()
&& !rmContext.isSchedulerReadyForAllocatingContainers()) {
return;
}
if (Resources.greaterThanOrEqual(resourceCalculator, getClusterResource(),
node.getUnallocatedResource(), minimumAllocation)) {
LOG.debug("Node heartbeat " + rmNode.getNodeID() +
" available resource = " + node.getUnallocatedResource());
assignContainers(node);
LOG.debug("Node after allocation " + rmNode.getNodeID() + " resource = "
+ node.getUnallocatedResource());
}
updateAvailableResourcesMetrics();
}
private void increaseUsedResources(RMContainer rmContainer) {
Resources.addTo(usedResource, rmContainer.getAllocatedResource());
}
@@ -910,7 +842,7 @@ public class FifoScheduler extends
container.getId().getApplicationAttemptId().getApplicationId();
// Get the node on which the container was allocated
FiCaSchedulerNode node = getNode(container.getNodeId());
FiCaSchedulerNode node = (FiCaSchedulerNode) getNode(container.getNodeId());
if (application == null) {
LOG.info("Unknown application: " + appId +
@@ -1025,4 +957,28 @@ public class FifoScheduler extends
// TODO Auto-generated method stub
}
@Override
protected synchronized void nodeUpdate(RMNode nm) {
super.nodeUpdate(nm);
FiCaSchedulerNode node = (FiCaSchedulerNode) getNode(nm.getNodeID());
if (rmContext.isWorkPreservingRecoveryEnabled()
&& !rmContext.isSchedulerReadyForAllocatingContainers()) {
return;
}
if (Resources.greaterThanOrEqual(resourceCalculator, getClusterResource(),
node.getUnallocatedResource(), minimumAllocation)) {
LOG.debug("Node heartbeat " + nm.getNodeID() +
" available resource = " + node.getUnallocatedResource());
assignContainers(node);
LOG.debug("Node after allocation " + nm.getNodeID() + " resource = "
+ node.getUnallocatedResource());
}
updateAvailableResourcesMetrics();
}
}

TestProportionalCapacityPreemptionPolicy.java

@@ -35,7 +35,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsMana
import org.apache.hadoop.yarn.server.resourcemanager.resource.Priority;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler
.SchedulerRequestKey;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
@@ -46,6 +45,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.ParentQu
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueCapacities;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptionManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerPreemptEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType;
@@ -1061,7 +1061,7 @@ public class TestProportionalCapacityPreemptionPolicy {
when(lm.getResourceByLabel(anyString(), any(Resource.class))).thenReturn(
clusterResources);
SchedulerNode mNode = mock(SchedulerNode.class);
FiCaSchedulerNode mNode = mock(FiCaSchedulerNode.class);
when(mNode.getPartition()).thenReturn(RMNodeLabelsManager.NO_LABEL);
when(mCS.getSchedulerNode(any(NodeId.class))).thenReturn(mNode);
}
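
The last hunk follows from the getSchedulerNode() signature change in the first file: it now returns the scheduler-specific node type N (FiCaSchedulerNode for the CapacityScheduler), so the test must mock the concrete class rather than the SchedulerNode base. A self-contained Mockito sketch of the same idea; both node classes below are stand-ins, not the real YARN types:

    import static org.mockito.Mockito.mock;
    import static org.mockito.Mockito.when;

    public class NodeMockSketch {
      static class SchedulerNode {
        public String getPartition() { return "unset"; }
      }
      // Scheduler-specific subtype, playing the role of FiCaSchedulerNode.
      static class FiCaLikeNode extends SchedulerNode { }

      public static void main(String[] args) {
        // The mock must satisfy the narrower return type of getSchedulerNode().
        FiCaLikeNode node = mock(FiCaLikeNode.class);
        when(node.getPartition()).thenReturn("");  // "" is NO_LABEL in YARN
        System.out.println("partition: '" + node.getPartition() + "'");
      }
    }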