diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 37a2b0b05c3..66d47679719 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -977,6 +977,15 @@ public class YarnConfiguration extends Configuration {
NM_PREFIX + "container-retry-minimum-interval-ms";
public static final int DEFAULT_NM_CONTAINER_RETRY_MINIMUM_INTERVAL_MS = 1000;
+ /**
+ * Use container pause as the preemption policy over kill in the container
+ * queue at a NodeManager.
+ **/
+ public static final String NM_CONTAINER_QUEUING_USE_PAUSE_FOR_PREEMPTION =
+ NM_PREFIX + "opportunistic-containers-use-pause-for-preemption";
+ public static final boolean
+ DEFAULT_NM_CONTAINER_QUEUING_USE_PAUSE_FOR_PREEMPTION = false;
+
/** Interval at which the delayed token removal thread runs */
public static final String RM_DELAYED_DELEGATION_TOKEN_REMOVAL_INTERVAL_MS =
RM_PREFIX + "delayed.delegation-token.removal-interval-ms";
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index 7cb79ada31a..71fb98626d1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -2836,6 +2836,15 @@
100
+
+
+ Use container pause as the preemption policy over kill in the container
+ queue at a NodeManager.
+
+ yarn.nodemanager.opportunistic-containers-use-pause-for-preemption
+ false
+
+
Error filename pattern, to identify the file in the container's
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java
index 44d17c3a318..14aab88880b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java
@@ -91,4 +91,6 @@ public interface Container extends EventHandler {
void sendKillEvent(int exitStatus, String description);
boolean isRecovering();
+
+ void sendPauseEvent(String description);
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
index 5b24e734b4a..20b4b087940 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
@@ -793,15 +793,22 @@ public class ContainerImpl implements Container {
@SuppressWarnings("unchecked") // dispatcher not typed
@Override
public void sendLaunchEvent() {
- ContainersLauncherEventType launcherEvent =
- ContainersLauncherEventType.LAUNCH_CONTAINER;
- if (recoveredStatus == RecoveredContainerStatus.LAUNCHED) {
- // try to recover a container that was previously launched
- launcherEvent = ContainersLauncherEventType.RECOVER_CONTAINER;
+ if (ContainerState.PAUSED == getContainerState()) {
+ dispatcher.getEventHandler().handle(
+ new ContainerResumeEvent(containerId,
+ "Container Resumed as some resources freed up"));
+ } else {
+ ContainersLauncherEventType launcherEvent =
+ ContainersLauncherEventType.LAUNCH_CONTAINER;
+ if (recoveredStatus == RecoveredContainerStatus.LAUNCHED) {
+ // try to recover a container that was previously launched
+ launcherEvent = ContainersLauncherEventType.RECOVER_CONTAINER;
+ }
+ containerLaunchStartTime = clock.getTime();
+ dispatcher.getEventHandler().handle(
+ new ContainersLauncherEvent(this, launcherEvent));
}
- containerLaunchStartTime = clock.getTime();
- dispatcher.getEventHandler().handle(
- new ContainersLauncherEvent(this, launcherEvent));
+
}
@SuppressWarnings("unchecked") // dispatcher not typed
@@ -820,6 +827,13 @@ public class ContainerImpl implements Container {
new ContainerKillEvent(containerId, exitStatus, description));
}
+ @SuppressWarnings("unchecked") // dispatcher not typed
+ @Override
+ public void sendPauseEvent(String description) {
+ dispatcher.getEventHandler().handle(
+ new ContainerPauseEvent(containerId, description));
+ }
+
@SuppressWarnings("unchecked") // dispatcher not typed
private void sendRelaunchEvent() {
ContainersLauncherEventType launcherEvent =
@@ -1775,7 +1789,7 @@ public class ContainerImpl implements Container {
/**
* Transitions upon receiving PAUSE_CONTAINER.
- * - RUNNING -> PAUSED
+ * - RUNNING -> PAUSING
*/
@SuppressWarnings("unchecked") // dispatcher not typed
static class PauseContainerTransition implements
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java
index ace001aca84..5cf3575c708 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java
@@ -19,6 +19,7 @@
package org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler;
import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -34,6 +35,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Cont
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor
.ChangeMonitoringContainerResourceEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitor;
@@ -74,7 +76,7 @@ public class ContainerScheduler extends AbstractService implements
queuedOpportunisticContainers = new LinkedHashMap<>();
// Used to keep track of containers that have been marked to be killed
- // to make room for a guaranteed container.
+ // or paused to make room for a guaranteed container.
private final Map oppContainersToKill =
new HashMap<>();
@@ -98,6 +100,8 @@ public class ContainerScheduler extends AbstractService implements
private final AsyncDispatcher dispatcher;
private final NodeManagerMetrics metrics;
+ private Boolean usePauseEventForPreemption = false;
+
/**
* Instantiate a Container Scheduler.
* @param context NodeManager Context.
@@ -112,6 +116,17 @@ public class ContainerScheduler extends AbstractService implements
DEFAULT_NM_OPPORTUNISTIC_CONTAINERS_MAX_QUEUE_LENGTH));
}
+
+ @Override
+ public void serviceInit(Configuration conf) throws Exception {
+ super.serviceInit(conf);
+ this.usePauseEventForPreemption =
+ conf.getBoolean(
+ YarnConfiguration.NM_CONTAINER_QUEUING_USE_PAUSE_FOR_PREEMPTION,
+ YarnConfiguration.
+ DEFAULT_NM_CONTAINER_QUEUING_USE_PAUSE_FOR_PREEMPTION);
+ }
+
@VisibleForTesting
public ContainerScheduler(Context context, AsyncDispatcher dispatcher,
NodeManagerMetrics metrics, int qLength) {
@@ -136,8 +151,9 @@ public class ContainerScheduler extends AbstractService implements
case SCHEDULE_CONTAINER:
scheduleContainer(event.getContainer());
break;
+ case CONTAINER_PAUSED:
case CONTAINER_COMPLETED:
- onContainerCompleted(event.getContainer());
+ onResourcesReclaimed(event.getContainer());
break;
case UPDATE_CONTAINER:
if (event instanceof UpdateContainerSchedulerEvent) {
@@ -203,9 +219,9 @@ public class ContainerScheduler extends AbstractService implements
queuedGuaranteedContainers.put(containerId,
updateEvent.getContainer());
}
- //Kill opportunistic containers if any to make room for
+ //Kill/pause opportunistic containers if any to make room for
// promotion request
- killOpportunisticContainers(updateEvent.getContainer());
+ reclaimOpportunisticContainerResources(updateEvent.getContainer());
} else {
// Demotion of queued container.. Should not happen too often
// since you should not find too many queued guaranteed
@@ -243,6 +259,12 @@ public class ContainerScheduler extends AbstractService implements
return this.runningContainers.size();
}
+ @VisibleForTesting
+ public void setUsePauseEventForPreemption(
+ boolean usePauseEventForPreemption) {
+ this.usePauseEventForPreemption = usePauseEventForPreemption;
+ }
+
public OpportunisticContainersStatus getOpportunisticContainersStatus() {
this.opportunisticContainersStatus.setQueuedOpportContainers(
getNumQueuedOpportunisticContainers());
@@ -257,7 +279,7 @@ public class ContainerScheduler extends AbstractService implements
return this.opportunisticContainersStatus;
}
- private void onContainerCompleted(Container container) {
+ private void onResourcesReclaimed(Container container) {
oppContainersToKill.remove(container.getContainerId());
// This could be killed externally for eg. by the ContainerManager,
@@ -292,6 +314,23 @@ public class ContainerScheduler extends AbstractService implements
// Start pending guaranteed containers, if resources available.
boolean resourcesAvailable = startContainers(
queuedGuaranteedContainers.values(), forceStartGuaranteedContaieners);
+ // Resume opportunistic containers, if resource available.
+ if (resourcesAvailable) {
+ List pausedContainers = new ArrayList();
+ Map containers =
+ context.getContainers();
+ for (Map.Entryentry : containers.entrySet()) {
+ ContainerId contId = entry.getKey();
+ // Find containers that were not already started and are in paused state
+ if(false == runningContainers.containsKey(contId)) {
+ if(containers.get(contId).getContainerState()
+ == ContainerState.PAUSED) {
+ pausedContainers.add(containers.get(contId));
+ }
+ }
+ }
+ resourcesAvailable = startContainers(pausedContainers, false);
+ }
// Start opportunistic containers, if resources available.
if (resourcesAvailable) {
startContainers(queuedOpportunisticContainers.values(), false);
@@ -395,7 +434,7 @@ public class ContainerScheduler extends AbstractService implements
// if the guaranteed container is queued, we need to preempt opportunistic
// containers for make room for it
if (queuedGuaranteedContainers.containsKey(container.getContainerId())) {
- killOpportunisticContainers(container);
+ reclaimOpportunisticContainerResources(container);
}
} else {
// Given an opportunistic container, we first try to start as many queuing
@@ -413,19 +452,30 @@ public class ContainerScheduler extends AbstractService implements
}
}
- private void killOpportunisticContainers(Container container) {
- List extraOpportContainersToKill =
- pickOpportunisticContainersToKill(container.getContainerId());
+ @SuppressWarnings("unchecked")
+ private void reclaimOpportunisticContainerResources(Container container) {
+ List extraOppContainersToReclaim =
+ pickOpportunisticContainersToReclaimResources(
+ container.getContainerId());
// Kill the opportunistic containers that were chosen.
- for (Container contToKill : extraOpportContainersToKill) {
- contToKill.sendKillEvent(
- ContainerExitStatus.KILLED_BY_CONTAINER_SCHEDULER,
- "Container Killed to make room for Guaranteed Container.");
- oppContainersToKill.put(contToKill.getContainerId(), contToKill);
+ for (Container contToReclaim : extraOppContainersToReclaim) {
+ String preemptionAction = usePauseEventForPreemption == true ? "paused" :
+ "resumed";
LOG.info(
- "Opportunistic container {} will be killed in order to start the "
+ "Container {} will be {} to start the "
+ "execution of guaranteed container {}.",
- contToKill.getContainerId(), container.getContainerId());
+ contToReclaim.getContainerId(), preemptionAction,
+ container.getContainerId());
+
+ if (usePauseEventForPreemption) {
+ contToReclaim.sendPauseEvent(
+ "Container Paused to make room for Guaranteed Container");
+ } else {
+ contToReclaim.sendKillEvent(
+ ContainerExitStatus.KILLED_BY_CONTAINER_SCHEDULER,
+ "Container Killed to make room for Guaranteed Container.");
+ }
+ oppContainersToKill.put(contToReclaim.getContainerId(), contToReclaim);
}
}
@@ -440,7 +490,7 @@ public class ContainerScheduler extends AbstractService implements
container.sendLaunchEvent();
}
- private List pickOpportunisticContainersToKill(
+ private List pickOpportunisticContainersToReclaimResources(
ContainerId containerToStartId) {
// The opportunistic containers that need to be killed for the
// given container to start.
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestContainerSchedulerQueuing.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestContainerSchedulerQueuing.java
index 96765683871..f3fc724bad8 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestContainerSchedulerQueuing.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestContainerSchedulerQueuing.java
@@ -23,6 +23,8 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
import org.apache.hadoop.fs.UnsupportedFileSystemException;
import org.apache.hadoop.security.UserGroupInformation;
@@ -49,6 +51,7 @@ import org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor;
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.BaseContainerManagerTest;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitor;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl;
@@ -124,18 +127,38 @@ public class TestContainerSchedulerQueuing extends BaseContainerManagerTest {
@Override
protected ContainerExecutor createContainerExecutor() {
DefaultContainerExecutor exec = new DefaultContainerExecutor() {
+ ConcurrentMap oversleepMap =
+ new ConcurrentHashMap();
@Override
public int launchContainer(ContainerStartContext ctx)
throws IOException, ConfigurationException {
+ oversleepMap.put(ctx.getContainer().getContainerId().toString(), false);
if (delayContainers) {
try {
Thread.sleep(10000);
+ if(oversleepMap.get(ctx.getContainer().getContainerId().toString())
+ == true) {
+ Thread.sleep(10000);
+ }
} catch (InterruptedException e) {
// Nothing..
}
}
return super.launchContainer(ctx);
}
+
+ @Override
+ public void pauseContainer(Container container) {
+ // To mimic pausing we force the container to be in the PAUSED state
+ // a little longer by oversleeping.
+ oversleepMap.put(container.getContainerId().toString(), true);
+ LOG.info("Container was paused");
+ }
+
+ @Override
+ public void resumeContainer(Container container) {
+ LOG.info("Container was resumed");
+ }
};
exec.setConf(conf);
return spy(exec);
@@ -505,6 +528,86 @@ public class TestContainerSchedulerQueuing extends BaseContainerManagerTest {
contStatus1.getState());
}
+ /**
+ * Submit two OPPORTUNISTIC and one GUARANTEED containers. The resources
+ * requests by each container as such that only one can run in parallel.
+ * Thus, the OPPORTUNISTIC container that started running, will be
+ * paused for the GUARANTEED container to start.
+ * Once the GUARANTEED container finishes its execution, the remaining
+ * OPPORTUNISTIC container will be executed.
+ * @throws Exception
+ */
+ @Test
+ public void testPauseOpportunisticForGuaranteedContainer() throws Exception {
+ containerManager.start();
+ containerManager.getContainerScheduler().
+ setUsePauseEventForPreemption(true);
+ ContainerLaunchContext containerLaunchContext =
+ recordFactory.newRecordInstance(ContainerLaunchContext.class);
+
+ List list = new ArrayList<>();
+ list.add(StartContainerRequest.newInstance(
+ containerLaunchContext,
+ createContainerToken(createContainerId(0), DUMMY_RM_IDENTIFIER,
+ context.getNodeId(),
+ user, BuilderUtils.newResource(2048, 1),
+ context.getContainerTokenSecretManager(), null,
+ ExecutionType.OPPORTUNISTIC)));
+
+ StartContainersRequest allRequests =
+ StartContainersRequest.newInstance(list);
+ containerManager.startContainers(allRequests);
+
+ BaseContainerManagerTest.waitForNMContainerState(containerManager,
+ createContainerId(0), ContainerState.RUNNING, 40);
+
+ list = new ArrayList<>();
+ list.add(StartContainerRequest.newInstance(
+ containerLaunchContext,
+ createContainerToken(createContainerId(1), DUMMY_RM_IDENTIFIER,
+ context.getNodeId(),
+ user, BuilderUtils.newResource(2048, 1),
+ context.getContainerTokenSecretManager(), null,
+ ExecutionType.GUARANTEED)));
+ allRequests =
+ StartContainersRequest.newInstance(list);
+
+ containerManager.startContainers(allRequests);
+
+ BaseContainerManagerTest.waitForNMContainerState(containerManager,
+ createContainerId(1), ContainerState.RUNNING, 40);
+
+ // Get container statuses. Container 0 should be paused, container 1
+ // should be running.
+ List statList = new ArrayList();
+ for (int i = 0; i < 2; i++) {
+ statList.add(createContainerId(i));
+ }
+ GetContainerStatusesRequest statRequest =
+ GetContainerStatusesRequest.newInstance(statList);
+ List containerStatuses = containerManager
+ .getContainerStatuses(statRequest).getContainerStatuses();
+ for (ContainerStatus status : containerStatuses) {
+ if (status.getContainerId().equals(createContainerId(0))) {
+ Assert.assertTrue(status.getDiagnostics().contains(
+ "Container Paused to make room for Guaranteed Container"));
+ } else if (status.getContainerId().equals(createContainerId(1))) {
+ Assert.assertEquals(
+ org.apache.hadoop.yarn.api.records.ContainerState.RUNNING,
+ status.getState());
+ }
+ System.out.println("\nStatus : [" + status + "]\n");
+ }
+
+ // Make sure that the GUARANTEED container completes
+ BaseContainerManagerTest.waitForNMContainerState(containerManager,
+ createContainerId(1), ContainerState.DONE, 40);
+ // Make sure that the PAUSED opportunistic container resumes and
+ // starts running
+ BaseContainerManagerTest.waitForNMContainerState(containerManager,
+ createContainerId(0), ContainerState.DONE, 40);
+ }
+
/**
* 1. Submit a long running GUARANTEED container to hog all NM resources.
* 2. Submit 6 OPPORTUNISTIC containers, all of which will be queued.
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java
index 13c43c5a4bc..07df56ceb7a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java
@@ -229,4 +229,9 @@ public class MockContainer implements Container {
public boolean isRecovering() {
return false;
}
+
+ @Override
+ public void sendPauseEvent(String description) {
+
+ }
}