YARN-5292. NM Container lifecycle and state transitions to support for PAUSED container state. (Hitesh Sharma via asuresh)
(cherry picked from commit 864fbacd45
)
This commit is contained in:
parent
2223393ad1
commit
dd51a74601
|
@ -33,11 +33,14 @@ public enum ContainerState {
|
|||
|
||||
/** Running container */
|
||||
RUNNING,
|
||||
|
||||
|
||||
/** Completed container */
|
||||
COMPLETE,
|
||||
|
||||
/** Scheduled (awaiting resources) at the NM. */
|
||||
@InterfaceStability.Unstable
|
||||
SCHEDULED
|
||||
SCHEDULED,
|
||||
|
||||
/** Paused at the NM. */
|
||||
PAUSED
|
||||
}
|
|
@ -83,6 +83,7 @@ enum ContainerStateProto {
|
|||
C_RUNNING = 2;
|
||||
C_COMPLETE = 3;
|
||||
C_SCHEDULED = 4;
|
||||
C_PAUSED = 5;
|
||||
}
|
||||
|
||||
message ContainerProto {
|
||||
|
|
|
@ -699,6 +699,28 @@ public abstract class ContainerExecutor implements Configurable {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Pause the container. The default implementation is to raise a kill event.
|
||||
* Specific executor implementations can override this behavior.
|
||||
* @param container
|
||||
* the Container
|
||||
*/
|
||||
public void pauseContainer(Container container) {
|
||||
LOG.warn(container.getContainerId() + " doesn't support pausing.");
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
/**
|
||||
* Resume the container from pause state. The default implementation ignores
|
||||
* this event. Specific implementations can override this behavior.
|
||||
* @param container
|
||||
* the Container
|
||||
*/
|
||||
public void resumeContainer(Container container) {
|
||||
LOG.warn(container.getContainerId() + " doesn't support resume.");
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the process-identifier for the container.
|
||||
*
|
||||
|
|
|
@ -27,6 +27,8 @@ public enum ContainerEventType {
|
|||
CONTAINER_DONE,
|
||||
REINITIALIZE_CONTAINER,
|
||||
ROLLBACK_REINIT,
|
||||
PAUSE_CONTAINER,
|
||||
RESUME_CONTAINER,
|
||||
|
||||
// DownloadManager
|
||||
CONTAINER_INITED,
|
||||
|
@ -38,5 +40,7 @@ public enum ContainerEventType {
|
|||
CONTAINER_LAUNCHED,
|
||||
CONTAINER_EXITED_WITH_SUCCESS,
|
||||
CONTAINER_EXITED_WITH_FAILURE,
|
||||
CONTAINER_KILLED_ON_REQUEST
|
||||
CONTAINER_KILLED_ON_REQUEST,
|
||||
CONTAINER_PAUSED,
|
||||
CONTAINER_RESUMED
|
||||
}
|
||||
|
|
|
@ -305,6 +305,8 @@ public class ContainerImpl implements Container {
|
|||
UPDATE_DIAGNOSTICS_TRANSITION)
|
||||
.addTransition(ContainerState.NEW, ContainerState.DONE,
|
||||
ContainerEventType.KILL_CONTAINER, new KillOnNewTransition())
|
||||
.addTransition(ContainerState.NEW, ContainerState.DONE,
|
||||
ContainerEventType.PAUSE_CONTAINER, new KillOnPauseTransition())
|
||||
|
||||
// From LOCALIZING State
|
||||
.addTransition(ContainerState.LOCALIZING,
|
||||
|
@ -320,6 +322,8 @@ public class ContainerImpl implements Container {
|
|||
.addTransition(ContainerState.LOCALIZING, ContainerState.KILLING,
|
||||
ContainerEventType.KILL_CONTAINER,
|
||||
new KillBeforeRunningTransition())
|
||||
.addTransition(ContainerState.LOCALIZING, ContainerState.KILLING,
|
||||
ContainerEventType.PAUSE_CONTAINER, new KillOnPauseTransition())
|
||||
|
||||
// From LOCALIZATION_FAILED State
|
||||
.addTransition(ContainerState.LOCALIZATION_FAILED,
|
||||
|
@ -333,7 +337,8 @@ public class ContainerImpl implements Container {
|
|||
// container not launched so kill is a no-op
|
||||
.addTransition(ContainerState.LOCALIZATION_FAILED,
|
||||
ContainerState.LOCALIZATION_FAILED,
|
||||
ContainerEventType.KILL_CONTAINER)
|
||||
EnumSet.of(ContainerEventType.KILL_CONTAINER,
|
||||
ContainerEventType.PAUSE_CONTAINER))
|
||||
// container cleanup triggers a release of all resources
|
||||
// regardless of whether they were localized or not
|
||||
// LocalizedResource handles release event in all states
|
||||
|
@ -389,6 +394,76 @@ public class ContainerImpl implements Container {
|
|||
ContainerState.EXITED_WITH_FAILURE,
|
||||
ContainerEventType.CONTAINER_KILLED_ON_REQUEST,
|
||||
new KilledExternallyTransition())
|
||||
.addTransition(ContainerState.RUNNING, ContainerState.PAUSING,
|
||||
ContainerEventType.PAUSE_CONTAINER, new PauseContainerTransition())
|
||||
|
||||
// From PAUSING State
|
||||
.addTransition(ContainerState.PAUSING, ContainerState.KILLING,
|
||||
ContainerEventType.KILL_CONTAINER, new KillTransition())
|
||||
.addTransition(ContainerState.PAUSING, ContainerState.PAUSING,
|
||||
ContainerEventType.UPDATE_DIAGNOSTICS_MSG,
|
||||
UPDATE_DIAGNOSTICS_TRANSITION)
|
||||
.addTransition(ContainerState.PAUSING, ContainerState.PAUSED,
|
||||
ContainerEventType.CONTAINER_PAUSED, new PausedContainerTransition())
|
||||
// In case something goes wrong then container will exit from the
|
||||
// PAUSING state
|
||||
.addTransition(ContainerState.PAUSING,
|
||||
ContainerState.EXITED_WITH_SUCCESS,
|
||||
ContainerEventType.CONTAINER_EXITED_WITH_SUCCESS)
|
||||
.addTransition(ContainerState.PAUSING,
|
||||
ContainerState.EXITED_WITH_FAILURE,
|
||||
ContainerEventType.CONTAINER_EXITED_WITH_FAILURE,
|
||||
new ExitedWithFailureTransition(true))
|
||||
.addTransition(ContainerState.PAUSING, ContainerState.EXITED_WITH_FAILURE,
|
||||
ContainerEventType.CONTAINER_KILLED_ON_REQUEST,
|
||||
new KilledExternallyTransition())
|
||||
|
||||
// From PAUSED State
|
||||
.addTransition(ContainerState.PAUSED, ContainerState.KILLING,
|
||||
ContainerEventType.KILL_CONTAINER, new KillTransition())
|
||||
.addTransition(ContainerState.PAUSED, ContainerState.PAUSED,
|
||||
ContainerEventType.UPDATE_DIAGNOSTICS_MSG,
|
||||
UPDATE_DIAGNOSTICS_TRANSITION)
|
||||
.addTransition(ContainerState.PAUSED, ContainerState.PAUSED,
|
||||
ContainerEventType.PAUSE_CONTAINER)
|
||||
.addTransition(ContainerState.PAUSED, ContainerState.RESUMING,
|
||||
ContainerEventType.RESUME_CONTAINER, new ResumeContainerTransition())
|
||||
// In case something goes wrong then container will exit from the
|
||||
// PAUSED state
|
||||
.addTransition(ContainerState.PAUSED,
|
||||
ContainerState.EXITED_WITH_FAILURE,
|
||||
ContainerEventType.CONTAINER_EXITED_WITH_FAILURE,
|
||||
new ExitedWithFailureTransition(true))
|
||||
.addTransition(ContainerState.PAUSED, ContainerState.EXITED_WITH_FAILURE,
|
||||
ContainerEventType.CONTAINER_KILLED_ON_REQUEST,
|
||||
new KilledExternallyTransition())
|
||||
.addTransition(ContainerState.PAUSED,
|
||||
ContainerState.EXITED_WITH_SUCCESS,
|
||||
ContainerEventType.CONTAINER_EXITED_WITH_SUCCESS,
|
||||
new ExitedWithSuccessTransition(true))
|
||||
|
||||
// From RESUMING State
|
||||
.addTransition(ContainerState.RESUMING, ContainerState.KILLING,
|
||||
ContainerEventType.KILL_CONTAINER, new KillTransition())
|
||||
.addTransition(ContainerState.RESUMING, ContainerState.RUNNING,
|
||||
ContainerEventType.CONTAINER_RESUMED)
|
||||
.addTransition(ContainerState.RESUMING, ContainerState.RESUMING,
|
||||
ContainerEventType.UPDATE_DIAGNOSTICS_MSG,
|
||||
UPDATE_DIAGNOSTICS_TRANSITION)
|
||||
// In case something goes wrong then container will exit from the
|
||||
// RESUMING state
|
||||
.addTransition(ContainerState.RESUMING,
|
||||
ContainerState.EXITED_WITH_FAILURE,
|
||||
ContainerEventType.CONTAINER_EXITED_WITH_FAILURE,
|
||||
new ExitedWithFailureTransition(true))
|
||||
.addTransition(ContainerState.RESUMING,
|
||||
ContainerState.EXITED_WITH_FAILURE,
|
||||
ContainerEventType.CONTAINER_KILLED_ON_REQUEST,
|
||||
new KilledExternallyTransition())
|
||||
.addTransition(ContainerState.RESUMING,
|
||||
ContainerState.EXITED_WITH_SUCCESS,
|
||||
ContainerEventType.CONTAINER_EXITED_WITH_SUCCESS,
|
||||
new ExitedWithSuccessTransition(true))
|
||||
|
||||
// From REINITIALIZING State
|
||||
.addTransition(ContainerState.REINITIALIZING,
|
||||
|
@ -412,6 +487,8 @@ public class ContainerImpl implements Container {
|
|||
UPDATE_DIAGNOSTICS_TRANSITION)
|
||||
.addTransition(ContainerState.REINITIALIZING, ContainerState.KILLING,
|
||||
ContainerEventType.KILL_CONTAINER, new KillTransition())
|
||||
.addTransition(ContainerState.REINITIALIZING, ContainerState.KILLING,
|
||||
ContainerEventType.PAUSE_CONTAINER, new KillOnPauseTransition())
|
||||
.addTransition(ContainerState.REINITIALIZING,
|
||||
ContainerState.SCHEDULED,
|
||||
ContainerEventType.CONTAINER_KILLED_ON_REQUEST,
|
||||
|
@ -429,6 +506,8 @@ public class ContainerImpl implements Container {
|
|||
UPDATE_DIAGNOSTICS_TRANSITION)
|
||||
.addTransition(ContainerState.RELAUNCHING, ContainerState.KILLING,
|
||||
ContainerEventType.KILL_CONTAINER, new KillTransition())
|
||||
.addTransition(ContainerState.RELAUNCHING, ContainerState.KILLING,
|
||||
ContainerEventType.PAUSE_CONTAINER, new KillOnPauseTransition())
|
||||
|
||||
// From CONTAINER_EXITED_WITH_SUCCESS State
|
||||
.addTransition(ContainerState.EXITED_WITH_SUCCESS, ContainerState.DONE,
|
||||
|
@ -440,7 +519,8 @@ public class ContainerImpl implements Container {
|
|||
UPDATE_DIAGNOSTICS_TRANSITION)
|
||||
.addTransition(ContainerState.EXITED_WITH_SUCCESS,
|
||||
ContainerState.EXITED_WITH_SUCCESS,
|
||||
ContainerEventType.KILL_CONTAINER)
|
||||
EnumSet.of(ContainerEventType.KILL_CONTAINER,
|
||||
ContainerEventType.PAUSE_CONTAINER))
|
||||
|
||||
// From EXITED_WITH_FAILURE State
|
||||
.addTransition(ContainerState.EXITED_WITH_FAILURE, ContainerState.DONE,
|
||||
|
@ -452,7 +532,8 @@ public class ContainerImpl implements Container {
|
|||
UPDATE_DIAGNOSTICS_TRANSITION)
|
||||
.addTransition(ContainerState.EXITED_WITH_FAILURE,
|
||||
ContainerState.EXITED_WITH_FAILURE,
|
||||
ContainerEventType.KILL_CONTAINER)
|
||||
EnumSet.of(ContainerEventType.KILL_CONTAINER,
|
||||
ContainerEventType.PAUSE_CONTAINER))
|
||||
|
||||
// From KILLING State.
|
||||
.addTransition(ContainerState.KILLING,
|
||||
|
@ -486,7 +567,8 @@ public class ContainerImpl implements Container {
|
|||
// in the container launcher
|
||||
.addTransition(ContainerState.KILLING,
|
||||
ContainerState.KILLING,
|
||||
ContainerEventType.CONTAINER_LAUNCHED)
|
||||
EnumSet.of(ContainerEventType.CONTAINER_LAUNCHED,
|
||||
ContainerEventType.PAUSE_CONTAINER))
|
||||
|
||||
// From CONTAINER_CLEANEDUP_AFTER_KILL State.
|
||||
.addTransition(ContainerState.CONTAINER_CLEANEDUP_AFTER_KILL,
|
||||
|
@ -502,11 +584,13 @@ public class ContainerImpl implements Container {
|
|||
EnumSet.of(ContainerEventType.KILL_CONTAINER,
|
||||
ContainerEventType.RESOURCE_FAILED,
|
||||
ContainerEventType.CONTAINER_EXITED_WITH_SUCCESS,
|
||||
ContainerEventType.CONTAINER_EXITED_WITH_FAILURE))
|
||||
ContainerEventType.CONTAINER_EXITED_WITH_FAILURE,
|
||||
ContainerEventType.PAUSE_CONTAINER))
|
||||
|
||||
// From DONE
|
||||
.addTransition(ContainerState.DONE, ContainerState.DONE,
|
||||
ContainerEventType.KILL_CONTAINER)
|
||||
EnumSet.of(ContainerEventType.KILL_CONTAINER,
|
||||
ContainerEventType.PAUSE_CONTAINER))
|
||||
.addTransition(ContainerState.DONE, ContainerState.DONE,
|
||||
ContainerEventType.INIT_CONTAINER)
|
||||
.addTransition(ContainerState.DONE, ContainerState.DONE,
|
||||
|
@ -532,6 +616,8 @@ public class ContainerImpl implements Container {
|
|||
case LOCALIZING:
|
||||
case LOCALIZATION_FAILED:
|
||||
case SCHEDULED:
|
||||
case PAUSED:
|
||||
case RESUMING:
|
||||
return org.apache.hadoop.yarn.api.records.ContainerState.SCHEDULED;
|
||||
case RUNNING:
|
||||
case RELAUNCHING:
|
||||
|
@ -541,6 +627,7 @@ public class ContainerImpl implements Container {
|
|||
case KILLING:
|
||||
case CONTAINER_CLEANEDUP_AFTER_KILL:
|
||||
case CONTAINER_RESOURCES_CLEANINGUP:
|
||||
case PAUSING:
|
||||
return org.apache.hadoop.yarn.api.records.ContainerState.RUNNING;
|
||||
case DONE:
|
||||
default:
|
||||
|
@ -1498,6 +1585,26 @@ public class ContainerImpl implements Container {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Transitions upon receiving PAUSE_CONTAINER.
|
||||
* - LOCALIZED -> KILLING.
|
||||
* - REINITIALIZING -> KILLING.
|
||||
*/
|
||||
@SuppressWarnings("unchecked") // dispatcher not typed
|
||||
static class KillOnPauseTransition implements
|
||||
SingleArcTransition<ContainerImpl, ContainerEvent> {
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
@Override
|
||||
public void transition(ContainerImpl container, ContainerEvent event) {
|
||||
// Kill the process/process-grp
|
||||
container.setIsReInitializing(false);
|
||||
container.dispatcher.getEventHandler().handle(
|
||||
new ContainersLauncherEvent(container,
|
||||
ContainersLauncherEventType.CLEANUP_CONTAINER));
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Transition from KILLING to CONTAINER_CLEANEDUP_AFTER_KILL
|
||||
* upon receiving CONTAINER_KILLED_ON_REQUEST.
|
||||
|
@ -1688,6 +1795,57 @@ public class ContainerImpl implements Container {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Transitions upon receiving PAUSE_CONTAINER.
|
||||
* - RUNNING -> PAUSED
|
||||
*/
|
||||
@SuppressWarnings("unchecked") // dispatcher not typed
|
||||
static class PauseContainerTransition implements
|
||||
SingleArcTransition<ContainerImpl, ContainerEvent> {
|
||||
@Override
|
||||
public void transition(ContainerImpl container, ContainerEvent event) {
|
||||
// Pause the process/process-grp if it is supported by the container
|
||||
container.dispatcher.getEventHandler().handle(
|
||||
new ContainersLauncherEvent(container,
|
||||
ContainersLauncherEventType.PAUSE_CONTAINER));
|
||||
ContainerPauseEvent pauseEvent = (ContainerPauseEvent) event;
|
||||
container.addDiagnostics(pauseEvent.getDiagnostic(), "\n");
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Transitions upon receiving PAUSED_CONTAINER.
|
||||
*/
|
||||
@SuppressWarnings("unchecked") // dispatcher not typed
|
||||
static class PausedContainerTransition implements
|
||||
SingleArcTransition<ContainerImpl, ContainerEvent> {
|
||||
@Override
|
||||
public void transition(ContainerImpl container, ContainerEvent event) {
|
||||
// Container was PAUSED so tell the scheduler
|
||||
container.dispatcher.getEventHandler().handle(
|
||||
new ContainerSchedulerEvent(container,
|
||||
ContainerSchedulerEventType.CONTAINER_PAUSED));
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Transitions upon receiving RESUME_CONTAINER.
|
||||
* - PAUSED -> RUNNING
|
||||
*/
|
||||
@SuppressWarnings("unchecked") // dispatcher not typed
|
||||
static class ResumeContainerTransition implements
|
||||
SingleArcTransition<ContainerImpl, ContainerEvent> {
|
||||
@Override
|
||||
public void transition(ContainerImpl container, ContainerEvent event) {
|
||||
// Pause the process/process-grp if it is supported by the container
|
||||
container.dispatcher.getEventHandler().handle(
|
||||
new ContainersLauncherEvent(container,
|
||||
ContainersLauncherEventType.RESUME_CONTAINER));
|
||||
ContainerResumeEvent resumeEvent = (ContainerResumeEvent) event;
|
||||
container.addDiagnostics(resumeEvent.getDiagnostic(), "\n");
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void handle(ContainerEvent event) {
|
||||
try {
|
||||
|
|
|
@ -0,0 +1,40 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.server.nodemanager.containermanager.container;
|
||||
|
||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
|
||||
/**
|
||||
* ContainerEvent for ContainerEventType.PAUSE_CONTAINER.
|
||||
*/
|
||||
public class ContainerPauseEvent extends ContainerEvent {
|
||||
|
||||
private final String diagnostic;
|
||||
|
||||
public ContainerPauseEvent(ContainerId cId,
|
||||
String diagnostic) {
|
||||
super(cId, ContainerEventType.PAUSE_CONTAINER);
|
||||
this.diagnostic = diagnostic;
|
||||
}
|
||||
|
||||
public String getDiagnostic() {
|
||||
return this.diagnostic;
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,39 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.server.nodemanager.containermanager.container;
|
||||
|
||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
|
||||
/**
|
||||
* ContainerEvent for ContainerEventType.RESUME_CONTAINER.
|
||||
*/
|
||||
public class ContainerResumeEvent extends ContainerEvent {
|
||||
|
||||
private final String diagnostic;
|
||||
|
||||
public ContainerResumeEvent(ContainerId cId,
|
||||
String diagnostic) {
|
||||
super(cId, ContainerEventType.RESUME_CONTAINER);
|
||||
this.diagnostic = diagnostic;
|
||||
}
|
||||
|
||||
public String getDiagnostic() {
|
||||
return this.diagnostic;
|
||||
}
|
||||
}
|
|
@ -21,5 +21,6 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager.container;
|
|||
public enum ContainerState {
|
||||
NEW, LOCALIZING, LOCALIZATION_FAILED, SCHEDULED, RUNNING, RELAUNCHING,
|
||||
REINITIALIZING, EXITED_WITH_SUCCESS, EXITED_WITH_FAILURE, KILLING,
|
||||
CONTAINER_CLEANEDUP_AFTER_KILL, CONTAINER_RESOURCES_CLEANINGUP, DONE
|
||||
CONTAINER_CLEANEDUP_AFTER_KILL, CONTAINER_RESOURCES_CLEANINGUP, DONE,
|
||||
PAUSING, PAUSED, RESUMING
|
||||
}
|
||||
|
|
|
@ -75,6 +75,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Cont
|
|||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEvent;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEventType;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerExitEvent;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerKillEvent;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService;
|
||||
|
@ -86,6 +87,7 @@ import org.apache.hadoop.yarn.util.Apps;
|
|||
import org.apache.hadoop.yarn.util.AuxiliaryServiceHelper;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import org.apache.hadoop.yarn.util.ConverterUtils;
|
||||
|
||||
public class ContainerLaunch implements Callable<Integer> {
|
||||
|
||||
|
@ -106,8 +108,10 @@ public class ContainerLaunch implements Callable<Integer> {
|
|||
private final Configuration conf;
|
||||
private final Context context;
|
||||
private final ContainerManagerImpl containerManager;
|
||||
|
||||
|
||||
protected AtomicBoolean containerAlreadyLaunched = new AtomicBoolean(false);
|
||||
protected AtomicBoolean shouldPauseContainer = new AtomicBoolean(false);
|
||||
|
||||
protected AtomicBoolean completed = new AtomicBoolean(false);
|
||||
|
||||
private volatile boolean killedBeforeStart = false;
|
||||
|
@ -802,6 +806,90 @@ public class ContainerLaunch implements Callable<Integer> {
|
|||
return signal;
|
||||
}
|
||||
|
||||
/**
|
||||
* Pause the container.
|
||||
* Cancels the launch if the container isn't launched yet. Otherwise asks the
|
||||
* executor to pause the container.
|
||||
* @throws IOException in case of errors.
|
||||
*/
|
||||
@SuppressWarnings("unchecked") // dispatcher not typed
|
||||
public void pauseContainer() throws IOException {
|
||||
ContainerId containerId = container.getContainerId();
|
||||
String containerIdStr = containerId.toString();
|
||||
LOG.info("Pausing the container " + containerIdStr);
|
||||
|
||||
// The pause event is only handled if the container is in the running state
|
||||
// (the container state machine), so we don't check for
|
||||
// shouldLaunchContainer over here
|
||||
|
||||
if (!shouldPauseContainer.compareAndSet(false, true)) {
|
||||
LOG.info("Container " + containerId + " not paused as "
|
||||
+ "resume already called");
|
||||
return;
|
||||
}
|
||||
|
||||
try {
|
||||
// Pause the container
|
||||
exec.pauseContainer(container);
|
||||
|
||||
// PauseContainer is a blocking call. We are here almost means the
|
||||
// container is paused, so send out the event.
|
||||
dispatcher.getEventHandler().handle(new ContainerEvent(
|
||||
containerId,
|
||||
ContainerEventType.CONTAINER_PAUSED));
|
||||
} catch (Exception e) {
|
||||
String message =
|
||||
"Exception when trying to pause container " + containerIdStr
|
||||
+ ": " + StringUtils.stringifyException(e);
|
||||
LOG.info(message);
|
||||
container.handle(new ContainerKillEvent(container.getContainerId(),
|
||||
ContainerExitStatus.PREEMPTED, "Container preempted as there was "
|
||||
+ " an exception in pausing it."));
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Resume the container.
|
||||
* Cancels the launch if the container isn't launched yet. Otherwise asks the
|
||||
* executor to pause the container.
|
||||
* @throws IOException in case of error.
|
||||
*/
|
||||
@SuppressWarnings("unchecked") // dispatcher not typed
|
||||
public void resumeContainer() throws IOException {
|
||||
ContainerId containerId = container.getContainerId();
|
||||
String containerIdStr = containerId.toString();
|
||||
LOG.info("Resuming the container " + containerIdStr);
|
||||
|
||||
// The resume event is only handled if the container is in a paused state
|
||||
// so we don't check for the launched flag here.
|
||||
|
||||
// paused flag will be set to true if process already paused
|
||||
boolean alreadyPaused = !shouldPauseContainer.compareAndSet(false, true);
|
||||
if (!alreadyPaused) {
|
||||
LOG.info("Container " + containerIdStr + " not paused."
|
||||
+ " No resume necessary");
|
||||
return;
|
||||
}
|
||||
|
||||
// If the container has already started
|
||||
try {
|
||||
exec.resumeContainer(container);
|
||||
// ResumeContainer is a blocking call. We are here almost means the
|
||||
// container is resumed, so send out the event.
|
||||
dispatcher.getEventHandler().handle(new ContainerEvent(
|
||||
containerId,
|
||||
ContainerEventType.CONTAINER_RESUMED));
|
||||
} catch (Exception e) {
|
||||
String message =
|
||||
"Exception when trying to resume container " + containerIdStr
|
||||
+ ": " + StringUtils.stringifyException(e);
|
||||
LOG.info(message);
|
||||
container.handle(new ContainerKillEvent(container.getContainerId(),
|
||||
ContainerExitStatus.PREEMPTED, "Container preempted as there was "
|
||||
+ " an exception in pausing it."));
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Loop through for a time-bounded interval waiting to
|
||||
* read the process id from a file generated by a running process.
|
||||
|
|
|
@ -30,6 +30,7 @@ import org.apache.hadoop.conf.Configuration;
|
|||
import org.apache.hadoop.fs.FileContext;
|
||||
import org.apache.hadoop.fs.UnsupportedFileSystemException;
|
||||
import org.apache.hadoop.service.AbstractService;
|
||||
import org.apache.hadoop.util.StringUtils;
|
||||
import org.apache.hadoop.util.concurrent.HadoopExecutors;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
import org.apache.hadoop.yarn.event.Dispatcher;
|
||||
|
@ -41,6 +42,7 @@ import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
|
|||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
|
@ -171,6 +173,36 @@ public class ContainersLauncher extends AbstractService
|
|||
+ " with command " + signalEvent.getCommand());
|
||||
}
|
||||
break;
|
||||
case PAUSE_CONTAINER:
|
||||
ContainerLaunch launchedContainer = running.get(containerId);
|
||||
if (launchedContainer == null) {
|
||||
// Container not launched. So nothing needs to be done.
|
||||
return;
|
||||
}
|
||||
|
||||
// Pause the container
|
||||
try {
|
||||
launchedContainer.pauseContainer();
|
||||
} catch (Exception e) {
|
||||
LOG.info("Got exception while pausing container: " +
|
||||
StringUtils.stringifyException(e));
|
||||
}
|
||||
break;
|
||||
case RESUME_CONTAINER:
|
||||
ContainerLaunch launchCont = running.get(containerId);
|
||||
if (launchCont == null) {
|
||||
// Container not launched. So nothing needs to be done.
|
||||
return;
|
||||
}
|
||||
|
||||
// Resume the container.
|
||||
try {
|
||||
launchCont.resumeContainer();
|
||||
} catch (Exception e) {
|
||||
LOG.info("Got exception while resuming container: " +
|
||||
StringUtils.stringifyException(e));
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -25,4 +25,7 @@ public enum ContainersLauncherEventType {
|
|||
CLEANUP_CONTAINER, // The process(grp) itself.
|
||||
CLEANUP_CONTAINER_FOR_REINIT, // The process(grp) itself.
|
||||
SIGNAL_CONTAINER,
|
||||
PAUSE_CONTAINER,
|
||||
RESUME_CONTAINER
|
||||
|
||||
}
|
||||
|
|
|
@ -27,4 +27,5 @@ public enum ContainerSchedulerEventType {
|
|||
UPDATE_CONTAINER,
|
||||
// Producer: Node HB response - RM has asked to shed the queue
|
||||
SHED_QUEUED_CONTAINERS,
|
||||
CONTAINER_PAUSED
|
||||
}
|
||||
|
|
|
@ -105,6 +105,7 @@ import org.apache.hadoop.yarn.server.utils.BuilderUtils;
|
|||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
import org.mockito.ArgumentMatcher;
|
||||
import org.mockito.Mockito;
|
||||
|
||||
public class TestContainer {
|
||||
|
||||
|
@ -207,6 +208,42 @@ public class TestContainer {
|
|||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
@SuppressWarnings("unchecked") // mocked generic
|
||||
public void testContainerPauseAndResume() throws Exception {
|
||||
WrappedContainer wc = null;
|
||||
try {
|
||||
wc = new WrappedContainer(13, 314159265358979L, 4344, "yak");
|
||||
wc.initContainer();
|
||||
wc.localizeResources();
|
||||
int running = metrics.getRunningContainers();
|
||||
wc.launchContainer();
|
||||
assertEquals(running + 1, metrics.getRunningContainers());
|
||||
reset(wc.localizerBus);
|
||||
wc.pauseContainer();
|
||||
assertEquals(ContainerState.PAUSED,
|
||||
wc.c.getContainerState());
|
||||
wc.resumeContainer();
|
||||
assertEquals(ContainerState.RUNNING,
|
||||
wc.c.getContainerState());
|
||||
wc.containerKilledOnRequest();
|
||||
assertEquals(ContainerState.EXITED_WITH_FAILURE,
|
||||
wc.c.getContainerState());
|
||||
assertNull(wc.c.getLocalizedResources());
|
||||
verifyCleanupCall(wc);
|
||||
int failed = metrics.getFailedContainers();
|
||||
wc.containerResourcesCleanup();
|
||||
assertEquals(ContainerState.DONE, wc.c.getContainerState());
|
||||
assertEquals(failed + 1, metrics.getFailedContainers());
|
||||
assertEquals(running, metrics.getRunningContainers());
|
||||
}
|
||||
finally {
|
||||
if (wc != null) {
|
||||
wc.finished();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
@SuppressWarnings("unchecked") // mocked generic
|
||||
public void testCleanupOnFailure() throws Exception {
|
||||
|
@ -988,6 +1025,8 @@ public class TestContainer {
|
|||
NodeStatusUpdater nodeStatusUpdater = mock(NodeStatusUpdater.class);
|
||||
when(context.getNodeStatusUpdater()).thenReturn(nodeStatusUpdater);
|
||||
ContainerExecutor executor = mock(ContainerExecutor.class);
|
||||
Mockito.doNothing().when(executor).pauseContainer(any(Container.class));
|
||||
Mockito.doNothing().when(executor).resumeContainer(any(Container.class));
|
||||
launcher =
|
||||
new ContainersLauncher(context, dispatcher, executor, null, null);
|
||||
// create a mock ExecutorService, which will not really launch
|
||||
|
@ -1196,6 +1235,18 @@ public class TestContainer {
|
|||
drainDispatcherEvents();
|
||||
}
|
||||
|
||||
public void pauseContainer() {
|
||||
c.handle(new ContainerPauseEvent(cId,
|
||||
"PauseRequest"));
|
||||
drainDispatcherEvents();
|
||||
}
|
||||
|
||||
public void resumeContainer() {
|
||||
c.handle(new ContainerResumeEvent(cId,
|
||||
"ResumeRequest"));
|
||||
drainDispatcherEvents();
|
||||
}
|
||||
|
||||
public void containerKilledOnRequest() {
|
||||
int exitCode = ContainerExitStatus.KILLED_BY_RESOURCEMANAGER;
|
||||
String diagnosticMsg = "Container completed with exit code " + exitCode;
|
||||
|
|
Loading…
Reference in New Issue