YARN-1408 Preemption caused Invalid State Event: ACQUIRED at KILLED and caused a task timeout for 30mins. (Sunil G via mayank)
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1610884 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
32d9fd2ca3
commit
8260d48df9
|
@ -244,6 +244,9 @@ Release 2.5.0 - UNRELEASED
|
||||||
YARN-2241. ZKRMStateStore: On startup, show nicer messages if znodes already
|
YARN-2241. ZKRMStateStore: On startup, show nicer messages if znodes already
|
||||||
exist. (Robert Kanter via kasha)
|
exist. (Robert Kanter via kasha)
|
||||||
|
|
||||||
|
YARN-1408 Preemption caused Invalid State Event: ACQUIRED at KILLED and caused
|
||||||
|
a task timeout for 30mins. (Sunil G via mayank)
|
||||||
|
|
||||||
OPTIMIZATIONS
|
OPTIMIZATIONS
|
||||||
|
|
||||||
BUG FIXES
|
BUG FIXES
|
||||||
|
|
|
@ -18,6 +18,8 @@
|
||||||
|
|
||||||
package org.apache.hadoop.yarn.server.resourcemanager.rmcontainer;
|
package org.apache.hadoop.yarn.server.resourcemanager.rmcontainer;
|
||||||
|
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||||
import org.apache.hadoop.yarn.api.records.Container;
|
import org.apache.hadoop.yarn.api.records.Container;
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||||
|
@ -26,6 +28,7 @@ import org.apache.hadoop.yarn.api.records.ContainerState;
|
||||||
import org.apache.hadoop.yarn.api.records.NodeId;
|
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||||
import org.apache.hadoop.yarn.api.records.Priority;
|
import org.apache.hadoop.yarn.api.records.Priority;
|
||||||
import org.apache.hadoop.yarn.api.records.Resource;
|
import org.apache.hadoop.yarn.api.records.Resource;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
||||||
import org.apache.hadoop.yarn.event.EventHandler;
|
import org.apache.hadoop.yarn.event.EventHandler;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -73,5 +76,7 @@ public interface RMContainer extends EventHandler<RMContainerEvent> {
|
||||||
ContainerReport createContainerReport();
|
ContainerReport createContainerReport();
|
||||||
|
|
||||||
boolean isAMContainer();
|
boolean isAMContainer();
|
||||||
|
|
||||||
|
List<ResourceRequest> getResourceRequests();
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -19,6 +19,7 @@
|
||||||
package org.apache.hadoop.yarn.server.resourcemanager.rmcontainer;
|
package org.apache.hadoop.yarn.server.resourcemanager.rmcontainer;
|
||||||
|
|
||||||
import java.util.EnumSet;
|
import java.util.EnumSet;
|
||||||
|
import java.util.List;
|
||||||
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||||
import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
|
import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
|
||||||
import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
|
import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
|
||||||
|
@ -35,6 +36,7 @@ import org.apache.hadoop.yarn.api.records.ContainerStatus;
|
||||||
import org.apache.hadoop.yarn.api.records.NodeId;
|
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||||
import org.apache.hadoop.yarn.api.records.Priority;
|
import org.apache.hadoop.yarn.api.records.Priority;
|
||||||
import org.apache.hadoop.yarn.api.records.Resource;
|
import org.apache.hadoop.yarn.api.records.Resource;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
||||||
import org.apache.hadoop.yarn.event.EventHandler;
|
import org.apache.hadoop.yarn.event.EventHandler;
|
||||||
import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
|
import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
||||||
|
@ -158,6 +160,7 @@ public class RMContainerImpl implements RMContainer {
|
||||||
private long finishTime;
|
private long finishTime;
|
||||||
private ContainerStatus finishedStatus;
|
private ContainerStatus finishedStatus;
|
||||||
private boolean isAMContainer;
|
private boolean isAMContainer;
|
||||||
|
private List<ResourceRequest> resourceRequests;
|
||||||
|
|
||||||
public RMContainerImpl(Container container,
|
public RMContainerImpl(Container container,
|
||||||
ApplicationAttemptId appAttemptId, NodeId nodeId, String user,
|
ApplicationAttemptId appAttemptId, NodeId nodeId, String user,
|
||||||
|
@ -180,7 +183,8 @@ public class RMContainerImpl implements RMContainer {
|
||||||
this.eventHandler = rmContext.getDispatcher().getEventHandler();
|
this.eventHandler = rmContext.getDispatcher().getEventHandler();
|
||||||
this.containerAllocationExpirer = rmContext.getContainerAllocationExpirer();
|
this.containerAllocationExpirer = rmContext.getContainerAllocationExpirer();
|
||||||
this.isAMContainer = false;
|
this.isAMContainer = false;
|
||||||
|
this.resourceRequests = null;
|
||||||
|
|
||||||
ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
|
ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
|
||||||
this.readLock = lock.readLock();
|
this.readLock = lock.readLock();
|
||||||
this.writeLock = lock.writeLock();
|
this.writeLock = lock.writeLock();
|
||||||
|
@ -311,6 +315,25 @@ public class RMContainerImpl implements RMContainer {
|
||||||
readLock.unlock();
|
readLock.unlock();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public List<ResourceRequest> getResourceRequests() {
|
||||||
|
try {
|
||||||
|
readLock.lock();
|
||||||
|
return resourceRequests;
|
||||||
|
} finally {
|
||||||
|
readLock.unlock();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setResourceRequests(List<ResourceRequest> requests) {
|
||||||
|
try {
|
||||||
|
writeLock.lock();
|
||||||
|
this.resourceRequests = requests;
|
||||||
|
} finally {
|
||||||
|
writeLock.unlock();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public String toString() {
|
public String toString() {
|
||||||
|
@ -432,6 +455,9 @@ public class RMContainerImpl implements RMContainer {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void transition(RMContainerImpl container, RMContainerEvent event) {
|
public void transition(RMContainerImpl container, RMContainerEvent event) {
|
||||||
|
// Clear ResourceRequest stored in RMContainer
|
||||||
|
container.setResourceRequests(null);
|
||||||
|
|
||||||
// Register with containerAllocationExpirer.
|
// Register with containerAllocationExpirer.
|
||||||
container.containerAllocationExpirer.register(container.getContainerId());
|
container.containerAllocationExpirer.register(container.getContainerId());
|
||||||
|
|
||||||
|
|
|
@ -35,6 +35,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerState;
|
import org.apache.hadoop.yarn.api.records.ContainerState;
|
||||||
import org.apache.hadoop.yarn.api.records.NodeId;
|
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||||
import org.apache.hadoop.yarn.api.records.Resource;
|
import org.apache.hadoop.yarn.api.records.Resource;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
||||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||||
import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
|
import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
||||||
|
@ -275,6 +276,27 @@ public abstract class AbstractYarnScheduler
|
||||||
return rmContainer;
|
return rmContainer;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Recover resource request back from RMContainer when a container is
|
||||||
|
* preempted before AM pulled the same. If container is pulled by
|
||||||
|
* AM, then RMContainer will not have resource request to recover.
|
||||||
|
* @param rmContainer
|
||||||
|
*/
|
||||||
|
protected void recoverResourceRequestForContainer(RMContainer rmContainer) {
|
||||||
|
List<ResourceRequest> requests = rmContainer.getResourceRequests();
|
||||||
|
|
||||||
|
// If container state is moved to ACQUIRED, request will be empty.
|
||||||
|
if (requests == null) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
// Add resource request back to Scheduler.
|
||||||
|
SchedulerApplicationAttempt schedulerAttempt
|
||||||
|
= getCurrentAttemptForContainer(rmContainer.getContainerId());
|
||||||
|
if (schedulerAttempt != null) {
|
||||||
|
schedulerAttempt.recoverResourceRequests(requests);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
public SchedulerNode getSchedulerNode(NodeId nodeId) {
|
public SchedulerNode getSchedulerNode(NodeId nodeId) {
|
||||||
return nodes.get(nodeId);
|
return nodes.get(nodeId);
|
||||||
}
|
}
|
||||||
|
|
|
@ -127,9 +127,10 @@ public class AppSchedulingInfo {
|
||||||
* by the application.
|
* by the application.
|
||||||
*
|
*
|
||||||
* @param requests resources to be acquired
|
* @param requests resources to be acquired
|
||||||
|
* @param recoverPreemptedRequest recover Resource Request on preemption
|
||||||
*/
|
*/
|
||||||
synchronized public void updateResourceRequests(
|
synchronized public void updateResourceRequests(
|
||||||
List<ResourceRequest> requests) {
|
List<ResourceRequest> requests, boolean recoverPreemptedRequest) {
|
||||||
QueueMetrics metrics = queue.getMetrics();
|
QueueMetrics metrics = queue.getMetrics();
|
||||||
|
|
||||||
// Update resource requests
|
// Update resource requests
|
||||||
|
@ -163,8 +164,13 @@ public class AppSchedulingInfo {
|
||||||
asks = new HashMap<String, ResourceRequest>();
|
asks = new HashMap<String, ResourceRequest>();
|
||||||
this.requests.put(priority, asks);
|
this.requests.put(priority, asks);
|
||||||
this.priorities.add(priority);
|
this.priorities.add(priority);
|
||||||
} else if (updatePendingResources) {
|
}
|
||||||
lastRequest = asks.get(resourceName);
|
lastRequest = asks.get(resourceName);
|
||||||
|
|
||||||
|
if (recoverPreemptedRequest && lastRequest != null) {
|
||||||
|
// Increment the number of containers to 1, as it is recovering a
|
||||||
|
// single container.
|
||||||
|
request.setNumContainers(lastRequest.getNumContainers() + 1);
|
||||||
}
|
}
|
||||||
|
|
||||||
asks.put(resourceName, request);
|
asks.put(resourceName, request);
|
||||||
|
@ -254,14 +260,16 @@ public class AppSchedulingInfo {
|
||||||
* @param container
|
* @param container
|
||||||
* the containers allocated.
|
* the containers allocated.
|
||||||
*/
|
*/
|
||||||
synchronized public void allocate(NodeType type, SchedulerNode node,
|
synchronized public List<ResourceRequest> allocate(NodeType type,
|
||||||
Priority priority, ResourceRequest request, Container container) {
|
SchedulerNode node, Priority priority, ResourceRequest request,
|
||||||
|
Container container) {
|
||||||
|
List<ResourceRequest> resourceRequests = new ArrayList<ResourceRequest>();
|
||||||
if (type == NodeType.NODE_LOCAL) {
|
if (type == NodeType.NODE_LOCAL) {
|
||||||
allocateNodeLocal(node, priority, request, container);
|
allocateNodeLocal(node, priority, request, container, resourceRequests);
|
||||||
} else if (type == NodeType.RACK_LOCAL) {
|
} else if (type == NodeType.RACK_LOCAL) {
|
||||||
allocateRackLocal(node, priority, request, container);
|
allocateRackLocal(node, priority, request, container, resourceRequests);
|
||||||
} else {
|
} else {
|
||||||
allocateOffSwitch(node, priority, request, container);
|
allocateOffSwitch(node, priority, request, container, resourceRequests);
|
||||||
}
|
}
|
||||||
QueueMetrics metrics = queue.getMetrics();
|
QueueMetrics metrics = queue.getMetrics();
|
||||||
if (pending) {
|
if (pending) {
|
||||||
|
@ -279,6 +287,7 @@ public class AppSchedulingInfo {
|
||||||
+ " resource=" + request.getCapability());
|
+ " resource=" + request.getCapability());
|
||||||
}
|
}
|
||||||
metrics.allocateResources(user, 1, request.getCapability(), true);
|
metrics.allocateResources(user, 1, request.getCapability(), true);
|
||||||
|
return resourceRequests;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -288,9 +297,9 @@ public class AppSchedulingInfo {
|
||||||
* @param allocatedContainers
|
* @param allocatedContainers
|
||||||
* resources allocated to the application
|
* resources allocated to the application
|
||||||
*/
|
*/
|
||||||
synchronized private void allocateNodeLocal(
|
synchronized private void allocateNodeLocal(SchedulerNode node,
|
||||||
SchedulerNode node, Priority priority,
|
Priority priority, ResourceRequest nodeLocalRequest, Container container,
|
||||||
ResourceRequest nodeLocalRequest, Container container) {
|
List<ResourceRequest> resourceRequests) {
|
||||||
// Update future requirements
|
// Update future requirements
|
||||||
nodeLocalRequest.setNumContainers(nodeLocalRequest.getNumContainers() - 1);
|
nodeLocalRequest.setNumContainers(nodeLocalRequest.getNumContainers() - 1);
|
||||||
if (nodeLocalRequest.getNumContainers() == 0) {
|
if (nodeLocalRequest.getNumContainers() == 0) {
|
||||||
|
@ -304,7 +313,14 @@ public class AppSchedulingInfo {
|
||||||
this.requests.get(priority).remove(node.getRackName());
|
this.requests.get(priority).remove(node.getRackName());
|
||||||
}
|
}
|
||||||
|
|
||||||
decrementOutstanding(requests.get(priority).get(ResourceRequest.ANY));
|
ResourceRequest offRackRequest = requests.get(priority).get(
|
||||||
|
ResourceRequest.ANY);
|
||||||
|
decrementOutstanding(offRackRequest);
|
||||||
|
|
||||||
|
// Update cloned NodeLocal, RackLocal and OffRack requests for recovery
|
||||||
|
resourceRequests.add(cloneResourceRequest(nodeLocalRequest));
|
||||||
|
resourceRequests.add(cloneResourceRequest(rackLocalRequest));
|
||||||
|
resourceRequests.add(cloneResourceRequest(offRackRequest));
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -314,16 +330,22 @@ public class AppSchedulingInfo {
|
||||||
* @param allocatedContainers
|
* @param allocatedContainers
|
||||||
* resources allocated to the application
|
* resources allocated to the application
|
||||||
*/
|
*/
|
||||||
synchronized private void allocateRackLocal(
|
synchronized private void allocateRackLocal(SchedulerNode node,
|
||||||
SchedulerNode node, Priority priority,
|
Priority priority, ResourceRequest rackLocalRequest, Container container,
|
||||||
ResourceRequest rackLocalRequest, Container container) {
|
List<ResourceRequest> resourceRequests) {
|
||||||
// Update future requirements
|
// Update future requirements
|
||||||
rackLocalRequest.setNumContainers(rackLocalRequest.getNumContainers() - 1);
|
rackLocalRequest.setNumContainers(rackLocalRequest.getNumContainers() - 1);
|
||||||
if (rackLocalRequest.getNumContainers() == 0) {
|
if (rackLocalRequest.getNumContainers() == 0) {
|
||||||
this.requests.get(priority).remove(node.getRackName());
|
this.requests.get(priority).remove(node.getRackName());
|
||||||
}
|
}
|
||||||
|
|
||||||
decrementOutstanding(requests.get(priority).get(ResourceRequest.ANY));
|
ResourceRequest offRackRequest = requests.get(priority).get(
|
||||||
|
ResourceRequest.ANY);
|
||||||
|
decrementOutstanding(offRackRequest);
|
||||||
|
|
||||||
|
// Update cloned RackLocal and OffRack requests for recovery
|
||||||
|
resourceRequests.add(cloneResourceRequest(rackLocalRequest));
|
||||||
|
resourceRequests.add(cloneResourceRequest(offRackRequest));
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -333,11 +355,13 @@ public class AppSchedulingInfo {
|
||||||
* @param allocatedContainers
|
* @param allocatedContainers
|
||||||
* resources allocated to the application
|
* resources allocated to the application
|
||||||
*/
|
*/
|
||||||
synchronized private void allocateOffSwitch(
|
synchronized private void allocateOffSwitch(SchedulerNode node,
|
||||||
SchedulerNode node, Priority priority,
|
Priority priority, ResourceRequest offSwitchRequest, Container container,
|
||||||
ResourceRequest offSwitchRequest, Container container) {
|
List<ResourceRequest> resourceRequests) {
|
||||||
// Update future requirements
|
// Update future requirements
|
||||||
decrementOutstanding(offSwitchRequest);
|
decrementOutstanding(offSwitchRequest);
|
||||||
|
// Update cloned RackLocal and OffRack requests for recovery
|
||||||
|
resourceRequests.add(cloneResourceRequest(offSwitchRequest));
|
||||||
}
|
}
|
||||||
|
|
||||||
synchronized private void decrementOutstanding(
|
synchronized private void decrementOutstanding(
|
||||||
|
@ -436,4 +460,11 @@ public class AppSchedulingInfo {
|
||||||
metrics.allocateResources(user, 1, rmContainer.getAllocatedResource(),
|
metrics.allocateResources(user, 1, rmContainer.getAllocatedResource(),
|
||||||
false);
|
false);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public ResourceRequest cloneResourceRequest(ResourceRequest request) {
|
||||||
|
ResourceRequest newRequest = ResourceRequest.newInstance(
|
||||||
|
request.getPriority(), request.getResourceName(),
|
||||||
|
request.getCapability(), 1, request.getRelaxLocality());
|
||||||
|
return newRequest;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -241,7 +241,14 @@ public class SchedulerApplicationAttempt {
|
||||||
public synchronized void updateResourceRequests(
|
public synchronized void updateResourceRequests(
|
||||||
List<ResourceRequest> requests) {
|
List<ResourceRequest> requests) {
|
||||||
if (!isStopped) {
|
if (!isStopped) {
|
||||||
appSchedulingInfo.updateResourceRequests(requests);
|
appSchedulingInfo.updateResourceRequests(requests, false);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public synchronized void recoverResourceRequests(
|
||||||
|
List<ResourceRequest> requests) {
|
||||||
|
if (!isStopped) {
|
||||||
|
appSchedulingInfo.updateResourceRequests(requests, true);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1089,6 +1089,7 @@ public class CapacityScheduler extends
|
||||||
if (LOG.isDebugEnabled()) {
|
if (LOG.isDebugEnabled()) {
|
||||||
LOG.debug("KILL_CONTAINER: container" + cont.toString());
|
LOG.debug("KILL_CONTAINER: container" + cont.toString());
|
||||||
}
|
}
|
||||||
|
recoverResourceRequestForContainer(cont);
|
||||||
completedContainer(cont, SchedulerUtils.createPreemptedContainerStatus(
|
completedContainer(cont, SchedulerUtils.createPreemptedContainerStatus(
|
||||||
cont.getContainerId(), SchedulerUtils.PREEMPTED_CONTAINER),
|
cont.getContainerId(), SchedulerUtils.PREEMPTED_CONTAINER),
|
||||||
RMContainerEventType.KILL);
|
RMContainerEventType.KILL);
|
||||||
|
|
|
@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica;
|
||||||
|
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
|
||||||
|
@ -77,6 +78,9 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
|
||||||
if (null == liveContainers.remove(rmContainer.getContainerId())) {
|
if (null == liveContainers.remove(rmContainer.getContainerId())) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Remove from the list of newly allocated containers if found
|
||||||
|
newlyAllocatedContainers.remove(rmContainer);
|
||||||
|
|
||||||
Container container = rmContainer.getContainer();
|
Container container = rmContainer.getContainer();
|
||||||
ContainerId containerId = container.getId();
|
ContainerId containerId = container.getId();
|
||||||
|
@ -129,8 +133,12 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
|
||||||
liveContainers.put(container.getId(), rmContainer);
|
liveContainers.put(container.getId(), rmContainer);
|
||||||
|
|
||||||
// Update consumption and track allocations
|
// Update consumption and track allocations
|
||||||
appSchedulingInfo.allocate(type, node, priority, request, container);
|
List<ResourceRequest> resourceRequestList = appSchedulingInfo.allocate(
|
||||||
|
type, node, priority, request, container);
|
||||||
Resources.addTo(currentConsumption, container.getResource());
|
Resources.addTo(currentConsumption, container.getResource());
|
||||||
|
|
||||||
|
// Update resource requests related to "request" and store in RMContainer
|
||||||
|
((RMContainerImpl)rmContainer).setResourceRequests(resourceRequestList);
|
||||||
|
|
||||||
// Inform the container
|
// Inform the container
|
||||||
rmContainer.handle(
|
rmContainer.handle(
|
||||||
|
|
|
@ -19,6 +19,7 @@
|
||||||
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
|
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
|
||||||
|
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
|
||||||
|
@ -82,6 +83,9 @@ public class FSSchedulerApp extends SchedulerApplicationAttempt {
|
||||||
Container container = rmContainer.getContainer();
|
Container container = rmContainer.getContainer();
|
||||||
ContainerId containerId = container.getId();
|
ContainerId containerId = container.getId();
|
||||||
|
|
||||||
|
// Remove from the list of newly allocated containers if found
|
||||||
|
newlyAllocatedContainers.remove(rmContainer);
|
||||||
|
|
||||||
// Inform the container
|
// Inform the container
|
||||||
rmContainer.handle(
|
rmContainer.handle(
|
||||||
new RMContainerFinishedEvent(
|
new RMContainerFinishedEvent(
|
||||||
|
@ -281,9 +285,13 @@ public class FSSchedulerApp extends SchedulerApplicationAttempt {
|
||||||
liveContainers.put(container.getId(), rmContainer);
|
liveContainers.put(container.getId(), rmContainer);
|
||||||
|
|
||||||
// Update consumption and track allocations
|
// Update consumption and track allocations
|
||||||
appSchedulingInfo.allocate(type, node, priority, request, container);
|
List<ResourceRequest> resourceRequestList = appSchedulingInfo.allocate(
|
||||||
|
type, node, priority, request, container);
|
||||||
Resources.addTo(currentConsumption, container.getResource());
|
Resources.addTo(currentConsumption, container.getResource());
|
||||||
|
|
||||||
|
// Update resource requests related to "request" and store in RMContainer
|
||||||
|
((RMContainerImpl) rmContainer).setResourceRequests(resourceRequestList);
|
||||||
|
|
||||||
// Inform the container
|
// Inform the container
|
||||||
rmContainer.handle(
|
rmContainer.handle(
|
||||||
new RMContainerEvent(container.getId(), RMContainerEventType.START));
|
new RMContainerEvent(container.getId(), RMContainerEventType.START));
|
||||||
|
|
|
@ -422,7 +422,7 @@ public class FairScheduler extends
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void warnOrKillContainer(RMContainer container) {
|
protected void warnOrKillContainer(RMContainer container) {
|
||||||
ApplicationAttemptId appAttemptId = container.getApplicationAttemptId();
|
ApplicationAttemptId appAttemptId = container.getApplicationAttemptId();
|
||||||
FSSchedulerApp app = getSchedulerApp(appAttemptId);
|
FSSchedulerApp app = getSchedulerApp(appAttemptId);
|
||||||
FSLeafQueue queue = app.getQueue();
|
FSLeafQueue queue = app.getQueue();
|
||||||
|
@ -440,6 +440,7 @@ public class FairScheduler extends
|
||||||
SchedulerUtils.createPreemptedContainerStatus(
|
SchedulerUtils.createPreemptedContainerStatus(
|
||||||
container.getContainerId(), SchedulerUtils.PREEMPTED_CONTAINER);
|
container.getContainerId(), SchedulerUtils.PREEMPTED_CONTAINER);
|
||||||
|
|
||||||
|
recoverResourceRequestForContainer(container);
|
||||||
// TODO: Not sure if this ever actually adds this to the list of cleanup
|
// TODO: Not sure if this ever actually adds this to the list of cleanup
|
||||||
// containers on the RMNode (see SchedulerNode.releaseContainer()).
|
// containers on the RMNode (see SchedulerNode.releaseContainer()).
|
||||||
completedContainer(container, status, RMContainerEventType.KILL);
|
completedContainer(container, status, RMContainerEventType.KILL);
|
||||||
|
|
|
@ -26,6 +26,9 @@ import static org.mockito.Mockito.reset;
|
||||||
import static org.mockito.Mockito.verify;
|
import static org.mockito.Mockito.verify;
|
||||||
import static org.mockito.Mockito.when;
|
import static org.mockito.Mockito.when;
|
||||||
|
|
||||||
|
import java.util.ArrayList;
|
||||||
|
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
import org.apache.hadoop.yarn.api.records.Container;
|
import org.apache.hadoop.yarn.api.records.Container;
|
||||||
|
@ -36,17 +39,24 @@ import org.apache.hadoop.yarn.api.records.ContainerStatus;
|
||||||
import org.apache.hadoop.yarn.api.records.NodeId;
|
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||||
import org.apache.hadoop.yarn.api.records.Priority;
|
import org.apache.hadoop.yarn.api.records.Priority;
|
||||||
import org.apache.hadoop.yarn.api.records.Resource;
|
import org.apache.hadoop.yarn.api.records.Resource;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
import org.apache.hadoop.yarn.event.DrainDispatcher;
|
import org.apache.hadoop.yarn.event.DrainDispatcher;
|
||||||
import org.apache.hadoop.yarn.event.EventHandler;
|
import org.apache.hadoop.yarn.event.EventHandler;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter;
|
import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerFinishedEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerFinishedEvent;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
|
||||||
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
|
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
|
||||||
|
import org.junit.Assert;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.mockito.ArgumentCaptor;
|
import org.mockito.ArgumentCaptor;
|
||||||
|
|
||||||
|
@ -204,4 +214,36 @@ public class TestRMContainerImpl {
|
||||||
assertEquals(RMContainerState.RUNNING, rmContainer.getState());
|
assertEquals(RMContainerState.RUNNING, rmContainer.getState());
|
||||||
verify(writer, never()).containerFinished(any(RMContainer.class));
|
verify(writer, never()).containerFinished(any(RMContainer.class));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testExistenceOfResourceRequestInRMContainer() throws Exception {
|
||||||
|
Configuration conf = new Configuration();
|
||||||
|
MockRM rm1 = new MockRM(conf);
|
||||||
|
rm1.start();
|
||||||
|
MockNM nm1 = rm1.registerNode("unknownhost:1234", 8000);
|
||||||
|
RMApp app1 = rm1.submitApp(1024);
|
||||||
|
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
|
||||||
|
ResourceScheduler scheduler = rm1.getResourceScheduler();
|
||||||
|
|
||||||
|
// request a container.
|
||||||
|
am1.allocate("127.0.0.1", 1024, 1, new ArrayList<ContainerId>());
|
||||||
|
ContainerId containerId2 = ContainerId.newInstance(
|
||||||
|
am1.getApplicationAttemptId(), 2);
|
||||||
|
rm1.waitForState(nm1, containerId2, RMContainerState.ALLOCATED);
|
||||||
|
|
||||||
|
// Verify whether list of ResourceRequest is present in RMContainer
|
||||||
|
// while moving to ALLOCATED state
|
||||||
|
Assert.assertNotNull(scheduler.getRMContainer(containerId2)
|
||||||
|
.getResourceRequests());
|
||||||
|
|
||||||
|
// Allocate container
|
||||||
|
am1.allocate(new ArrayList<ResourceRequest>(), new ArrayList<ContainerId>())
|
||||||
|
.getAllocatedContainers();
|
||||||
|
rm1.waitForState(nm1, containerId2, RMContainerState.ACQUIRED);
|
||||||
|
|
||||||
|
// After RMContainer moving to ACQUIRED state, list of ResourceRequest will
|
||||||
|
// be empty
|
||||||
|
Assert.assertNull(scheduler.getRMContainer(containerId2)
|
||||||
|
.getResourceRequests());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -27,6 +27,7 @@ import static org.mockito.Mockito.when;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.net.InetSocketAddress;
|
import java.net.InetSocketAddress;
|
||||||
import java.security.PrivilegedAction;
|
import java.security.PrivilegedAction;
|
||||||
|
import java.util.ArrayList;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.Comparator;
|
import java.util.Comparator;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
|
@ -79,6 +80,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptMetrics;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptMetrics;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
|
||||||
|
@ -87,6 +90,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicat
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.TestSchedulerUtils;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.TestSchedulerUtils;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
|
||||||
|
@ -947,4 +951,67 @@ public class TestCapacityScheduler {
|
||||||
|
|
||||||
rm1.stop();
|
rm1.stop();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test(timeout = 30000)
|
||||||
|
public void testRecoverRequestAfterPreemption() throws Exception {
|
||||||
|
Configuration conf = new Configuration();
|
||||||
|
conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
|
||||||
|
ResourceScheduler.class);
|
||||||
|
MockRM rm1 = new MockRM(conf);
|
||||||
|
rm1.start();
|
||||||
|
MockNM nm1 = rm1.registerNode("127.0.0.1:1234", 8000);
|
||||||
|
RMApp app1 = rm1.submitApp(1024);
|
||||||
|
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
|
||||||
|
CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
|
||||||
|
|
||||||
|
// request a container.
|
||||||
|
am1.allocate("127.0.0.1", 1024, 1, new ArrayList<ContainerId>());
|
||||||
|
ContainerId containerId1 = ContainerId.newInstance(
|
||||||
|
am1.getApplicationAttemptId(), 2);
|
||||||
|
rm1.waitForState(nm1, containerId1, RMContainerState.ALLOCATED);
|
||||||
|
|
||||||
|
RMContainer rmContainer = cs.getRMContainer(containerId1);
|
||||||
|
List<ResourceRequest> requests = rmContainer.getResourceRequests();
|
||||||
|
FiCaSchedulerApp app = cs.getApplicationAttempt(am1
|
||||||
|
.getApplicationAttemptId());
|
||||||
|
|
||||||
|
FiCaSchedulerNode node = cs.getNode(rmContainer.getAllocatedNode());
|
||||||
|
for (ResourceRequest request : requests) {
|
||||||
|
// Skip the OffRack and RackLocal resource requests.
|
||||||
|
if (request.getResourceName().equals(node.getRackName())
|
||||||
|
|| request.getResourceName().equals(ResourceRequest.ANY)) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Already the node local resource request is cleared from RM after
|
||||||
|
// allocation.
|
||||||
|
Assert.assertNull(app.getResourceRequest(request.getPriority(),
|
||||||
|
request.getResourceName()));
|
||||||
|
}
|
||||||
|
|
||||||
|
// Call killContainer to preempt the container
|
||||||
|
cs.killContainer(rmContainer);
|
||||||
|
|
||||||
|
Assert.assertEquals(3, requests.size());
|
||||||
|
for (ResourceRequest request : requests) {
|
||||||
|
// Resource request must have added back in RM after preempt event
|
||||||
|
// handling.
|
||||||
|
Assert.assertEquals(
|
||||||
|
1,
|
||||||
|
app.getResourceRequest(request.getPriority(),
|
||||||
|
request.getResourceName()).getNumContainers());
|
||||||
|
}
|
||||||
|
|
||||||
|
// New container will be allocated and will move to ALLOCATED state
|
||||||
|
ContainerId containerId2 = ContainerId.newInstance(
|
||||||
|
am1.getApplicationAttemptId(), 3);
|
||||||
|
rm1.waitForState(nm1, containerId2, RMContainerState.ALLOCATED);
|
||||||
|
|
||||||
|
// allocate container
|
||||||
|
List<Container> containers = am1.allocate(new ArrayList<ResourceRequest>(),
|
||||||
|
new ArrayList<ContainerId>()).getAllocatedContainers();
|
||||||
|
|
||||||
|
// Now with updated ResourceRequest, a container is allocated for AM.
|
||||||
|
Assert.assertTrue(containers.size() == 1);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -167,6 +167,27 @@ public class FairSchedulerTestBase {
|
||||||
.put(id.getApplicationId(), rmApp);
|
.put(id.getApplicationId(), rmApp);
|
||||||
return id;
|
return id;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
protected ApplicationAttemptId createSchedulingRequest(String queueId,
|
||||||
|
String userId, List<ResourceRequest> ask) {
|
||||||
|
ApplicationAttemptId id = createAppAttemptId(this.APP_ID++,
|
||||||
|
this.ATTEMPT_ID++);
|
||||||
|
scheduler.addApplication(id.getApplicationId(), queueId, userId);
|
||||||
|
// This conditional is for testAclSubmitApplication where app is rejected
|
||||||
|
// and no app is added.
|
||||||
|
if (scheduler.getSchedulerApplications().containsKey(id.getApplicationId())) {
|
||||||
|
scheduler.addApplicationAttempt(id, false, true);
|
||||||
|
}
|
||||||
|
scheduler.allocate(id, ask, new ArrayList<ContainerId>(), null, null);
|
||||||
|
RMApp rmApp = mock(RMApp.class);
|
||||||
|
RMAppAttempt rmAppAttempt = mock(RMAppAttempt.class);
|
||||||
|
when(rmApp.getCurrentAppAttempt()).thenReturn(rmAppAttempt);
|
||||||
|
when(rmAppAttempt.getRMAppAttemptMetrics()).thenReturn(
|
||||||
|
new RMAppAttemptMetrics(id));
|
||||||
|
resourceManager.getRMContext().getRMApps()
|
||||||
|
.put(id.getApplicationId(), rmApp);
|
||||||
|
return id;
|
||||||
|
}
|
||||||
|
|
||||||
protected void createSchedulingRequestExistingApplication(
|
protected void createSchedulingRequestExistingApplication(
|
||||||
int memory, int priority, ApplicationAttemptId attId) {
|
int memory, int priority, ApplicationAttemptId attId) {
|
||||||
|
|
|
@ -53,10 +53,13 @@ import org.apache.hadoop.yarn.MockApps;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
|
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
|
||||||
|
import org.apache.hadoop.yarn.api.records.Container;
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
|
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ContainerStatus;
|
||||||
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
|
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
|
||||||
import org.apache.hadoop.yarn.api.records.NodeId;
|
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||||
|
import org.apache.hadoop.yarn.api.records.Priority;
|
||||||
import org.apache.hadoop.yarn.api.records.Resource;
|
import org.apache.hadoop.yarn.api.records.Resource;
|
||||||
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
||||||
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl;
|
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl;
|
||||||
|
@ -77,11 +80,13 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.TestSchedulerUtils;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.TestSchedulerUtils;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent;
|
||||||
|
@ -2831,6 +2836,87 @@ public class TestFairScheduler extends FairSchedulerTestBase {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test(timeout=5000)
|
||||||
|
public void testRecoverRequestAfterPreemption() throws Exception {
|
||||||
|
conf.setLong(FairSchedulerConfiguration.WAIT_TIME_BEFORE_KILL, 10);
|
||||||
|
|
||||||
|
MockClock clock = new MockClock();
|
||||||
|
scheduler.setClock(clock);
|
||||||
|
scheduler.init(conf);
|
||||||
|
scheduler.start();
|
||||||
|
scheduler.reinitialize(conf, resourceManager.getRMContext());
|
||||||
|
|
||||||
|
Priority priority = Priority.newInstance(20);
|
||||||
|
String host = "127.0.0.1";
|
||||||
|
int GB = 1024;
|
||||||
|
|
||||||
|
// Create Node and raised Node Added event
|
||||||
|
RMNode node = MockNodes.newNodeInfo(1,
|
||||||
|
Resources.createResource(16 * 1024, 4), 0, host);
|
||||||
|
NodeAddedSchedulerEvent nodeEvent = new NodeAddedSchedulerEvent(node);
|
||||||
|
scheduler.handle(nodeEvent);
|
||||||
|
|
||||||
|
// Create 3 container requests and place it in ask
|
||||||
|
List<ResourceRequest> ask = new ArrayList<ResourceRequest>();
|
||||||
|
ResourceRequest nodeLocalRequest = createResourceRequest(GB, 1, host,
|
||||||
|
priority.getPriority(), 1, true);
|
||||||
|
ResourceRequest rackLocalRequest = createResourceRequest(GB, 1,
|
||||||
|
node.getRackName(), priority.getPriority(), 1, true);
|
||||||
|
ResourceRequest offRackRequest = createResourceRequest(GB, 1,
|
||||||
|
ResourceRequest.ANY, priority.getPriority(), 1, true);
|
||||||
|
ask.add(nodeLocalRequest);
|
||||||
|
ask.add(rackLocalRequest);
|
||||||
|
ask.add(offRackRequest);
|
||||||
|
|
||||||
|
// Create Request and update
|
||||||
|
ApplicationAttemptId appAttemptId = createSchedulingRequest("queueA",
|
||||||
|
"user1", ask);
|
||||||
|
scheduler.update();
|
||||||
|
|
||||||
|
// Sufficient node check-ins to fully schedule containers
|
||||||
|
NodeUpdateSchedulerEvent nodeUpdate = new NodeUpdateSchedulerEvent(node);
|
||||||
|
scheduler.handle(nodeUpdate);
|
||||||
|
|
||||||
|
assertEquals(1, scheduler.getSchedulerApp(appAttemptId).getLiveContainers()
|
||||||
|
.size());
|
||||||
|
FSSchedulerApp app = scheduler.getSchedulerApp(appAttemptId);
|
||||||
|
|
||||||
|
// ResourceRequest will be empty once NodeUpdate is completed
|
||||||
|
Assert.assertNull(app.getResourceRequest(priority, host));
|
||||||
|
|
||||||
|
ContainerId containerId1 = ContainerId.newInstance(appAttemptId, 1);
|
||||||
|
RMContainer rmContainer = app.getRMContainer(containerId1);
|
||||||
|
|
||||||
|
// Create a preempt event and register for preemption
|
||||||
|
scheduler.warnOrKillContainer(rmContainer);
|
||||||
|
|
||||||
|
// Wait for few clock ticks
|
||||||
|
clock.tick(5);
|
||||||
|
|
||||||
|
// preempt now
|
||||||
|
scheduler.warnOrKillContainer(rmContainer);
|
||||||
|
|
||||||
|
List<ResourceRequest> requests = rmContainer.getResourceRequests();
|
||||||
|
// Once recovered, resource request will be present again in app
|
||||||
|
Assert.assertEquals(3, requests.size());
|
||||||
|
for (ResourceRequest request : requests) {
|
||||||
|
Assert.assertEquals(1,
|
||||||
|
app.getResourceRequest(priority, request.getResourceName())
|
||||||
|
.getNumContainers());
|
||||||
|
}
|
||||||
|
|
||||||
|
// Send node heartbeat
|
||||||
|
scheduler.update();
|
||||||
|
scheduler.handle(nodeUpdate);
|
||||||
|
|
||||||
|
List<Container> containers = scheduler.allocate(appAttemptId,
|
||||||
|
Collections.<ResourceRequest> emptyList(),
|
||||||
|
Collections.<ContainerId> emptyList(), null, null).getContainers();
|
||||||
|
|
||||||
|
// Now with updated ResourceRequest, a container is allocated for AM.
|
||||||
|
Assert.assertTrue(containers.size() == 1);
|
||||||
|
}
|
||||||
|
|
||||||
@SuppressWarnings("resource")
|
@SuppressWarnings("resource")
|
||||||
@Test
|
@Test
|
||||||
|
|
Loading…
Reference in New Issue