YARN-5292. NM Container lifecycle and state transitions to support for PAUSED container state. (Hitesh Sharma via asuresh)

This commit is contained in:
Arun Suresh 2016-12-09 07:51:03 -08:00
parent b9465bb876
commit 864fbacd45
13 changed files with 454 additions and 11 deletions

View File

@ -33,11 +33,14 @@ public enum ContainerState {
/** Running container */ /** Running container */
RUNNING, RUNNING,
/** Completed container */ /** Completed container */
COMPLETE, COMPLETE,
/** Scheduled (awaiting resources) at the NM. */ /** Scheduled (awaiting resources) at the NM. */
@InterfaceStability.Unstable @InterfaceStability.Unstable
SCHEDULED SCHEDULED,
/** Paused at the NM. */
PAUSED
} }

View File

@ -110,6 +110,7 @@ enum ContainerStateProto {
C_RUNNING = 2; C_RUNNING = 2;
C_COMPLETE = 3; C_COMPLETE = 3;
C_SCHEDULED = 4; C_SCHEDULED = 4;
C_PAUSED = 5;
} }
message ContainerProto { message ContainerProto {

View File

@ -699,6 +699,28 @@ public abstract class ContainerExecutor implements Configurable {
} }
} }
/**
* Pause the container. The default implementation is to raise a kill event.
* Specific executor implementations can override this behavior.
* @param container
* the Container
*/
public void pauseContainer(Container container) {
LOG.warn(container.getContainerId() + " doesn't support pausing.");
throw new UnsupportedOperationException();
}
/**
* Resume the container from pause state. The default implementation ignores
* this event. Specific implementations can override this behavior.
* @param container
* the Container
*/
public void resumeContainer(Container container) {
LOG.warn(container.getContainerId() + " doesn't support resume.");
throw new UnsupportedOperationException();
}
/** /**
* Get the process-identifier for the container. * Get the process-identifier for the container.
* *

View File

@ -27,6 +27,8 @@ public enum ContainerEventType {
CONTAINER_DONE, CONTAINER_DONE,
REINITIALIZE_CONTAINER, REINITIALIZE_CONTAINER,
ROLLBACK_REINIT, ROLLBACK_REINIT,
PAUSE_CONTAINER,
RESUME_CONTAINER,
// DownloadManager // DownloadManager
CONTAINER_INITED, CONTAINER_INITED,
@ -38,5 +40,7 @@ public enum ContainerEventType {
CONTAINER_LAUNCHED, CONTAINER_LAUNCHED,
CONTAINER_EXITED_WITH_SUCCESS, CONTAINER_EXITED_WITH_SUCCESS,
CONTAINER_EXITED_WITH_FAILURE, CONTAINER_EXITED_WITH_FAILURE,
CONTAINER_KILLED_ON_REQUEST CONTAINER_KILLED_ON_REQUEST,
CONTAINER_PAUSED,
CONTAINER_RESUMED
} }

View File

@ -307,6 +307,8 @@ public class ContainerImpl implements Container {
UPDATE_DIAGNOSTICS_TRANSITION) UPDATE_DIAGNOSTICS_TRANSITION)
.addTransition(ContainerState.NEW, ContainerState.DONE, .addTransition(ContainerState.NEW, ContainerState.DONE,
ContainerEventType.KILL_CONTAINER, new KillOnNewTransition()) ContainerEventType.KILL_CONTAINER, new KillOnNewTransition())
.addTransition(ContainerState.NEW, ContainerState.DONE,
ContainerEventType.PAUSE_CONTAINER, new KillOnPauseTransition())
// From LOCALIZING State // From LOCALIZING State
.addTransition(ContainerState.LOCALIZING, .addTransition(ContainerState.LOCALIZING,
@ -322,6 +324,8 @@ public class ContainerImpl implements Container {
.addTransition(ContainerState.LOCALIZING, ContainerState.KILLING, .addTransition(ContainerState.LOCALIZING, ContainerState.KILLING,
ContainerEventType.KILL_CONTAINER, ContainerEventType.KILL_CONTAINER,
new KillBeforeRunningTransition()) new KillBeforeRunningTransition())
.addTransition(ContainerState.LOCALIZING, ContainerState.KILLING,
ContainerEventType.PAUSE_CONTAINER, new KillOnPauseTransition())
// From LOCALIZATION_FAILED State // From LOCALIZATION_FAILED State
.addTransition(ContainerState.LOCALIZATION_FAILED, .addTransition(ContainerState.LOCALIZATION_FAILED,
@ -335,7 +339,8 @@ public class ContainerImpl implements Container {
// container not launched so kill is a no-op // container not launched so kill is a no-op
.addTransition(ContainerState.LOCALIZATION_FAILED, .addTransition(ContainerState.LOCALIZATION_FAILED,
ContainerState.LOCALIZATION_FAILED, ContainerState.LOCALIZATION_FAILED,
ContainerEventType.KILL_CONTAINER) EnumSet.of(ContainerEventType.KILL_CONTAINER,
ContainerEventType.PAUSE_CONTAINER))
// container cleanup triggers a release of all resources // container cleanup triggers a release of all resources
// regardless of whether they were localized or not // regardless of whether they were localized or not
// LocalizedResource handles release event in all states // LocalizedResource handles release event in all states
@ -391,6 +396,76 @@ public class ContainerImpl implements Container {
ContainerState.EXITED_WITH_FAILURE, ContainerState.EXITED_WITH_FAILURE,
ContainerEventType.CONTAINER_KILLED_ON_REQUEST, ContainerEventType.CONTAINER_KILLED_ON_REQUEST,
new KilledExternallyTransition()) new KilledExternallyTransition())
.addTransition(ContainerState.RUNNING, ContainerState.PAUSING,
ContainerEventType.PAUSE_CONTAINER, new PauseContainerTransition())
// From PAUSING State
.addTransition(ContainerState.PAUSING, ContainerState.KILLING,
ContainerEventType.KILL_CONTAINER, new KillTransition())
.addTransition(ContainerState.PAUSING, ContainerState.PAUSING,
ContainerEventType.UPDATE_DIAGNOSTICS_MSG,
UPDATE_DIAGNOSTICS_TRANSITION)
.addTransition(ContainerState.PAUSING, ContainerState.PAUSED,
ContainerEventType.CONTAINER_PAUSED, new PausedContainerTransition())
// In case something goes wrong then container will exit from the
// PAUSING state
.addTransition(ContainerState.PAUSING,
ContainerState.EXITED_WITH_SUCCESS,
ContainerEventType.CONTAINER_EXITED_WITH_SUCCESS)
.addTransition(ContainerState.PAUSING,
ContainerState.EXITED_WITH_FAILURE,
ContainerEventType.CONTAINER_EXITED_WITH_FAILURE,
new ExitedWithFailureTransition(true))
.addTransition(ContainerState.PAUSING, ContainerState.EXITED_WITH_FAILURE,
ContainerEventType.CONTAINER_KILLED_ON_REQUEST,
new KilledExternallyTransition())
// From PAUSED State
.addTransition(ContainerState.PAUSED, ContainerState.KILLING,
ContainerEventType.KILL_CONTAINER, new KillTransition())
.addTransition(ContainerState.PAUSED, ContainerState.PAUSED,
ContainerEventType.UPDATE_DIAGNOSTICS_MSG,
UPDATE_DIAGNOSTICS_TRANSITION)
.addTransition(ContainerState.PAUSED, ContainerState.PAUSED,
ContainerEventType.PAUSE_CONTAINER)
.addTransition(ContainerState.PAUSED, ContainerState.RESUMING,
ContainerEventType.RESUME_CONTAINER, new ResumeContainerTransition())
// In case something goes wrong then container will exit from the
// PAUSED state
.addTransition(ContainerState.PAUSED,
ContainerState.EXITED_WITH_FAILURE,
ContainerEventType.CONTAINER_EXITED_WITH_FAILURE,
new ExitedWithFailureTransition(true))
.addTransition(ContainerState.PAUSED, ContainerState.EXITED_WITH_FAILURE,
ContainerEventType.CONTAINER_KILLED_ON_REQUEST,
new KilledExternallyTransition())
.addTransition(ContainerState.PAUSED,
ContainerState.EXITED_WITH_SUCCESS,
ContainerEventType.CONTAINER_EXITED_WITH_SUCCESS,
new ExitedWithSuccessTransition(true))
// From RESUMING State
.addTransition(ContainerState.RESUMING, ContainerState.KILLING,
ContainerEventType.KILL_CONTAINER, new KillTransition())
.addTransition(ContainerState.RESUMING, ContainerState.RUNNING,
ContainerEventType.CONTAINER_RESUMED)
.addTransition(ContainerState.RESUMING, ContainerState.RESUMING,
ContainerEventType.UPDATE_DIAGNOSTICS_MSG,
UPDATE_DIAGNOSTICS_TRANSITION)
// In case something goes wrong then container will exit from the
// RESUMING state
.addTransition(ContainerState.RESUMING,
ContainerState.EXITED_WITH_FAILURE,
ContainerEventType.CONTAINER_EXITED_WITH_FAILURE,
new ExitedWithFailureTransition(true))
.addTransition(ContainerState.RESUMING,
ContainerState.EXITED_WITH_FAILURE,
ContainerEventType.CONTAINER_KILLED_ON_REQUEST,
new KilledExternallyTransition())
.addTransition(ContainerState.RESUMING,
ContainerState.EXITED_WITH_SUCCESS,
ContainerEventType.CONTAINER_EXITED_WITH_SUCCESS,
new ExitedWithSuccessTransition(true))
// From REINITIALIZING State // From REINITIALIZING State
.addTransition(ContainerState.REINITIALIZING, .addTransition(ContainerState.REINITIALIZING,
@ -414,6 +489,8 @@ public class ContainerImpl implements Container {
UPDATE_DIAGNOSTICS_TRANSITION) UPDATE_DIAGNOSTICS_TRANSITION)
.addTransition(ContainerState.REINITIALIZING, ContainerState.KILLING, .addTransition(ContainerState.REINITIALIZING, ContainerState.KILLING,
ContainerEventType.KILL_CONTAINER, new KillTransition()) ContainerEventType.KILL_CONTAINER, new KillTransition())
.addTransition(ContainerState.REINITIALIZING, ContainerState.KILLING,
ContainerEventType.PAUSE_CONTAINER, new KillOnPauseTransition())
.addTransition(ContainerState.REINITIALIZING, .addTransition(ContainerState.REINITIALIZING,
ContainerState.SCHEDULED, ContainerState.SCHEDULED,
ContainerEventType.CONTAINER_KILLED_ON_REQUEST, ContainerEventType.CONTAINER_KILLED_ON_REQUEST,
@ -431,6 +508,8 @@ public class ContainerImpl implements Container {
UPDATE_DIAGNOSTICS_TRANSITION) UPDATE_DIAGNOSTICS_TRANSITION)
.addTransition(ContainerState.RELAUNCHING, ContainerState.KILLING, .addTransition(ContainerState.RELAUNCHING, ContainerState.KILLING,
ContainerEventType.KILL_CONTAINER, new KillTransition()) ContainerEventType.KILL_CONTAINER, new KillTransition())
.addTransition(ContainerState.RELAUNCHING, ContainerState.KILLING,
ContainerEventType.PAUSE_CONTAINER, new KillOnPauseTransition())
// From CONTAINER_EXITED_WITH_SUCCESS State // From CONTAINER_EXITED_WITH_SUCCESS State
.addTransition(ContainerState.EXITED_WITH_SUCCESS, ContainerState.DONE, .addTransition(ContainerState.EXITED_WITH_SUCCESS, ContainerState.DONE,
@ -442,7 +521,8 @@ public class ContainerImpl implements Container {
UPDATE_DIAGNOSTICS_TRANSITION) UPDATE_DIAGNOSTICS_TRANSITION)
.addTransition(ContainerState.EXITED_WITH_SUCCESS, .addTransition(ContainerState.EXITED_WITH_SUCCESS,
ContainerState.EXITED_WITH_SUCCESS, ContainerState.EXITED_WITH_SUCCESS,
ContainerEventType.KILL_CONTAINER) EnumSet.of(ContainerEventType.KILL_CONTAINER,
ContainerEventType.PAUSE_CONTAINER))
// From EXITED_WITH_FAILURE State // From EXITED_WITH_FAILURE State
.addTransition(ContainerState.EXITED_WITH_FAILURE, ContainerState.DONE, .addTransition(ContainerState.EXITED_WITH_FAILURE, ContainerState.DONE,
@ -454,7 +534,8 @@ public class ContainerImpl implements Container {
UPDATE_DIAGNOSTICS_TRANSITION) UPDATE_DIAGNOSTICS_TRANSITION)
.addTransition(ContainerState.EXITED_WITH_FAILURE, .addTransition(ContainerState.EXITED_WITH_FAILURE,
ContainerState.EXITED_WITH_FAILURE, ContainerState.EXITED_WITH_FAILURE,
ContainerEventType.KILL_CONTAINER) EnumSet.of(ContainerEventType.KILL_CONTAINER,
ContainerEventType.PAUSE_CONTAINER))
// From KILLING State. // From KILLING State.
.addTransition(ContainerState.KILLING, .addTransition(ContainerState.KILLING,
@ -488,7 +569,8 @@ public class ContainerImpl implements Container {
// in the container launcher // in the container launcher
.addTransition(ContainerState.KILLING, .addTransition(ContainerState.KILLING,
ContainerState.KILLING, ContainerState.KILLING,
ContainerEventType.CONTAINER_LAUNCHED) EnumSet.of(ContainerEventType.CONTAINER_LAUNCHED,
ContainerEventType.PAUSE_CONTAINER))
// From CONTAINER_CLEANEDUP_AFTER_KILL State. // From CONTAINER_CLEANEDUP_AFTER_KILL State.
.addTransition(ContainerState.CONTAINER_CLEANEDUP_AFTER_KILL, .addTransition(ContainerState.CONTAINER_CLEANEDUP_AFTER_KILL,
@ -504,11 +586,13 @@ public class ContainerImpl implements Container {
EnumSet.of(ContainerEventType.KILL_CONTAINER, EnumSet.of(ContainerEventType.KILL_CONTAINER,
ContainerEventType.RESOURCE_FAILED, ContainerEventType.RESOURCE_FAILED,
ContainerEventType.CONTAINER_EXITED_WITH_SUCCESS, ContainerEventType.CONTAINER_EXITED_WITH_SUCCESS,
ContainerEventType.CONTAINER_EXITED_WITH_FAILURE)) ContainerEventType.CONTAINER_EXITED_WITH_FAILURE,
ContainerEventType.PAUSE_CONTAINER))
// From DONE // From DONE
.addTransition(ContainerState.DONE, ContainerState.DONE, .addTransition(ContainerState.DONE, ContainerState.DONE,
ContainerEventType.KILL_CONTAINER) EnumSet.of(ContainerEventType.KILL_CONTAINER,
ContainerEventType.PAUSE_CONTAINER))
.addTransition(ContainerState.DONE, ContainerState.DONE, .addTransition(ContainerState.DONE, ContainerState.DONE,
ContainerEventType.INIT_CONTAINER) ContainerEventType.INIT_CONTAINER)
.addTransition(ContainerState.DONE, ContainerState.DONE, .addTransition(ContainerState.DONE, ContainerState.DONE,
@ -534,6 +618,8 @@ public class ContainerImpl implements Container {
case LOCALIZING: case LOCALIZING:
case LOCALIZATION_FAILED: case LOCALIZATION_FAILED:
case SCHEDULED: case SCHEDULED:
case PAUSED:
case RESUMING:
return org.apache.hadoop.yarn.api.records.ContainerState.SCHEDULED; return org.apache.hadoop.yarn.api.records.ContainerState.SCHEDULED;
case RUNNING: case RUNNING:
case RELAUNCHING: case RELAUNCHING:
@ -543,6 +629,7 @@ public class ContainerImpl implements Container {
case KILLING: case KILLING:
case CONTAINER_CLEANEDUP_AFTER_KILL: case CONTAINER_CLEANEDUP_AFTER_KILL:
case CONTAINER_RESOURCES_CLEANINGUP: case CONTAINER_RESOURCES_CLEANINGUP:
case PAUSING:
return org.apache.hadoop.yarn.api.records.ContainerState.RUNNING; return org.apache.hadoop.yarn.api.records.ContainerState.RUNNING;
case DONE: case DONE:
default: default:
@ -1500,6 +1587,26 @@ public class ContainerImpl implements Container {
} }
} }
/**
* Transitions upon receiving PAUSE_CONTAINER.
* - LOCALIZED -> KILLING.
* - REINITIALIZING -> KILLING.
*/
@SuppressWarnings("unchecked") // dispatcher not typed
static class KillOnPauseTransition implements
SingleArcTransition<ContainerImpl, ContainerEvent> {
@SuppressWarnings("unchecked")
@Override
public void transition(ContainerImpl container, ContainerEvent event) {
// Kill the process/process-grp
container.setIsReInitializing(false);
container.dispatcher.getEventHandler().handle(
new ContainersLauncherEvent(container,
ContainersLauncherEventType.CLEANUP_CONTAINER));
}
}
/** /**
* Transition from KILLING to CONTAINER_CLEANEDUP_AFTER_KILL * Transition from KILLING to CONTAINER_CLEANEDUP_AFTER_KILL
* upon receiving CONTAINER_KILLED_ON_REQUEST. * upon receiving CONTAINER_KILLED_ON_REQUEST.
@ -1690,6 +1797,57 @@ public class ContainerImpl implements Container {
} }
} }
/**
* Transitions upon receiving PAUSE_CONTAINER.
* - RUNNING -> PAUSED
*/
@SuppressWarnings("unchecked") // dispatcher not typed
static class PauseContainerTransition implements
SingleArcTransition<ContainerImpl, ContainerEvent> {
@Override
public void transition(ContainerImpl container, ContainerEvent event) {
// Pause the process/process-grp if it is supported by the container
container.dispatcher.getEventHandler().handle(
new ContainersLauncherEvent(container,
ContainersLauncherEventType.PAUSE_CONTAINER));
ContainerPauseEvent pauseEvent = (ContainerPauseEvent) event;
container.addDiagnostics(pauseEvent.getDiagnostic(), "\n");
}
}
/**
* Transitions upon receiving PAUSED_CONTAINER.
*/
@SuppressWarnings("unchecked") // dispatcher not typed
static class PausedContainerTransition implements
SingleArcTransition<ContainerImpl, ContainerEvent> {
@Override
public void transition(ContainerImpl container, ContainerEvent event) {
// Container was PAUSED so tell the scheduler
container.dispatcher.getEventHandler().handle(
new ContainerSchedulerEvent(container,
ContainerSchedulerEventType.CONTAINER_PAUSED));
}
}
/**
* Transitions upon receiving RESUME_CONTAINER.
* - PAUSED -> RUNNING
*/
@SuppressWarnings("unchecked") // dispatcher not typed
static class ResumeContainerTransition implements
SingleArcTransition<ContainerImpl, ContainerEvent> {
@Override
public void transition(ContainerImpl container, ContainerEvent event) {
// Pause the process/process-grp if it is supported by the container
container.dispatcher.getEventHandler().handle(
new ContainersLauncherEvent(container,
ContainersLauncherEventType.RESUME_CONTAINER));
ContainerResumeEvent resumeEvent = (ContainerResumeEvent) event;
container.addDiagnostics(resumeEvent.getDiagnostic(), "\n");
}
}
@Override @Override
public void handle(ContainerEvent event) { public void handle(ContainerEvent event) {
try { try {

View File

@ -0,0 +1,40 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.nodemanager.containermanager.container;
import org.apache.hadoop.yarn.api.records.ContainerId;
/**
* ContainerEvent for ContainerEventType.PAUSE_CONTAINER.
*/
public class ContainerPauseEvent extends ContainerEvent {
private final String diagnostic;
public ContainerPauseEvent(ContainerId cId,
String diagnostic) {
super(cId, ContainerEventType.PAUSE_CONTAINER);
this.diagnostic = diagnostic;
}
public String getDiagnostic() {
return this.diagnostic;
}
}

View File

@ -0,0 +1,39 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.nodemanager.containermanager.container;
import org.apache.hadoop.yarn.api.records.ContainerId;
/**
* ContainerEvent for ContainerEventType.RESUME_CONTAINER.
*/
public class ContainerResumeEvent extends ContainerEvent {
private final String diagnostic;
public ContainerResumeEvent(ContainerId cId,
String diagnostic) {
super(cId, ContainerEventType.RESUME_CONTAINER);
this.diagnostic = diagnostic;
}
public String getDiagnostic() {
return this.diagnostic;
}
}

View File

@ -21,5 +21,6 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager.container;
public enum ContainerState { public enum ContainerState {
NEW, LOCALIZING, LOCALIZATION_FAILED, SCHEDULED, RUNNING, RELAUNCHING, NEW, LOCALIZING, LOCALIZATION_FAILED, SCHEDULED, RUNNING, RELAUNCHING,
REINITIALIZING, EXITED_WITH_SUCCESS, EXITED_WITH_FAILURE, KILLING, REINITIALIZING, EXITED_WITH_SUCCESS, EXITED_WITH_FAILURE, KILLING,
CONTAINER_CLEANEDUP_AFTER_KILL, CONTAINER_RESOURCES_CLEANINGUP, DONE CONTAINER_CLEANEDUP_AFTER_KILL, CONTAINER_RESOURCES_CLEANINGUP, DONE,
PAUSING, PAUSED, RESUMING
} }

View File

@ -75,6 +75,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Cont
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEventType; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerExitEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerExitEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerKillEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService;
@ -86,6 +87,7 @@ import org.apache.hadoop.yarn.util.Apps;
import org.apache.hadoop.yarn.util.AuxiliaryServiceHelper; import org.apache.hadoop.yarn.util.AuxiliaryServiceHelper;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.yarn.util.ConverterUtils;
public class ContainerLaunch implements Callable<Integer> { public class ContainerLaunch implements Callable<Integer> {
@ -106,8 +108,10 @@ public class ContainerLaunch implements Callable<Integer> {
private final Configuration conf; private final Configuration conf;
private final Context context; private final Context context;
private final ContainerManagerImpl containerManager; private final ContainerManagerImpl containerManager;
protected AtomicBoolean containerAlreadyLaunched = new AtomicBoolean(false); protected AtomicBoolean containerAlreadyLaunched = new AtomicBoolean(false);
protected AtomicBoolean shouldPauseContainer = new AtomicBoolean(false);
protected AtomicBoolean completed = new AtomicBoolean(false); protected AtomicBoolean completed = new AtomicBoolean(false);
private volatile boolean killedBeforeStart = false; private volatile boolean killedBeforeStart = false;
@ -802,6 +806,90 @@ public class ContainerLaunch implements Callable<Integer> {
return signal; return signal;
} }
/**
* Pause the container.
* Cancels the launch if the container isn't launched yet. Otherwise asks the
* executor to pause the container.
* @throws IOException in case of errors.
*/
@SuppressWarnings("unchecked") // dispatcher not typed
public void pauseContainer() throws IOException {
ContainerId containerId = container.getContainerId();
String containerIdStr = containerId.toString();
LOG.info("Pausing the container " + containerIdStr);
// The pause event is only handled if the container is in the running state
// (the container state machine), so we don't check for
// shouldLaunchContainer over here
if (!shouldPauseContainer.compareAndSet(false, true)) {
LOG.info("Container " + containerId + " not paused as "
+ "resume already called");
return;
}
try {
// Pause the container
exec.pauseContainer(container);
// PauseContainer is a blocking call. We are here almost means the
// container is paused, so send out the event.
dispatcher.getEventHandler().handle(new ContainerEvent(
containerId,
ContainerEventType.CONTAINER_PAUSED));
} catch (Exception e) {
String message =
"Exception when trying to pause container " + containerIdStr
+ ": " + StringUtils.stringifyException(e);
LOG.info(message);
container.handle(new ContainerKillEvent(container.getContainerId(),
ContainerExitStatus.PREEMPTED, "Container preempted as there was "
+ " an exception in pausing it."));
}
}
/**
* Resume the container.
* Cancels the launch if the container isn't launched yet. Otherwise asks the
* executor to pause the container.
* @throws IOException in case of error.
*/
@SuppressWarnings("unchecked") // dispatcher not typed
public void resumeContainer() throws IOException {
ContainerId containerId = container.getContainerId();
String containerIdStr = containerId.toString();
LOG.info("Resuming the container " + containerIdStr);
// The resume event is only handled if the container is in a paused state
// so we don't check for the launched flag here.
// paused flag will be set to true if process already paused
boolean alreadyPaused = !shouldPauseContainer.compareAndSet(false, true);
if (!alreadyPaused) {
LOG.info("Container " + containerIdStr + " not paused."
+ " No resume necessary");
return;
}
// If the container has already started
try {
exec.resumeContainer(container);
// ResumeContainer is a blocking call. We are here almost means the
// container is resumed, so send out the event.
dispatcher.getEventHandler().handle(new ContainerEvent(
containerId,
ContainerEventType.CONTAINER_RESUMED));
} catch (Exception e) {
String message =
"Exception when trying to resume container " + containerIdStr
+ ": " + StringUtils.stringifyException(e);
LOG.info(message);
container.handle(new ContainerKillEvent(container.getContainerId(),
ContainerExitStatus.PREEMPTED, "Container preempted as there was "
+ " an exception in pausing it."));
}
}
/** /**
* Loop through for a time-bounded interval waiting to * Loop through for a time-bounded interval waiting to
* read the process id from a file generated by a running process. * read the process id from a file generated by a running process.

View File

@ -30,6 +30,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileContext; import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.UnsupportedFileSystemException; import org.apache.hadoop.fs.UnsupportedFileSystemException;
import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.concurrent.HadoopExecutors; import org.apache.hadoop.util.concurrent.HadoopExecutors;
import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.Dispatcher;
@ -41,6 +42,7 @@ import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl; import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
@ -171,6 +173,36 @@ public class ContainersLauncher extends AbstractService
+ " with command " + signalEvent.getCommand()); + " with command " + signalEvent.getCommand());
} }
break; break;
case PAUSE_CONTAINER:
ContainerLaunch launchedContainer = running.get(containerId);
if (launchedContainer == null) {
// Container not launched. So nothing needs to be done.
return;
}
// Pause the container
try {
launchedContainer.pauseContainer();
} catch (Exception e) {
LOG.info("Got exception while pausing container: " +
StringUtils.stringifyException(e));
}
break;
case RESUME_CONTAINER:
ContainerLaunch launchCont = running.get(containerId);
if (launchCont == null) {
// Container not launched. So nothing needs to be done.
return;
}
// Resume the container.
try {
launchCont.resumeContainer();
} catch (Exception e) {
LOG.info("Got exception while resuming container: " +
StringUtils.stringifyException(e));
}
break;
} }
} }
} }

View File

@ -25,4 +25,7 @@ public enum ContainersLauncherEventType {
CLEANUP_CONTAINER, // The process(grp) itself. CLEANUP_CONTAINER, // The process(grp) itself.
CLEANUP_CONTAINER_FOR_REINIT, // The process(grp) itself. CLEANUP_CONTAINER_FOR_REINIT, // The process(grp) itself.
SIGNAL_CONTAINER, SIGNAL_CONTAINER,
PAUSE_CONTAINER,
RESUME_CONTAINER
} }

View File

@ -27,4 +27,5 @@ public enum ContainerSchedulerEventType {
UPDATE_CONTAINER, UPDATE_CONTAINER,
// Producer: Node HB response - RM has asked to shed the queue // Producer: Node HB response - RM has asked to shed the queue
SHED_QUEUED_CONTAINERS, SHED_QUEUED_CONTAINERS,
CONTAINER_PAUSED
} }

View File

@ -103,6 +103,7 @@ import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
import org.mockito.ArgumentMatcher; import org.mockito.ArgumentMatcher;
import org.mockito.Mockito;
public class TestContainer { public class TestContainer {
@ -205,6 +206,42 @@ public class TestContainer {
} }
} }
@Test
@SuppressWarnings("unchecked") // mocked generic
public void testContainerPauseAndResume() throws Exception {
WrappedContainer wc = null;
try {
wc = new WrappedContainer(13, 314159265358979L, 4344, "yak");
wc.initContainer();
wc.localizeResources();
int running = metrics.getRunningContainers();
wc.launchContainer();
assertEquals(running + 1, metrics.getRunningContainers());
reset(wc.localizerBus);
wc.pauseContainer();
assertEquals(ContainerState.PAUSED,
wc.c.getContainerState());
wc.resumeContainer();
assertEquals(ContainerState.RUNNING,
wc.c.getContainerState());
wc.containerKilledOnRequest();
assertEquals(ContainerState.EXITED_WITH_FAILURE,
wc.c.getContainerState());
assertNull(wc.c.getLocalizedResources());
verifyCleanupCall(wc);
int failed = metrics.getFailedContainers();
wc.containerResourcesCleanup();
assertEquals(ContainerState.DONE, wc.c.getContainerState());
assertEquals(failed + 1, metrics.getFailedContainers());
assertEquals(running, metrics.getRunningContainers());
}
finally {
if (wc != null) {
wc.finished();
}
}
}
@Test @Test
@SuppressWarnings("unchecked") // mocked generic @SuppressWarnings("unchecked") // mocked generic
public void testCleanupOnFailure() throws Exception { public void testCleanupOnFailure() throws Exception {
@ -955,6 +992,8 @@ public class TestContainer {
NodeStatusUpdater nodeStatusUpdater = mock(NodeStatusUpdater.class); NodeStatusUpdater nodeStatusUpdater = mock(NodeStatusUpdater.class);
when(context.getNodeStatusUpdater()).thenReturn(nodeStatusUpdater); when(context.getNodeStatusUpdater()).thenReturn(nodeStatusUpdater);
ContainerExecutor executor = mock(ContainerExecutor.class); ContainerExecutor executor = mock(ContainerExecutor.class);
Mockito.doNothing().when(executor).pauseContainer(any(Container.class));
Mockito.doNothing().when(executor).resumeContainer(any(Container.class));
launcher = launcher =
new ContainersLauncher(context, dispatcher, executor, null, null); new ContainersLauncher(context, dispatcher, executor, null, null);
// create a mock ExecutorService, which will not really launch // create a mock ExecutorService, which will not really launch
@ -1143,6 +1182,18 @@ public class TestContainer {
drainDispatcherEvents(); drainDispatcherEvents();
} }
public void pauseContainer() {
c.handle(new ContainerPauseEvent(cId,
"PauseRequest"));
drainDispatcherEvents();
}
public void resumeContainer() {
c.handle(new ContainerResumeEvent(cId,
"ResumeRequest"));
drainDispatcherEvents();
}
public void containerKilledOnRequest() { public void containerKilledOnRequest() {
int exitCode = ContainerExitStatus.KILLED_BY_RESOURCEMANAGER; int exitCode = ContainerExitStatus.KILLED_BY_RESOURCEMANAGER;
String diagnosticMsg = "Container completed with exit code " + exitCode; String diagnosticMsg = "Container completed with exit code " + exitCode;