YARN-5216. Expose configurable preemption policy for OPPORTUNISTIC containers running on the NM. (Hitesh Sharma via asuresh)
(cherry picked from commit 4f8194430f
)
This commit is contained in:
parent
1af9f962c9
commit
52bf458616
|
@ -977,6 +977,15 @@ public class YarnConfiguration extends Configuration {
|
|||
NM_PREFIX + "container-retry-minimum-interval-ms";
|
||||
public static final int DEFAULT_NM_CONTAINER_RETRY_MINIMUM_INTERVAL_MS = 1000;
|
||||
|
||||
/**
|
||||
* Use container pause as the preemption policy over kill in the container
|
||||
* queue at a NodeManager.
|
||||
**/
|
||||
public static final String NM_CONTAINER_QUEUING_USE_PAUSE_FOR_PREEMPTION =
|
||||
NM_PREFIX + "opportunistic-containers-use-pause-for-preemption";
|
||||
public static final boolean
|
||||
DEFAULT_NM_CONTAINER_QUEUING_USE_PAUSE_FOR_PREEMPTION = false;
|
||||
|
||||
/** Interval at which the delayed token removal thread runs */
|
||||
public static final String RM_DELAYED_DELEGATION_TOKEN_REMOVAL_INTERVAL_MS =
|
||||
RM_PREFIX + "delayed.delegation-token.removal-interval-ms";
|
||||
|
|
|
@ -2836,6 +2836,15 @@
|
|||
<value>100</value>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<description>
|
||||
Use container pause as the preemption policy over kill in the container
|
||||
queue at a NodeManager.
|
||||
</description>
|
||||
<name>yarn.nodemanager.opportunistic-containers-use-pause-for-preemption</name>
|
||||
<value>false</value>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<description>
|
||||
Error filename pattern, to identify the file in the container's
|
||||
|
|
|
@ -91,4 +91,6 @@ public interface Container extends EventHandler<ContainerEvent> {
|
|||
void sendKillEvent(int exitStatus, String description);
|
||||
|
||||
boolean isRecovering();
|
||||
|
||||
void sendPauseEvent(String description);
|
||||
}
|
||||
|
|
|
@ -793,15 +793,22 @@ public class ContainerImpl implements Container {
|
|||
@SuppressWarnings("unchecked") // dispatcher not typed
|
||||
@Override
|
||||
public void sendLaunchEvent() {
|
||||
ContainersLauncherEventType launcherEvent =
|
||||
ContainersLauncherEventType.LAUNCH_CONTAINER;
|
||||
if (recoveredStatus == RecoveredContainerStatus.LAUNCHED) {
|
||||
// try to recover a container that was previously launched
|
||||
launcherEvent = ContainersLauncherEventType.RECOVER_CONTAINER;
|
||||
if (ContainerState.PAUSED == getContainerState()) {
|
||||
dispatcher.getEventHandler().handle(
|
||||
new ContainerResumeEvent(containerId,
|
||||
"Container Resumed as some resources freed up"));
|
||||
} else {
|
||||
ContainersLauncherEventType launcherEvent =
|
||||
ContainersLauncherEventType.LAUNCH_CONTAINER;
|
||||
if (recoveredStatus == RecoveredContainerStatus.LAUNCHED) {
|
||||
// try to recover a container that was previously launched
|
||||
launcherEvent = ContainersLauncherEventType.RECOVER_CONTAINER;
|
||||
}
|
||||
containerLaunchStartTime = clock.getTime();
|
||||
dispatcher.getEventHandler().handle(
|
||||
new ContainersLauncherEvent(this, launcherEvent));
|
||||
}
|
||||
containerLaunchStartTime = clock.getTime();
|
||||
dispatcher.getEventHandler().handle(
|
||||
new ContainersLauncherEvent(this, launcherEvent));
|
||||
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked") // dispatcher not typed
|
||||
|
@ -820,6 +827,13 @@ public class ContainerImpl implements Container {
|
|||
new ContainerKillEvent(containerId, exitStatus, description));
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked") // dispatcher not typed
|
||||
@Override
|
||||
public void sendPauseEvent(String description) {
|
||||
dispatcher.getEventHandler().handle(
|
||||
new ContainerPauseEvent(containerId, description));
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked") // dispatcher not typed
|
||||
private void sendRelaunchEvent() {
|
||||
ContainersLauncherEventType launcherEvent =
|
||||
|
@ -1775,7 +1789,7 @@ public class ContainerImpl implements Container {
|
|||
|
||||
/**
|
||||
* Transitions upon receiving PAUSE_CONTAINER.
|
||||
* - RUNNING -> PAUSED
|
||||
* - RUNNING -> PAUSING
|
||||
*/
|
||||
@SuppressWarnings("unchecked") // dispatcher not typed
|
||||
static class PauseContainerTransition implements
|
||||
|
|
|
@ -19,6 +19,7 @@
|
|||
package org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.service.AbstractService;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
|
@ -34,6 +35,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Cont
|
|||
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor
|
||||
.ChangeMonitoringContainerResourceEvent;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitor;
|
||||
|
||||
|
||||
|
@ -74,7 +76,7 @@ public class ContainerScheduler extends AbstractService implements
|
|||
queuedOpportunisticContainers = new LinkedHashMap<>();
|
||||
|
||||
// Used to keep track of containers that have been marked to be killed
|
||||
// to make room for a guaranteed container.
|
||||
// or paused to make room for a guaranteed container.
|
||||
private final Map<ContainerId, Container> oppContainersToKill =
|
||||
new HashMap<>();
|
||||
|
||||
|
@ -98,6 +100,8 @@ public class ContainerScheduler extends AbstractService implements
|
|||
private final AsyncDispatcher dispatcher;
|
||||
private final NodeManagerMetrics metrics;
|
||||
|
||||
private Boolean usePauseEventForPreemption = false;
|
||||
|
||||
/**
|
||||
* Instantiate a Container Scheduler.
|
||||
* @param context NodeManager Context.
|
||||
|
@ -112,6 +116,17 @@ public class ContainerScheduler extends AbstractService implements
|
|||
DEFAULT_NM_OPPORTUNISTIC_CONTAINERS_MAX_QUEUE_LENGTH));
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public void serviceInit(Configuration conf) throws Exception {
|
||||
super.serviceInit(conf);
|
||||
this.usePauseEventForPreemption =
|
||||
conf.getBoolean(
|
||||
YarnConfiguration.NM_CONTAINER_QUEUING_USE_PAUSE_FOR_PREEMPTION,
|
||||
YarnConfiguration.
|
||||
DEFAULT_NM_CONTAINER_QUEUING_USE_PAUSE_FOR_PREEMPTION);
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public ContainerScheduler(Context context, AsyncDispatcher dispatcher,
|
||||
NodeManagerMetrics metrics, int qLength) {
|
||||
|
@ -136,8 +151,9 @@ public class ContainerScheduler extends AbstractService implements
|
|||
case SCHEDULE_CONTAINER:
|
||||
scheduleContainer(event.getContainer());
|
||||
break;
|
||||
case CONTAINER_PAUSED:
|
||||
case CONTAINER_COMPLETED:
|
||||
onContainerCompleted(event.getContainer());
|
||||
onResourcesReclaimed(event.getContainer());
|
||||
break;
|
||||
case UPDATE_CONTAINER:
|
||||
if (event instanceof UpdateContainerSchedulerEvent) {
|
||||
|
@ -203,9 +219,9 @@ public class ContainerScheduler extends AbstractService implements
|
|||
queuedGuaranteedContainers.put(containerId,
|
||||
updateEvent.getContainer());
|
||||
}
|
||||
//Kill opportunistic containers if any to make room for
|
||||
//Kill/pause opportunistic containers if any to make room for
|
||||
// promotion request
|
||||
killOpportunisticContainers(updateEvent.getContainer());
|
||||
reclaimOpportunisticContainerResources(updateEvent.getContainer());
|
||||
} else {
|
||||
// Demotion of queued container.. Should not happen too often
|
||||
// since you should not find too many queued guaranteed
|
||||
|
@ -243,6 +259,12 @@ public class ContainerScheduler extends AbstractService implements
|
|||
return this.runningContainers.size();
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public void setUsePauseEventForPreemption(
|
||||
boolean usePauseEventForPreemption) {
|
||||
this.usePauseEventForPreemption = usePauseEventForPreemption;
|
||||
}
|
||||
|
||||
public OpportunisticContainersStatus getOpportunisticContainersStatus() {
|
||||
this.opportunisticContainersStatus.setQueuedOpportContainers(
|
||||
getNumQueuedOpportunisticContainers());
|
||||
|
@ -257,7 +279,7 @@ public class ContainerScheduler extends AbstractService implements
|
|||
return this.opportunisticContainersStatus;
|
||||
}
|
||||
|
||||
private void onContainerCompleted(Container container) {
|
||||
private void onResourcesReclaimed(Container container) {
|
||||
oppContainersToKill.remove(container.getContainerId());
|
||||
|
||||
// This could be killed externally for eg. by the ContainerManager,
|
||||
|
@ -292,6 +314,23 @@ public class ContainerScheduler extends AbstractService implements
|
|||
// Start pending guaranteed containers, if resources available.
|
||||
boolean resourcesAvailable = startContainers(
|
||||
queuedGuaranteedContainers.values(), forceStartGuaranteedContaieners);
|
||||
// Resume opportunistic containers, if resource available.
|
||||
if (resourcesAvailable) {
|
||||
List<Container> pausedContainers = new ArrayList<Container>();
|
||||
Map<ContainerId, Container> containers =
|
||||
context.getContainers();
|
||||
for (Map.Entry<ContainerId, Container>entry : containers.entrySet()) {
|
||||
ContainerId contId = entry.getKey();
|
||||
// Find containers that were not already started and are in paused state
|
||||
if(false == runningContainers.containsKey(contId)) {
|
||||
if(containers.get(contId).getContainerState()
|
||||
== ContainerState.PAUSED) {
|
||||
pausedContainers.add(containers.get(contId));
|
||||
}
|
||||
}
|
||||
}
|
||||
resourcesAvailable = startContainers(pausedContainers, false);
|
||||
}
|
||||
// Start opportunistic containers, if resources available.
|
||||
if (resourcesAvailable) {
|
||||
startContainers(queuedOpportunisticContainers.values(), false);
|
||||
|
@ -395,7 +434,7 @@ public class ContainerScheduler extends AbstractService implements
|
|||
// if the guaranteed container is queued, we need to preempt opportunistic
|
||||
// containers for make room for it
|
||||
if (queuedGuaranteedContainers.containsKey(container.getContainerId())) {
|
||||
killOpportunisticContainers(container);
|
||||
reclaimOpportunisticContainerResources(container);
|
||||
}
|
||||
} else {
|
||||
// Given an opportunistic container, we first try to start as many queuing
|
||||
|
@ -413,19 +452,30 @@ public class ContainerScheduler extends AbstractService implements
|
|||
}
|
||||
}
|
||||
|
||||
private void killOpportunisticContainers(Container container) {
|
||||
List<Container> extraOpportContainersToKill =
|
||||
pickOpportunisticContainersToKill(container.getContainerId());
|
||||
@SuppressWarnings("unchecked")
|
||||
private void reclaimOpportunisticContainerResources(Container container) {
|
||||
List<Container> extraOppContainersToReclaim =
|
||||
pickOpportunisticContainersToReclaimResources(
|
||||
container.getContainerId());
|
||||
// Kill the opportunistic containers that were chosen.
|
||||
for (Container contToKill : extraOpportContainersToKill) {
|
||||
contToKill.sendKillEvent(
|
||||
ContainerExitStatus.KILLED_BY_CONTAINER_SCHEDULER,
|
||||
"Container Killed to make room for Guaranteed Container.");
|
||||
oppContainersToKill.put(contToKill.getContainerId(), contToKill);
|
||||
for (Container contToReclaim : extraOppContainersToReclaim) {
|
||||
String preemptionAction = usePauseEventForPreemption == true ? "paused" :
|
||||
"resumed";
|
||||
LOG.info(
|
||||
"Opportunistic container {} will be killed in order to start the "
|
||||
"Container {} will be {} to start the "
|
||||
+ "execution of guaranteed container {}.",
|
||||
contToKill.getContainerId(), container.getContainerId());
|
||||
contToReclaim.getContainerId(), preemptionAction,
|
||||
container.getContainerId());
|
||||
|
||||
if (usePauseEventForPreemption) {
|
||||
contToReclaim.sendPauseEvent(
|
||||
"Container Paused to make room for Guaranteed Container");
|
||||
} else {
|
||||
contToReclaim.sendKillEvent(
|
||||
ContainerExitStatus.KILLED_BY_CONTAINER_SCHEDULER,
|
||||
"Container Killed to make room for Guaranteed Container.");
|
||||
}
|
||||
oppContainersToKill.put(contToReclaim.getContainerId(), contToReclaim);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -440,7 +490,7 @@ public class ContainerScheduler extends AbstractService implements
|
|||
container.sendLaunchEvent();
|
||||
}
|
||||
|
||||
private List<Container> pickOpportunisticContainersToKill(
|
||||
private List<Container> pickOpportunisticContainersToReclaimResources(
|
||||
ContainerId containerToStartId) {
|
||||
// The opportunistic containers that need to be killed for the
|
||||
// given container to start.
|
||||
|
|
|
@ -23,6 +23,8 @@ import java.util.ArrayList;
|
|||
import java.util.Arrays;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
|
||||
import org.apache.hadoop.fs.UnsupportedFileSystemException;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
|
@ -49,6 +51,7 @@ import org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor;
|
|||
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.BaseContainerManagerTest;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitor;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl;
|
||||
|
@ -124,18 +127,38 @@ public class TestContainerSchedulerQueuing extends BaseContainerManagerTest {
|
|||
@Override
|
||||
protected ContainerExecutor createContainerExecutor() {
|
||||
DefaultContainerExecutor exec = new DefaultContainerExecutor() {
|
||||
ConcurrentMap<String, Boolean> oversleepMap =
|
||||
new ConcurrentHashMap<String, Boolean>();
|
||||
@Override
|
||||
public int launchContainer(ContainerStartContext ctx)
|
||||
throws IOException, ConfigurationException {
|
||||
oversleepMap.put(ctx.getContainer().getContainerId().toString(), false);
|
||||
if (delayContainers) {
|
||||
try {
|
||||
Thread.sleep(10000);
|
||||
if(oversleepMap.get(ctx.getContainer().getContainerId().toString())
|
||||
== true) {
|
||||
Thread.sleep(10000);
|
||||
}
|
||||
} catch (InterruptedException e) {
|
||||
// Nothing..
|
||||
}
|
||||
}
|
||||
return super.launchContainer(ctx);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void pauseContainer(Container container) {
|
||||
// To mimic pausing we force the container to be in the PAUSED state
|
||||
// a little longer by oversleeping.
|
||||
oversleepMap.put(container.getContainerId().toString(), true);
|
||||
LOG.info("Container was paused");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void resumeContainer(Container container) {
|
||||
LOG.info("Container was resumed");
|
||||
}
|
||||
};
|
||||
exec.setConf(conf);
|
||||
return spy(exec);
|
||||
|
@ -505,6 +528,86 @@ public class TestContainerSchedulerQueuing extends BaseContainerManagerTest {
|
|||
contStatus1.getState());
|
||||
}
|
||||
|
||||
/**
|
||||
* Submit two OPPORTUNISTIC and one GUARANTEED containers. The resources
|
||||
* requests by each container as such that only one can run in parallel.
|
||||
* Thus, the OPPORTUNISTIC container that started running, will be
|
||||
* paused for the GUARANTEED container to start.
|
||||
* Once the GUARANTEED container finishes its execution, the remaining
|
||||
* OPPORTUNISTIC container will be executed.
|
||||
* @throws Exception
|
||||
*/
|
||||
@Test
|
||||
public void testPauseOpportunisticForGuaranteedContainer() throws Exception {
|
||||
containerManager.start();
|
||||
containerManager.getContainerScheduler().
|
||||
setUsePauseEventForPreemption(true);
|
||||
ContainerLaunchContext containerLaunchContext =
|
||||
recordFactory.newRecordInstance(ContainerLaunchContext.class);
|
||||
|
||||
List<StartContainerRequest> list = new ArrayList<>();
|
||||
list.add(StartContainerRequest.newInstance(
|
||||
containerLaunchContext,
|
||||
createContainerToken(createContainerId(0), DUMMY_RM_IDENTIFIER,
|
||||
context.getNodeId(),
|
||||
user, BuilderUtils.newResource(2048, 1),
|
||||
context.getContainerTokenSecretManager(), null,
|
||||
ExecutionType.OPPORTUNISTIC)));
|
||||
|
||||
StartContainersRequest allRequests =
|
||||
StartContainersRequest.newInstance(list);
|
||||
containerManager.startContainers(allRequests);
|
||||
|
||||
BaseContainerManagerTest.waitForNMContainerState(containerManager,
|
||||
createContainerId(0), ContainerState.RUNNING, 40);
|
||||
|
||||
list = new ArrayList<>();
|
||||
list.add(StartContainerRequest.newInstance(
|
||||
containerLaunchContext,
|
||||
createContainerToken(createContainerId(1), DUMMY_RM_IDENTIFIER,
|
||||
context.getNodeId(),
|
||||
user, BuilderUtils.newResource(2048, 1),
|
||||
context.getContainerTokenSecretManager(), null,
|
||||
ExecutionType.GUARANTEED)));
|
||||
allRequests =
|
||||
StartContainersRequest.newInstance(list);
|
||||
|
||||
containerManager.startContainers(allRequests);
|
||||
|
||||
BaseContainerManagerTest.waitForNMContainerState(containerManager,
|
||||
createContainerId(1), ContainerState.RUNNING, 40);
|
||||
|
||||
// Get container statuses. Container 0 should be paused, container 1
|
||||
// should be running.
|
||||
List<ContainerId> statList = new ArrayList<ContainerId>();
|
||||
for (int i = 0; i < 2; i++) {
|
||||
statList.add(createContainerId(i));
|
||||
}
|
||||
GetContainerStatusesRequest statRequest =
|
||||
GetContainerStatusesRequest.newInstance(statList);
|
||||
List<ContainerStatus> containerStatuses = containerManager
|
||||
.getContainerStatuses(statRequest).getContainerStatuses();
|
||||
for (ContainerStatus status : containerStatuses) {
|
||||
if (status.getContainerId().equals(createContainerId(0))) {
|
||||
Assert.assertTrue(status.getDiagnostics().contains(
|
||||
"Container Paused to make room for Guaranteed Container"));
|
||||
} else if (status.getContainerId().equals(createContainerId(1))) {
|
||||
Assert.assertEquals(
|
||||
org.apache.hadoop.yarn.api.records.ContainerState.RUNNING,
|
||||
status.getState());
|
||||
}
|
||||
System.out.println("\nStatus : [" + status + "]\n");
|
||||
}
|
||||
|
||||
// Make sure that the GUARANTEED container completes
|
||||
BaseContainerManagerTest.waitForNMContainerState(containerManager,
|
||||
createContainerId(1), ContainerState.DONE, 40);
|
||||
// Make sure that the PAUSED opportunistic container resumes and
|
||||
// starts running
|
||||
BaseContainerManagerTest.waitForNMContainerState(containerManager,
|
||||
createContainerId(0), ContainerState.DONE, 40);
|
||||
}
|
||||
|
||||
/**
|
||||
* 1. Submit a long running GUARANTEED container to hog all NM resources.
|
||||
* 2. Submit 6 OPPORTUNISTIC containers, all of which will be queued.
|
||||
|
|
|
@ -229,4 +229,9 @@ public class MockContainer implements Container {
|
|||
public boolean isRecovering() {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void sendPauseEvent(String description) {
|
||||
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue