From 704b37d0e11bd7f5b5377802ec4acb6f56692158 Mon Sep 17 00:00:00 2001 From: Wangda Tan Date: Tue, 19 Jan 2016 09:30:04 +0800 Subject: [PATCH] YARN-4502. Fix two AM containers get allocated when AM restart. (Vinod Kumar Vavilapalli via wangda) (cherry picked from commit a44ce3f14fd940601f984fbf7980aa6fdc8f23b7) --- .../scheduler/ResourceSchedulerWrapper.java | 4 +- hadoop-yarn-project/CHANGES.txt | 3 + .../ProportionalCapacityPreemptionPolicy.java | 8 +- .../rmcontainer/RMContainerImpl.java | 17 +-- .../scheduler/AbstractYarnScheduler.java | 44 +++++-- .../scheduler/AppSchedulingInfo.java | 11 +- .../PreemptableResourceScheduler.java | 6 +- .../SchedulerApplicationAttempt.java | 2 +- .../scheduler/capacity/CapacityScheduler.java | 83 ++++++------- .../common/fica/FiCaSchedulerApp.java | 2 +- .../{ => event}/ContainerPreemptEvent.java | 7 +- .../event/ContainerRescheduledEvent.java | 35 ------ .../scheduler/event/SchedulerEventType.java | 11 +- .../scheduler/fair/FairScheduler.java | 34 ++---- .../scheduler/fifo/FifoScheduler.java | 28 ++--- .../yarn/server/resourcemanager/MockRM.java | 3 +- .../resourcemanager/TestRMDispatcher.java | 20 ++-- .../applicationsmanager/TestAMRestart.java | 6 +- ...tProportionalCapacityPreemptionPolicy.java | 10 +- .../scheduler/TestAbstractYarnScheduler.java | 113 ++++++++++++++++++ .../capacity/TestApplicationPriority.java | 6 +- .../capacity/TestCapacityScheduler.java | 8 +- .../scheduler/fair/TestFairScheduler.java | 16 +-- 23 files changed, 275 insertions(+), 202 deletions(-) rename hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/{ => event}/ContainerPreemptEvent.java (83%) delete mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/ContainerRescheduledEvent.java diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ResourceSchedulerWrapper.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ResourceSchedulerWrapper.java index 47ec2d1f016..8835deb7efa 100644 --- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ResourceSchedulerWrapper.java +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ResourceSchedulerWrapper.java @@ -70,10 +70,10 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnSched import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedContainerChangeRequest; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedContainerChangeRequest; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; @@ -958,7 +958,7 @@ public class ResourceSchedulerWrapper } @Override - protected void completedContainer(RMContainer rmContainer, + protected void completedContainerInternal(RMContainer rmContainer, ContainerStatus containerStatus, RMContainerEventType event) { // do nothing } diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index e2d3f95c341..20b64edddf4 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -1222,6 +1222,9 @@ Release 2.8.0 - UNRELEASED YARN-4596. SystemMetricPublisher should not swallow error messages from TimelineClient#putEntities. (Li Lu via jianhe) + YARN-4502. Fix two AM containers get allocated when AM restart. + (Vinod Kumar Vavilapalli via wangda) + Release 2.7.3 - UNRELEASED INCOMPATIBLE CHANGES diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java index 99f6f566624..3a87edb94a3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java @@ -43,7 +43,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingEditPolicy; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.PreemptableResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; @@ -51,6 +50,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.Capacity import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueCapacities; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerPreemptEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType; import org.apache.hadoop.yarn.util.Clock; import org.apache.hadoop.yarn.util.SystemClock; @@ -257,7 +257,7 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic // kill it rmContext.getDispatcher().getEventHandler().handle( new ContainerPreemptEvent(appAttemptId, container, - SchedulerEventType.KILL_CONTAINER)); + SchedulerEventType.KILL_PREEMPTED_CONTAINER)); preempted.remove(container); } else { if (preempted.get(container) != null) { @@ -268,7 +268,7 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic //otherwise just send preemption events rmContext.getDispatcher().getEventHandler().handle( new ContainerPreemptEvent(appAttemptId, container, - SchedulerEventType.PREEMPT_CONTAINER)); + SchedulerEventType.MARK_CONTAINER_FOR_PREEMPTION)); preempted.put(container, clock.getTime()); } } @@ -764,7 +764,7 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic if (!observeOnly) { rmContext.getDispatcher().getEventHandler().handle( new ContainerPreemptEvent( - appId, c, SchedulerEventType.DROP_RESERVATION)); + appId, c, SchedulerEventType.KILL_RESERVED_CONTAINER)); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java index 96c4f2772d6..83876d0a15c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java @@ -49,7 +49,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptE import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerFinishedEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerRescheduledEvent; import org.apache.hadoop.yarn.state.InvalidStateTransitionException; import org.apache.hadoop.yarn.state.MultipleArcTransition; import org.apache.hadoop.yarn.state.SingleArcTransition; @@ -97,7 +96,7 @@ public class RMContainerImpl implements RMContainer, Comparable { .addTransition(RMContainerState.ALLOCATED, RMContainerState.EXPIRED, RMContainerEventType.EXPIRE, new FinishedTransition()) .addTransition(RMContainerState.ALLOCATED, RMContainerState.KILLED, - RMContainerEventType.KILL, new ContainerRescheduledTransition()) + RMContainerEventType.KILL, new FinishedTransition()) // Transitions from ACQUIRED state .addTransition(RMContainerState.ACQUIRED, RMContainerState.RUNNING, @@ -521,7 +520,8 @@ public class RMContainerImpl implements RMContainer, Comparable { @Override public void transition(RMContainerImpl container, RMContainerEvent event) { - // Clear ResourceRequest stored in RMContainer + // Clear ResourceRequest stored in RMContainer, we don't need to remember + // this anymore. container.setResourceRequests(null); // Register with containerAllocationExpirer. @@ -597,17 +597,6 @@ public class RMContainerImpl implements RMContainer, Comparable { } } - private static final class ContainerRescheduledTransition extends - FinishedTransition { - - @Override - public void transition(RMContainerImpl container, RMContainerEvent event) { - // Tell scheduler to recover request of this container to app - container.eventHandler.handle(new ContainerRescheduledEvent(container)); - super.transition(container, event); - } - } - private static class FinishedTransition extends BaseTransition { @Override diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java index ed93acfa875..41a04f23da7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java @@ -511,20 +511,28 @@ public abstract class AbstractYarnScheduler * Recover resource request back from RMContainer when a container is * preempted before AM pulled the same. If container is pulled by * AM, then RMContainer will not have resource request to recover. - * @param rmContainer + * @param rmContainer rmContainer */ - protected void recoverResourceRequestForContainer(RMContainer rmContainer) { + private void recoverResourceRequestForContainer(RMContainer rmContainer) { List requests = rmContainer.getResourceRequests(); // If container state is moved to ACQUIRED, request will be empty. if (requests == null) { return; } - // Add resource request back to Scheduler. - SchedulerApplicationAttempt schedulerAttempt - = getCurrentAttemptForContainer(rmContainer.getContainerId()); + + // Add resource request back to Scheduler ApplicationAttempt. + + // We lookup the application-attempt here again using + // getCurrentApplicationAttempt() because there is only one app-attempt at + // any point in the scheduler. But in corner cases, AMs can crash, + // corresponding containers get killed and recovered to the same-attempt, + // but because the app-attempt is extinguished right after, the recovered + // requests don't serve any purpose, but that's okay. + SchedulerApplicationAttempt schedulerAttempt = + getCurrentAttemptForContainer(rmContainer.getContainerId()); if (schedulerAttempt != null) { - schedulerAttempt.recoverResourceRequests(requests); + schedulerAttempt.recoverResourceRequestsForContainer(requests); } } @@ -559,8 +567,30 @@ public abstract class AbstractYarnScheduler } } + @VisibleForTesting + @Private // clean up a completed container - protected abstract void completedContainer(RMContainer rmContainer, + public void completedContainer(RMContainer rmContainer, + ContainerStatus containerStatus, RMContainerEventType event) { + + if (rmContainer == null) { + LOG.info("Container " + containerStatus.getContainerId() + + " completed with event " + event + + ", but corresponding RMContainer doesn't exist."); + return; + } + + completedContainerInternal(rmContainer, containerStatus, event); + + // If the container is getting killed in ACQUIRED state, the requester (AM + // for regular containers and RM itself for AM container) will not know what + // happened. Simply add the ResourceRequest back again so that requester + // doesn't need to do anything conditionally. + recoverResourceRequestForContainer(rmContainer); + } + + // clean up a completed container + protected abstract void completedContainerInternal(RMContainer rmContainer, ContainerStatus containerStatus, RMContainerEventType event); protected void releaseContainers(List containers, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java index 973e9d37c75..631b4182afd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java @@ -289,12 +289,15 @@ public class AppSchedulingInfo { * application, by asking for more resources and releasing resources acquired * by the application. * - * @param requests resources to be acquired - * @param recoverPreemptedRequest recover ResourceRequest on preemption + * @param requests + * resources to be acquired + * @param recoverPreemptedRequestForAContainer + * recover ResourceRequest on preemption * @return true if any resource was updated, false otherwise */ public synchronized boolean updateResourceRequests( - List requests, boolean recoverPreemptedRequest) { + List requests, + boolean recoverPreemptedRequestForAContainer) { // Flag to track if any incoming requests update "ANY" requests boolean anyResourcesUpdated = false; @@ -315,7 +318,7 @@ public class AppSchedulingInfo { // Increment number of containers if recovering preempted resources ResourceRequest lastRequest = asks.get(resourceName); - if (recoverPreemptedRequest && lastRequest != null) { + if (recoverPreemptedRequestForAContainer && lastRequest != null) { request.setNumContainers(lastRequest.getNumContainers() + 1); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/PreemptableResourceScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/PreemptableResourceScheduler.java index c89696d1530..ee7e101e0b6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/PreemptableResourceScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/PreemptableResourceScheduler.java @@ -31,7 +31,7 @@ public interface PreemptableResourceScheduler extends ResourceScheduler { * ask the scheduler to drop the reservation for the given container. * @param container Reference to reserved container allocation. */ - void dropContainerReservation(RMContainer container); + void killReservedContainer(RMContainer container); /** * Ask the scheduler to obtain back the container from a specific application @@ -39,12 +39,12 @@ public interface PreemptableResourceScheduler extends ResourceScheduler { * @param aid the application from which we want to get a container back * @param container the container we want back */ - void preemptContainer(ApplicationAttemptId aid, RMContainer container); + void markContainerForPreemption(ApplicationAttemptId aid, RMContainer container); /** * Ask the scheduler to forcibly interrupt the container given as input * @param container */ - void killContainer(RMContainer container); + void killPreemptedContainer(RMContainer container); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java index b43c1060d81..d91c79ed99b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java @@ -322,7 +322,7 @@ public class SchedulerApplicationAttempt implements SchedulableEntity { return false; } - public synchronized void recoverResourceRequests( + public synchronized void recoverResourceRequestsForContainer( List requests) { if (!isStopped) { appSchedulingInfo.updateResourceRequests(requests, true); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index 84b7d9bd06e..f4129ab9458 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -93,7 +93,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeDecreaseContai import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.PreemptableResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue; @@ -115,13 +114,14 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptA import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerExpiredSchedulerEvent; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerRescheduledEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerPreemptEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeLabelsUpdateSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeResourceUpdateSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType; import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; import org.apache.hadoop.yarn.server.utils.Lock; import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; @@ -865,7 +865,7 @@ public class CapacityScheduler extends LOG.info("Skip killing " + rmContainer.getContainerId()); continue; } - completedContainer( + super.completedContainer( rmContainer, SchedulerUtils.createAbnormalContainerStatus( rmContainer.getContainerId(), SchedulerUtils.COMPLETED_APPLICATION), @@ -874,7 +874,7 @@ public class CapacityScheduler extends // Release all reserved containers for (RMContainer rmContainer : attempt.getReservedContainers()) { - completedContainer( + super.completedContainer( rmContainer, SchedulerUtils.createAbnormalContainerStatus( rmContainer.getContainerId(), "Application Complete"), @@ -1047,7 +1047,7 @@ public class CapacityScheduler extends for (ContainerStatus completedContainer : completedContainers) { ContainerId containerId = completedContainer.getContainerId(); RMContainer container = getRMContainer(containerId); - completedContainer(container, completedContainer, + super.completedContainer(container, completedContainer, RMContainerEventType.FINISHED); if (container != null) { releasedContainers++; @@ -1128,7 +1128,7 @@ public class CapacityScheduler extends // Unreserve container on this node RMContainer reservedContainer = node.getReservedContainer(); if (null != reservedContainer) { - dropContainerReservation(reservedContainer); + killReservedContainer(reservedContainer); } // Update node labels after we've done this @@ -1372,42 +1372,35 @@ public class CapacityScheduler extends ContainerExpiredSchedulerEvent containerExpiredEvent = (ContainerExpiredSchedulerEvent) event; ContainerId containerId = containerExpiredEvent.getContainerId(); - completedContainer(getRMContainer(containerId), + super.completedContainer(getRMContainer(containerId), SchedulerUtils.createAbnormalContainerStatus( containerId, SchedulerUtils.EXPIRED_CONTAINER), RMContainerEventType.EXPIRE); } break; - case DROP_RESERVATION: + case KILL_RESERVED_CONTAINER: { - ContainerPreemptEvent dropReservationEvent = (ContainerPreemptEvent)event; - RMContainer container = dropReservationEvent.getContainer(); - dropContainerReservation(container); + ContainerPreemptEvent killReservedContainerEvent = + (ContainerPreemptEvent) event; + RMContainer container = killReservedContainerEvent.getContainer(); + killReservedContainer(container); } break; - case PREEMPT_CONTAINER: + case MARK_CONTAINER_FOR_PREEMPTION: { ContainerPreemptEvent preemptContainerEvent = (ContainerPreemptEvent)event; ApplicationAttemptId aid = preemptContainerEvent.getAppId(); RMContainer containerToBePreempted = preemptContainerEvent.getContainer(); - preemptContainer(aid, containerToBePreempted); + markContainerForPreemption(aid, containerToBePreempted); } break; - case KILL_CONTAINER: + case KILL_PREEMPTED_CONTAINER: { ContainerPreemptEvent killContainerEvent = (ContainerPreemptEvent)event; RMContainer containerToBeKilled = killContainerEvent.getContainer(); - killContainer(containerToBeKilled); - } - break; - case CONTAINER_RESCHEDULED: - { - ContainerRescheduledEvent containerRescheduledEvent = - (ContainerRescheduledEvent) event; - RMContainer container = containerRescheduledEvent.getContainer(); - recoverResourceRequestForContainer(container); + killPreemptedContainer(containerToBeKilled); } break; default: @@ -1462,7 +1455,7 @@ public class CapacityScheduler extends // Remove running containers List runningContainers = node.getRunningContainers(); for (RMContainer container : runningContainers) { - completedContainer(container, + super.completedContainer(container, SchedulerUtils.createAbnormalContainerStatus( container.getContainerId(), SchedulerUtils.LOST_CONTAINER), @@ -1472,7 +1465,7 @@ public class CapacityScheduler extends // Remove reservations, if any RMContainer reservedContainer = node.getReservedContainer(); if (reservedContainer != null) { - completedContainer(reservedContainer, + super.completedContainer(reservedContainer, SchedulerUtils.createAbnormalContainerStatus( reservedContainer.getContainerId(), SchedulerUtils.LOST_CONTAINER), @@ -1488,13 +1481,9 @@ public class CapacityScheduler extends @Lock(CapacityScheduler.class) @Override - protected synchronized void completedContainer(RMContainer rmContainer, - ContainerStatus containerStatus, RMContainerEventType event) { - if (rmContainer == null) { - LOG.info("Container " + containerStatus.getContainerId() + - " completed with event " + event); - return; - } + protected synchronized void completedContainerInternal( + RMContainer rmContainer, ContainerStatus containerStatus, + RMContainerEventType event) { Container container = rmContainer.getContainer(); @@ -1596,11 +1585,14 @@ public class CapacityScheduler extends } @Override - public void dropContainerReservation(RMContainer container) { + public void killReservedContainer(RMContainer container) { if(LOG.isDebugEnabled()){ - LOG.debug("DROP_RESERVATION:" + container.toString()); + LOG.debug(SchedulerEventType.KILL_RESERVED_CONTAINER + ":" + + container.toString()); } - completedContainer(container, + // To think: What happens if this is no longer a reserved container, for + // e.g if the reservation became an allocation. + super.completedContainer(container, SchedulerUtils.createAbnormalContainerStatus( container.getContainerId(), SchedulerUtils.UNRESERVED_CONTAINER), @@ -1608,25 +1600,28 @@ public class CapacityScheduler extends } @Override - public void preemptContainer(ApplicationAttemptId aid, RMContainer cont) { + public void markContainerForPreemption(ApplicationAttemptId aid, + RMContainer cont) { if(LOG.isDebugEnabled()){ - LOG.debug("PREEMPT_CONTAINER: application:" + aid.toString() + - " container: " + cont.toString()); + LOG.debug(SchedulerEventType.MARK_CONTAINER_FOR_PREEMPTION + + ": appAttempt:" + aid.toString() + " container: " + + cont.toString()); } FiCaSchedulerApp app = getApplicationAttempt(aid); if (app != null) { - app.addPreemptContainer(cont.getContainerId()); + app.markContainerForPreemption(cont.getContainerId()); } } @Override - public void killContainer(RMContainer cont) { + public void killPreemptedContainer(RMContainer cont) { if (LOG.isDebugEnabled()) { - LOG.debug("KILL_CONTAINER: container" + cont.toString()); + LOG.debug(SchedulerEventType.KILL_PREEMPTED_CONTAINER + ": container" + + cont.toString()); } - completedContainer(cont, SchedulerUtils.createPreemptedContainerStatus( - cont.getContainerId(), SchedulerUtils.PREEMPTED_CONTAINER), - RMContainerEventType.KILL); + super.completedContainer(cont, SchedulerUtils + .createPreemptedContainerStatus(cont.getContainerId(), + SchedulerUtils.PREEMPTED_CONTAINER), RMContainerEventType.KILL); } @Override diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java index 4b88415ad8e..730743c1854 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java @@ -301,7 +301,7 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt { return ret; } - public synchronized void addPreemptContainer(ContainerId cont) { + public synchronized void markContainerForPreemption(ContainerId cont) { // ignore already completed containers if (liveContainers.containsKey(cont)) { containersToPreempt.add(cont); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ContainerPreemptEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/ContainerPreemptEvent.java similarity index 83% rename from hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ContainerPreemptEvent.java rename to hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/ContainerPreemptEvent.java index 7ab275820e7..df1b676f0b3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ContainerPreemptEvent.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/ContainerPreemptEvent.java @@ -16,15 +16,14 @@ * limitations under the License. */ -package org.apache.hadoop.yarn.server.resourcemanager.scheduler; +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.event; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType; /** - * Simple event class used to communicate containers unreservations, preemption, killing + * Simple event class used to communicate kill reserved containers, mark + * containers for preemption and kill already preemption-marked containers. */ public class ContainerPreemptEvent extends SchedulerEvent { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/ContainerRescheduledEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/ContainerRescheduledEvent.java deleted file mode 100644 index de2ce36d9a8..00000000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/ContainerRescheduledEvent.java +++ /dev/null @@ -1,35 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.yarn.server.resourcemanager.scheduler.event; - -import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; - -public class ContainerRescheduledEvent extends SchedulerEvent { - - private RMContainer container; - - public ContainerRescheduledEvent(RMContainer container) { - super(SchedulerEventType.CONTAINER_RESCHEDULED); - this.container = container; - } - - public RMContainer getContainer() { - return container; - } -} \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/SchedulerEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/SchedulerEventType.java index 40dd66b424e..9cf09e9c6a1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/SchedulerEventType.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/SchedulerEventType.java @@ -38,11 +38,10 @@ public enum SchedulerEventType { // Source: ContainerAllocationExpirer CONTAINER_EXPIRED, - // Source: RMContainer - CONTAINER_RESCHEDULED, - // Source: SchedulingEditPolicy - DROP_RESERVATION, - PREEMPT_CONTAINER, - KILL_CONTAINER + KILL_RESERVED_CONTAINER, + MARK_CONTAINER_FOR_PREEMPTION, // Mark a container for preemption + // in the near future + KILL_PREEMPTED_CONTAINER // Kill a container previously marked for + // preemption } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java index 3643ea8d31a..a36dd8c6ad2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java @@ -85,7 +85,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptA import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerExpiredSchedulerEvent; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerRescheduledEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeResourceUpdateSchedulerEvent; @@ -498,7 +497,7 @@ public class FairScheduler extends // TODO: Not sure if this ever actually adds this to the list of cleanup // containers on the RMNode (see SchedulerNode.releaseContainer()). - completedContainer(container, status, RMContainerEventType.KILL); + super.completedContainer(container, status, RMContainerEventType.KILL); LOG.info("Killing container" + container + " (after waiting for preemption for " + (getClock().getTime() - time) + "ms)"); @@ -807,7 +806,7 @@ public class FairScheduler extends LOG.info("Skip killing " + rmContainer.getContainerId()); continue; } - completedContainer(rmContainer, + super.completedContainer(rmContainer, SchedulerUtils.createAbnormalContainerStatus( rmContainer.getContainerId(), SchedulerUtils.COMPLETED_APPLICATION), @@ -816,7 +815,7 @@ public class FairScheduler extends // Release all reserved containers for (RMContainer rmContainer : attempt.getReservedContainers()) { - completedContainer(rmContainer, + super.completedContainer(rmContainer, SchedulerUtils.createAbnormalContainerStatus( rmContainer.getContainerId(), "Application Complete"), @@ -843,13 +842,9 @@ public class FairScheduler extends * Clean up a completed container. */ @Override - protected synchronized void completedContainer(RMContainer rmContainer, - ContainerStatus containerStatus, RMContainerEventType event) { - if (rmContainer == null) { - LOG.info("Container " + containerStatus.getContainerId() - + " completed with event " + event); - return; - } + protected synchronized void completedContainerInternal( + RMContainer rmContainer, ContainerStatus containerStatus, + RMContainerEventType event) { Container container = rmContainer.getContainer(); @@ -919,7 +914,7 @@ public class FairScheduler extends // Remove running containers List runningContainers = node.getRunningContainers(); for (RMContainer container : runningContainers) { - completedContainer(container, + super.completedContainer(container, SchedulerUtils.createAbnormalContainerStatus( container.getContainerId(), SchedulerUtils.LOST_CONTAINER), @@ -929,7 +924,7 @@ public class FairScheduler extends // Remove reservations, if any RMContainer reservedContainer = node.getReservedContainer(); if (reservedContainer != null) { - completedContainer(reservedContainer, + super.completedContainer(reservedContainer, SchedulerUtils.createAbnormalContainerStatus( reservedContainer.getContainerId(), SchedulerUtils.LOST_CONTAINER), @@ -1057,7 +1052,7 @@ public class FairScheduler extends for (ContainerStatus completedContainer : completedContainers) { ContainerId containerId = completedContainer.getContainerId(); LOG.debug("Container FINISHED: " + containerId); - completedContainer(getRMContainer(containerId), + super.completedContainer(getRMContainer(containerId), completedContainer, RMContainerEventType.FINISHED); } @@ -1302,21 +1297,12 @@ public class FairScheduler extends ContainerExpiredSchedulerEvent containerExpiredEvent = (ContainerExpiredSchedulerEvent)event; ContainerId containerId = containerExpiredEvent.getContainerId(); - completedContainer(getRMContainer(containerId), + super.completedContainer(getRMContainer(containerId), SchedulerUtils.createAbnormalContainerStatus( containerId, SchedulerUtils.EXPIRED_CONTAINER), RMContainerEventType.EXPIRE); break; - case CONTAINER_RESCHEDULED: - if (!(event instanceof ContainerRescheduledEvent)) { - throw new RuntimeException("Unexpected event type: " + event); - } - ContainerRescheduledEvent containerRescheduledEvent = - (ContainerRescheduledEvent) event; - RMContainer container = containerRescheduledEvent.getContainer(); - recoverResourceRequestForContainer(container); - break; default: LOG.error("Unknown event arrived at FairScheduler: " + event.toString()); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java index 8e75d1167d3..5787ba62999 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java @@ -74,10 +74,10 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedContainerChangeRequest; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedContainerChangeRequest; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; @@ -86,7 +86,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptA import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerExpiredSchedulerEvent; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerRescheduledEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeResourceUpdateSchedulerEvent; @@ -468,7 +467,7 @@ public class FifoScheduler extends LOG.info("Skip killing " + container.getContainerId()); continue; } - completedContainer(container, + super.completedContainer(container, SchedulerUtils.createAbnormalContainerStatus( container.getContainerId(), SchedulerUtils.COMPLETED_APPLICATION), RMContainerEventType.KILL); @@ -739,7 +738,7 @@ public class FifoScheduler extends for (ContainerStatus completedContainer : completedContainers) { ContainerId containerId = completedContainer.getContainerId(); LOG.debug("Container FINISHED: " + containerId); - completedContainer(getRMContainer(containerId), + super.completedContainer(getRMContainer(containerId), completedContainer, RMContainerEventType.FINISHED); } @@ -858,21 +857,13 @@ public class FifoScheduler extends ContainerExpiredSchedulerEvent containerExpiredEvent = (ContainerExpiredSchedulerEvent) event; ContainerId containerid = containerExpiredEvent.getContainerId(); - completedContainer(getRMContainer(containerid), + super.completedContainer(getRMContainer(containerid), SchedulerUtils.createAbnormalContainerStatus( containerid, SchedulerUtils.EXPIRED_CONTAINER), RMContainerEventType.EXPIRE); } break; - case CONTAINER_RESCHEDULED: - { - ContainerRescheduledEvent containerRescheduledEvent = - (ContainerRescheduledEvent) event; - RMContainer container = containerRescheduledEvent.getContainer(); - recoverResourceRequestForContainer(container); - } - break; default: LOG.error("Invalid eventtype " + event.getType() + ". Ignoring!"); } @@ -880,12 +871,9 @@ public class FifoScheduler extends @Lock(FifoScheduler.class) @Override - protected synchronized void completedContainer(RMContainer rmContainer, - ContainerStatus containerStatus, RMContainerEventType event) { - if (rmContainer == null) { - LOG.info("Null container completed..."); - return; - } + protected synchronized void completedContainerInternal( + RMContainer rmContainer, ContainerStatus containerStatus, + RMContainerEventType event) { // Get the application for the finished container Container container = rmContainer.getContainer(); @@ -931,7 +919,7 @@ public class FifoScheduler extends } // Kill running containers for(RMContainer container : node.getRunningContainers()) { - completedContainer(container, + super.completedContainer(container, SchedulerUtils.createAbnormalContainerStatus( container.getContainerId(), SchedulerUtils.LOST_CONTAINER), diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java index 0372cd7855a..a5d14c3b55c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java @@ -275,7 +275,8 @@ public class MockRM extends ResourceManager { nm.nodeHeartbeat(true); } container = getResourceScheduler().getRMContainer(containerId); - System.out.println("Waiting for container " + containerId + " to be allocated."); + System.out.println("Waiting for container " + containerId + " to be " + + containerState + ", container is null right now."); Thread.sleep(100); if (timeoutMillisecs <= timeoutSecs * 100) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMDispatcher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMDispatcher.java index db7c96ab1ed..d9306dd775f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMDispatcher.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMDispatcher.java @@ -29,8 +29,8 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.AsyncDispatcher; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager.SchedulerEventDispatcher; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerPreemptEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType; import org.junit.Assert; @@ -55,20 +55,22 @@ public class TestRMDispatcher { ApplicationAttemptId appAttemptId = mock(ApplicationAttemptId.class); RMContainer container = mock(RMContainer.class); ContainerPreemptEvent event1 = new ContainerPreemptEvent( - appAttemptId, container, SchedulerEventType.DROP_RESERVATION); + appAttemptId, container, SchedulerEventType.KILL_RESERVED_CONTAINER); rmDispatcher.getEventHandler().handle(event1); - ContainerPreemptEvent event2 = new ContainerPreemptEvent( - appAttemptId, container, SchedulerEventType.KILL_CONTAINER); + ContainerPreemptEvent event2 = + new ContainerPreemptEvent(appAttemptId, container, + SchedulerEventType.KILL_PREEMPTED_CONTAINER); rmDispatcher.getEventHandler().handle(event2); - ContainerPreemptEvent event3 = new ContainerPreemptEvent( - appAttemptId, container, SchedulerEventType.PREEMPT_CONTAINER); + ContainerPreemptEvent event3 = + new ContainerPreemptEvent(appAttemptId, container, + SchedulerEventType.MARK_CONTAINER_FOR_PREEMPTION); rmDispatcher.getEventHandler().handle(event3); // Wait for events to be processed by scheduler dispatcher. Thread.sleep(1000); verify(sched, times(3)).handle(any(SchedulerEvent.class)); - verify(sched).dropContainerReservation(container); - verify(sched).preemptContainer(appAttemptId, container); - verify(sched).killContainer(container); + verify(sched).killReservedContainer(container); + verify(sched).markContainerForPreemption(appAttemptId, container); + verify(sched).killPreemptedContainer(container); } catch (InterruptedException e) { Assert.fail(); } finally { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java index ca4cda6068f..5035afe4a11 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java @@ -566,7 +566,7 @@ public class TestAMRestart { ContainerId amContainer = ContainerId.newContainerId(am1.getApplicationAttemptId(), 1); // Preempt the first attempt; - scheduler.killContainer(scheduler.getRMContainer(amContainer)); + scheduler.killPreemptedContainer(scheduler.getRMContainer(amContainer)); am1.waitForState(RMAppAttemptState.FAILED); Assert.assertTrue(! attempt1.shouldCountTowardsMaxAttemptRetry()); @@ -582,7 +582,7 @@ public class TestAMRestart { // Preempt the second attempt. ContainerId amContainer2 = ContainerId.newContainerId(am2.getApplicationAttemptId(), 1); - scheduler.killContainer(scheduler.getRMContainer(amContainer2)); + scheduler.killPreemptedContainer(scheduler.getRMContainer(amContainer2)); am2.waitForState(RMAppAttemptState.FAILED); Assert.assertTrue(! attempt2.shouldCountTowardsMaxAttemptRetry()); @@ -677,7 +677,7 @@ public class TestAMRestart { ContainerId.newContainerId(am1.getApplicationAttemptId(), 1); // Forcibly preempt the am container; - scheduler.killContainer(scheduler.getRMContainer(amContainer)); + scheduler.killPreemptedContainer(scheduler.getRMContainer(amContainer)); am1.waitForState(RMAppAttemptState.FAILED); Assert.assertTrue(! attempt1.shouldCountTowardsMaxAttemptRetry()); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java index 7a3ce568b1a..13f267d8b29 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java @@ -23,8 +23,8 @@ import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.Pro import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.OBSERVE_ONLY; import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.TOTAL_PREEMPTION_PER_ROUND; import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.WAIT_TIME_BEFORE_KILL; -import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType.KILL_CONTAINER; -import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType.PREEMPT_CONTAINER; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType.KILL_PREEMPTED_CONTAINER; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType.MARK_CONTAINER_FOR_PREEMPTION; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; @@ -67,7 +67,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.Proportion import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.resource.Priority; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue; @@ -77,6 +76,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueu import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.ParentQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueCapacities; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerPreemptEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.OrderingPolicy; @@ -289,7 +289,7 @@ public class TestProportionalCapacityPreemptionPolicy { List events = evtCaptor.getAllValues(); for (ContainerPreemptEvent e : events.subList(20, 20)) { assertEquals(appC, e.getAppId()); - assertEquals(KILL_CONTAINER, e.getType()); + assertEquals(KILL_PREEMPTED_CONTAINER, e.getType()); } } @@ -935,7 +935,7 @@ public class TestProportionalCapacityPreemptionPolicy { private final ApplicationAttemptId appAttId; private final SchedulerEventType type; IsPreemptionRequestFor(ApplicationAttemptId appAttId) { - this(appAttId, PREEMPT_CONTAINER); + this(appAttId, MARK_CONTAINER_FOR_PREEMPTION); } IsPreemptionRequestFor(ApplicationAttemptId appAttId, SchedulerEventType type) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAbstractYarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAbstractYarnScheduler.java index 7c33f78358a..fc2d9c436e3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAbstractYarnScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAbstractYarnScheduler.java @@ -29,6 +29,7 @@ import java.util.List; import java.util.Map; import org.apache.hadoop.yarn.api.records.ApplicationAccessType; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerState; @@ -46,11 +47,15 @@ import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore import org.apache.hadoop.yarn.server.resourcemanager.rmapp.MockRMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerPreemptEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType; import org.apache.hadoop.yarn.util.resource.Resources; import org.junit.Assert; import org.junit.Test; @@ -493,6 +498,114 @@ public class TestAbstractYarnScheduler extends ParameterizedSchedulerTestBase { } } + /** + * Test to verify that ResourceRequests recovery back to the right app-attempt + * after a container gets killed at ACQUIRED state: YARN-4502. + * + * @throws Exception + */ + @Test + public void testResourceRequestRecoveryToTheRightAppAttempt() + throws Exception { + + configureScheduler(); + YarnConfiguration conf = getConf(); + MockRM rm = new MockRM(conf); + try { + rm.start(); + RMApp rmApp = + rm.submitApp(200, "name", "user", + new HashMap(), false, "default", -1, + null, "Test", false, true); + MockNM node = + new MockNM("127.0.0.1:1234", 10240, rm.getResourceTrackerService()); + node.registerNode(); + + MockAM am1 = MockRM.launchAndRegisterAM(rmApp, rm, node); + ApplicationAttemptId applicationAttemptOneID = + am1.getApplicationAttemptId(); + ContainerId am1ContainerID = + ContainerId.newContainerId(applicationAttemptOneID, 1); + + // allocate NUM_CONTAINERS containers + am1.allocate("127.0.0.1", 1024, 1, new ArrayList()); + node.nodeHeartbeat(true); + + // wait for containers to be allocated. + List containers = + am1.allocate(new ArrayList(), + new ArrayList()).getAllocatedContainers(); + while (containers.size() != 1) { + node.nodeHeartbeat(true); + containers.addAll(am1.allocate(new ArrayList(), + new ArrayList()).getAllocatedContainers()); + Thread.sleep(200); + } + + // launch a 2nd container, for testing running-containers transfer. + node.nodeHeartbeat(applicationAttemptOneID, 2, ContainerState.RUNNING); + ContainerId runningContainerID = + ContainerId.newContainerId(applicationAttemptOneID, 2); + rm.waitForState(node, runningContainerID, RMContainerState.RUNNING); + + // 3rd container is in Allocated state. + int ALLOCATED_CONTAINER_PRIORITY = 1047; + am1.allocate("127.0.0.1", 1024, 1, ALLOCATED_CONTAINER_PRIORITY, + new ArrayList(), null); + node.nodeHeartbeat(true); + ContainerId allocatedContainerID = + ContainerId.newContainerId(applicationAttemptOneID, 3); + rm.waitForContainerAllocated(node, allocatedContainerID); + rm.waitForState(node, allocatedContainerID, RMContainerState.ALLOCATED); + RMContainer allocatedContainer = + rm.getResourceScheduler().getRMContainer(allocatedContainerID); + + // Capture scheduler app-attempt before AM crash. + SchedulerApplicationAttempt firstSchedulerAppAttempt = + ((AbstractYarnScheduler) rm + .getResourceScheduler()) + .getApplicationAttempt(applicationAttemptOneID); + + // AM crashes, and a new app-attempt gets created + node.nodeHeartbeat(applicationAttemptOneID, 1, ContainerState.COMPLETE); + rm.waitForState(node, am1ContainerID, RMContainerState.COMPLETED); + RMAppAttempt rmAppAttempt2 = MockRM.waitForAttemptScheduled(rmApp, rm); + ApplicationAttemptId applicationAttemptTwoID = + rmAppAttempt2.getAppAttemptId(); + Assert.assertEquals(2, applicationAttemptTwoID.getAttemptId()); + + // All outstanding allocated containers will be killed (irrespective of + // keep-alive of container across app-attempts) + Assert.assertEquals(RMContainerState.KILLED, + allocatedContainer.getState()); + + // The core part of this test + // The killed containers' ResourceRequests are recovered back to the + // original app-attempt, not the new one + for (ResourceRequest request : firstSchedulerAppAttempt + .getAppSchedulingInfo().getAllResourceRequests()) { + if (request.getPriority().getPriority() == 0) { + Assert.assertEquals(0, request.getNumContainers()); + } else if (request.getPriority().getPriority() == ALLOCATED_CONTAINER_PRIORITY) { + Assert.assertEquals(1, request.getNumContainers()); + } + } + + // Also, only one running container should be transferred after AM + // launches + MockRM.launchAM(rmApp, rm, node); + List transferredContainers = + rm.getResourceScheduler().getTransferredContainers( + applicationAttemptTwoID); + Assert.assertEquals(1, transferredContainers.size()); + Assert.assertEquals(runningContainerID, transferredContainers.get(0) + .getId()); + + } finally { + rm.stop(); + } + } + private void verifyMaximumResourceCapability( Resource expectedMaximumResource, YarnScheduler scheduler) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationPriority.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationPriority.java index 2ad805a2066..e32a33b71a1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationPriority.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationPriority.java @@ -205,7 +205,7 @@ public class TestApplicationPriority { if (++counter > 2) { break; } - cs.killContainer(schedulerAppAttempt.getRMContainer(c.getId())); + cs.killPreemptedContainer(schedulerAppAttempt.getRMContainer(c.getId())); } // check node report, 12 GB used and 4 GB available @@ -513,7 +513,7 @@ public class TestApplicationPriority { if (++counter > 2) { break; } - cs.killContainer(schedulerAppAttemptApp1.getRMContainer(c.getId())); + cs.killPreemptedContainer(schedulerAppAttemptApp1.getRMContainer(c.getId())); iterator.remove(); } @@ -543,7 +543,7 @@ public class TestApplicationPriority { if (++counter > 1) { break; } - cs.killContainer(schedulerAppAttemptApp1.getRMContainer(c.getId())); + cs.killPreemptedContainer(schedulerAppAttemptApp1.getRMContainer(c.getId())); iterator.remove(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java index 7c95cdca1be..e139df65f31 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java @@ -1170,7 +1170,7 @@ public class TestCapacityScheduler { // kill the 3 containers for (Container c : allocatedContainers) { - cs.killContainer(schedulerAppAttempt.getRMContainer(c.getId())); + cs.killPreemptedContainer(schedulerAppAttempt.getRMContainer(c.getId())); } // check values @@ -1179,7 +1179,7 @@ public class TestCapacityScheduler { Resource.newInstance(CONTAINER_MEMORY * 3, 3), false, 3); // kill app0-attempt0 AM container - cs.killContainer(schedulerAppAttempt.getRMContainer(app0 + cs.killPreemptedContainer(schedulerAppAttempt.getRMContainer(app0 .getCurrentAppAttempt().getMasterContainer().getId())); // wait for app0 failed @@ -1202,7 +1202,7 @@ public class TestCapacityScheduler { allocatedContainers = am1.allocateAndWaitForContainers(3, CONTAINER_MEMORY, nm1); for (Container c : allocatedContainers) { - cs.killContainer(schedulerAppAttempt.getRMContainer(c.getId())); + cs.killPreemptedContainer(schedulerAppAttempt.getRMContainer(c.getId())); } // check values @@ -1251,7 +1251,7 @@ public class TestCapacityScheduler { } // Call killContainer to preempt the container - cs.killContainer(rmContainer); + cs.killPreemptedContainer(rmContainer); Assert.assertEquals(3, requests.size()); for (ResourceRequest request : requests) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java index 831505b9ed4..4a32bc6bb5d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java @@ -18,13 +18,11 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair; -import org.apache.hadoop.metrics2.impl.MetricsCollectorImpl; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertSame; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.mockito.Matchers.isA; @@ -52,6 +50,7 @@ import javax.xml.parsers.ParserConfigurationException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeys; +import org.apache.hadoop.metrics2.impl.MetricsCollectorImpl; import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; import org.apache.hadoop.security.GroupMappingServiceProvider; import org.apache.hadoop.yarn.MockApps; @@ -95,10 +94,11 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSch import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerExpiredSchedulerEvent; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerRescheduledEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerPreemptEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.QueuePlacementRule.Default; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.DominantResourceFairnessPolicy; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FifoPolicy; @@ -4735,11 +4735,11 @@ public class TestFairScheduler extends FairSchedulerTestBase { } } } - + @Test(timeout = 5000) public void testRecoverRequestAfterPreemption() throws Exception { conf.setLong(FairSchedulerConfiguration.WAIT_TIME_BEFORE_KILL, 10); - + ControlledClock clock = new ControlledClock(); scheduler.setClock(clock); scheduler.init(conf); @@ -4779,7 +4779,7 @@ public class TestFairScheduler extends FairSchedulerTestBase { assertEquals(1, scheduler.getSchedulerApp(appAttemptId).getLiveContainers() .size()); - FSAppAttempt app = scheduler.getSchedulerApp(appAttemptId); + SchedulerApplicationAttempt app = scheduler.getSchedulerApp(appAttemptId); // ResourceRequest will be empty once NodeUpdate is completed Assert.assertNull(app.getResourceRequest(priority, host)); @@ -4797,7 +4797,8 @@ public class TestFairScheduler extends FairSchedulerTestBase { scheduler.warnOrKillContainer(rmContainer); // Trigger container rescheduled event - scheduler.handle(new ContainerRescheduledEvent(rmContainer)); + scheduler.handle(new ContainerPreemptEvent(appAttemptId, rmContainer, + SchedulerEventType.KILL_PREEMPTED_CONTAINER)); List requests = rmContainer.getResourceRequests(); // Once recovered, resource request will be present again in app @@ -4820,7 +4821,6 @@ public class TestFairScheduler extends FairSchedulerTestBase { Assert.assertTrue(containers.size() == 1); } - @SuppressWarnings("resource") @Test public void testBlacklistNodes() throws Exception { scheduler.init(conf);