YARN-6059. Update paused container state in the NM state store. (Hitesh Sharma via asuresh)
(cherry picked from commit 66ca0a6540
)
This commit is contained in:
parent
52bf458616
commit
b93a23f411
|
@ -813,10 +813,18 @@ public class ContainerImpl implements Container {
|
||||||
|
|
||||||
@SuppressWarnings("unchecked") // dispatcher not typed
|
@SuppressWarnings("unchecked") // dispatcher not typed
|
||||||
private void sendScheduleEvent() {
|
private void sendScheduleEvent() {
|
||||||
dispatcher.getEventHandler().handle(
|
if (recoveredStatus == RecoveredContainerStatus.PAUSED) {
|
||||||
new ContainerSchedulerEvent(this,
|
// Recovery is not supported for paused container so we raise the
|
||||||
ContainerSchedulerEventType.SCHEDULE_CONTAINER)
|
// launch event which will proceed to kill the paused container instead
|
||||||
);
|
// of raising the schedule event.
|
||||||
|
ContainersLauncherEventType launcherEvent;
|
||||||
|
launcherEvent = ContainersLauncherEventType.RECOVER_PAUSED_CONTAINER;
|
||||||
|
dispatcher.getEventHandler()
|
||||||
|
.handle(new ContainersLauncherEvent(this, launcherEvent));
|
||||||
|
} else {
|
||||||
|
dispatcher.getEventHandler().handle(new ContainerSchedulerEvent(this,
|
||||||
|
ContainerSchedulerEventType.SCHEDULE_CONTAINER));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@SuppressWarnings("unchecked") // dispatcher not typed
|
@SuppressWarnings("unchecked") // dispatcher not typed
|
||||||
|
|
|
@ -813,6 +813,14 @@ public class ContainerLaunch implements Callable<Integer> {
|
||||||
dispatcher.getEventHandler().handle(new ContainerEvent(
|
dispatcher.getEventHandler().handle(new ContainerEvent(
|
||||||
containerId,
|
containerId,
|
||||||
ContainerEventType.CONTAINER_PAUSED));
|
ContainerEventType.CONTAINER_PAUSED));
|
||||||
|
|
||||||
|
try {
|
||||||
|
this.context.getNMStateStore().storeContainerPaused(
|
||||||
|
container.getContainerId());
|
||||||
|
} catch (IOException e) {
|
||||||
|
LOG.warn("Could not store container [" + container.getContainerId()
|
||||||
|
+ "] state. The Container has been paused.", e);
|
||||||
|
}
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
String message =
|
String message =
|
||||||
"Exception when trying to pause container " + containerIdStr
|
"Exception when trying to pause container " + containerIdStr
|
||||||
|
@ -855,6 +863,14 @@ public class ContainerLaunch implements Callable<Integer> {
|
||||||
dispatcher.getEventHandler().handle(new ContainerEvent(
|
dispatcher.getEventHandler().handle(new ContainerEvent(
|
||||||
containerId,
|
containerId,
|
||||||
ContainerEventType.CONTAINER_RESUMED));
|
ContainerEventType.CONTAINER_RESUMED));
|
||||||
|
|
||||||
|
try {
|
||||||
|
this.context.getNMStateStore().removeContainerPaused(
|
||||||
|
container.getContainerId());
|
||||||
|
} catch (IOException e) {
|
||||||
|
LOG.warn("Could not store container [" + container.getContainerId()
|
||||||
|
+ "] state. The Container has been resumed.", e);
|
||||||
|
}
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
String message =
|
String message =
|
||||||
"Exception when trying to resume container " + containerIdStr
|
"Exception when trying to resume container " + containerIdStr
|
||||||
|
|
|
@ -139,6 +139,16 @@ public class ContainersLauncher extends AbstractService
|
||||||
containerLauncher.submit(launch);
|
containerLauncher.submit(launch);
|
||||||
running.put(containerId, launch);
|
running.put(containerId, launch);
|
||||||
break;
|
break;
|
||||||
|
case RECOVER_PAUSED_CONTAINER:
|
||||||
|
// Recovery for paused containers is not supported, thus here
|
||||||
|
// we locate any paused containers, and terminate them.
|
||||||
|
app = context.getApplications().get(
|
||||||
|
containerId.getApplicationAttemptId().getApplicationId());
|
||||||
|
launch = new RecoverPausedContainerLaunch(context, getConfig(),
|
||||||
|
dispatcher, exec, app, event.getContainer(), dirsHandler,
|
||||||
|
containerManager);
|
||||||
|
containerLauncher.submit(launch);
|
||||||
|
break;
|
||||||
case CLEANUP_CONTAINER:
|
case CLEANUP_CONTAINER:
|
||||||
case CLEANUP_CONTAINER_FOR_REINIT:
|
case CLEANUP_CONTAINER_FOR_REINIT:
|
||||||
ContainerLaunch launcher = running.remove(containerId);
|
ContainerLaunch launcher = running.remove(containerId);
|
||||||
|
|
|
@ -26,6 +26,7 @@ public enum ContainersLauncherEventType {
|
||||||
CLEANUP_CONTAINER_FOR_REINIT, // The process(grp) itself.
|
CLEANUP_CONTAINER_FOR_REINIT, // The process(grp) itself.
|
||||||
SIGNAL_CONTAINER,
|
SIGNAL_CONTAINER,
|
||||||
PAUSE_CONTAINER,
|
PAUSE_CONTAINER,
|
||||||
RESUME_CONTAINER
|
RESUME_CONTAINER,
|
||||||
|
RECOVER_PAUSED_CONTAINER
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,124 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher;
|
||||||
|
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||||
|
import org.apache.hadoop.yarn.event.Dispatcher;
|
||||||
|
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
|
||||||
|
import org.apache.hadoop.yarn.server.nodemanager.Context;
|
||||||
|
import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
|
||||||
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
|
||||||
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
|
||||||
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.*;
|
||||||
|
import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerSignalContext;
|
||||||
|
|
||||||
|
import java.io.File;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.io.InterruptedIOException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This is a ContainerLaunch which has been recovered after an NM restart for
|
||||||
|
* pause containers (for rolling upgrades)
|
||||||
|
*/
|
||||||
|
public class RecoverPausedContainerLaunch extends ContainerLaunch {
|
||||||
|
|
||||||
|
private static final Log LOG = LogFactory.getLog(
|
||||||
|
RecoveredContainerLaunch.class);
|
||||||
|
|
||||||
|
public RecoverPausedContainerLaunch(Context context,
|
||||||
|
Configuration configuration, Dispatcher dispatcher,
|
||||||
|
ContainerExecutor exec, Application app, Container container,
|
||||||
|
LocalDirsHandlerService dirsHandler,
|
||||||
|
ContainerManagerImpl containerManager) {
|
||||||
|
super(context, configuration, dispatcher, exec, app, container, dirsHandler,
|
||||||
|
containerManager);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Cleanup the paused container by issuing a kill on it.
|
||||||
|
*/
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
|
@Override
|
||||||
|
public Integer call() {
|
||||||
|
int retCode = ContainerExecutor.ExitCode.LOST.getExitCode();
|
||||||
|
ContainerId containerId = container.getContainerId();
|
||||||
|
String appIdStr =
|
||||||
|
containerId.getApplicationAttemptId().getApplicationId().toString();
|
||||||
|
String containerIdStr = containerId.toString();
|
||||||
|
|
||||||
|
boolean notInterrupted = true;
|
||||||
|
try {
|
||||||
|
File pidFile = locatePidFile(appIdStr, containerIdStr);
|
||||||
|
if (pidFile != null) {
|
||||||
|
String pidPathStr = pidFile.getPath();
|
||||||
|
pidFilePath = new Path(pidPathStr);
|
||||||
|
exec.activateContainer(containerId, pidFilePath);
|
||||||
|
exec.signalContainer(new ContainerSignalContext.Builder()
|
||||||
|
.setContainer(container)
|
||||||
|
.setUser(container.getUser())
|
||||||
|
.setSignal(ContainerExecutor.Signal.KILL)
|
||||||
|
.build());
|
||||||
|
} else {
|
||||||
|
LOG.warn("Unable to locate pid file for container " + containerIdStr);
|
||||||
|
}
|
||||||
|
|
||||||
|
} catch (InterruptedIOException e) {
|
||||||
|
LOG.warn("Interrupted while waiting for exit code from " + containerId);
|
||||||
|
notInterrupted = false;
|
||||||
|
} catch (IOException e) {
|
||||||
|
LOG.error("Unable to kill the paused container " + containerIdStr, e);
|
||||||
|
} finally {
|
||||||
|
if (notInterrupted) {
|
||||||
|
this.completed.set(true);
|
||||||
|
exec.deactivateContainer(containerId);
|
||||||
|
try {
|
||||||
|
getContext().getNMStateStore()
|
||||||
|
.storeContainerCompleted(containerId, retCode);
|
||||||
|
} catch (IOException e) {
|
||||||
|
LOG.error("Unable to set exit code for container " + containerId);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
LOG.warn("Recovered container exited with a non-zero exit code "
|
||||||
|
+ retCode);
|
||||||
|
this.dispatcher.getEventHandler().handle(new ContainerExitEvent(
|
||||||
|
containerId,
|
||||||
|
ContainerEventType.CONTAINER_EXITED_WITH_FAILURE, retCode,
|
||||||
|
"Container exited with a non-zero exit code " + retCode));
|
||||||
|
|
||||||
|
return retCode;
|
||||||
|
}
|
||||||
|
|
||||||
|
private File locatePidFile(String appIdStr, String containerIdStr) {
|
||||||
|
String pidSubpath= getPidFileSubpath(appIdStr, containerIdStr);
|
||||||
|
for (String dir : getContext().getLocalDirsHandler().
|
||||||
|
getLocalDirsForRead()) {
|
||||||
|
File pidFile = new File(dir, pidSubpath);
|
||||||
|
if (pidFile.exists()) {
|
||||||
|
return pidFile;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
}
|
|
@ -40,10 +40,9 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Cont
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerExitEvent;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerExitEvent;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerReacquisitionContext;
|
import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerReacquisitionContext;
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This is a ContainerLaunch which has been recovered after an NM restart (for
|
* This is a ContainerLaunch which has been recovered after an NM restart (for
|
||||||
* rolling upgrades)
|
* rolling upgrades).
|
||||||
*/
|
*/
|
||||||
public class RecoveredContainerLaunch extends ContainerLaunch {
|
public class RecoveredContainerLaunch extends ContainerLaunch {
|
||||||
|
|
||||||
|
|
|
@ -117,6 +117,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
|
||||||
private static final String CONTAINER_DIAGS_KEY_SUFFIX = "/diagnostics";
|
private static final String CONTAINER_DIAGS_KEY_SUFFIX = "/diagnostics";
|
||||||
private static final String CONTAINER_LAUNCHED_KEY_SUFFIX = "/launched";
|
private static final String CONTAINER_LAUNCHED_KEY_SUFFIX = "/launched";
|
||||||
private static final String CONTAINER_QUEUED_KEY_SUFFIX = "/queued";
|
private static final String CONTAINER_QUEUED_KEY_SUFFIX = "/queued";
|
||||||
|
private static final String CONTAINER_PAUSED_KEY_SUFFIX = "/paused";
|
||||||
private static final String CONTAINER_RESOURCE_CHANGED_KEY_SUFFIX =
|
private static final String CONTAINER_RESOURCE_CHANGED_KEY_SUFFIX =
|
||||||
"/resourceChanged";
|
"/resourceChanged";
|
||||||
private static final String CONTAINER_KILLED_KEY_SUFFIX = "/killed";
|
private static final String CONTAINER_KILLED_KEY_SUFFIX = "/killed";
|
||||||
|
@ -265,9 +266,16 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
|
||||||
if (rcs.status == RecoveredContainerStatus.REQUESTED) {
|
if (rcs.status == RecoveredContainerStatus.REQUESTED) {
|
||||||
rcs.status = RecoveredContainerStatus.QUEUED;
|
rcs.status = RecoveredContainerStatus.QUEUED;
|
||||||
}
|
}
|
||||||
|
} else if (suffix.equals(CONTAINER_PAUSED_KEY_SUFFIX)) {
|
||||||
|
if ((rcs.status == RecoveredContainerStatus.LAUNCHED)
|
||||||
|
||(rcs.status == RecoveredContainerStatus.QUEUED)
|
||||||
|
||(rcs.status == RecoveredContainerStatus.REQUESTED)) {
|
||||||
|
rcs.status = RecoveredContainerStatus.PAUSED;
|
||||||
|
}
|
||||||
} else if (suffix.equals(CONTAINER_LAUNCHED_KEY_SUFFIX)) {
|
} else if (suffix.equals(CONTAINER_LAUNCHED_KEY_SUFFIX)) {
|
||||||
if ((rcs.status == RecoveredContainerStatus.REQUESTED)
|
if ((rcs.status == RecoveredContainerStatus.REQUESTED)
|
||||||
|| (rcs.status == RecoveredContainerStatus.QUEUED)) {
|
|| (rcs.status == RecoveredContainerStatus.QUEUED)
|
||||||
|
||(rcs.status == RecoveredContainerStatus.PAUSED)) {
|
||||||
rcs.status = RecoveredContainerStatus.LAUNCHED;
|
rcs.status = RecoveredContainerStatus.LAUNCHED;
|
||||||
}
|
}
|
||||||
} else if (suffix.equals(CONTAINER_KILLED_KEY_SUFFIX)) {
|
} else if (suffix.equals(CONTAINER_KILLED_KEY_SUFFIX)) {
|
||||||
|
@ -346,6 +354,37 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void storeContainerPaused(ContainerId containerId) throws IOException {
|
||||||
|
if (LOG.isDebugEnabled()) {
|
||||||
|
LOG.debug("storeContainerPaused: containerId=" + containerId);
|
||||||
|
}
|
||||||
|
|
||||||
|
String key = CONTAINERS_KEY_PREFIX + containerId.toString()
|
||||||
|
+ CONTAINER_PAUSED_KEY_SUFFIX;
|
||||||
|
try {
|
||||||
|
db.put(bytes(key), EMPTY_VALUE);
|
||||||
|
} catch (DBException e) {
|
||||||
|
throw new IOException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void removeContainerPaused(ContainerId containerId)
|
||||||
|
throws IOException {
|
||||||
|
if (LOG.isDebugEnabled()) {
|
||||||
|
LOG.debug("removeContainerPaused: containerId=" + containerId);
|
||||||
|
}
|
||||||
|
|
||||||
|
String key = CONTAINERS_KEY_PREFIX + containerId.toString()
|
||||||
|
+ CONTAINER_PAUSED_KEY_SUFFIX;
|
||||||
|
try {
|
||||||
|
db.delete(bytes(key));
|
||||||
|
} catch (DBException e) {
|
||||||
|
throw new IOException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void storeContainerDiagnostics(ContainerId containerId,
|
public void storeContainerDiagnostics(ContainerId containerId,
|
||||||
StringBuilder diagnostics) throws IOException {
|
StringBuilder diagnostics) throws IOException {
|
||||||
|
@ -490,6 +529,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
|
||||||
batch.delete(bytes(keyPrefix + CONTAINER_DIAGS_KEY_SUFFIX));
|
batch.delete(bytes(keyPrefix + CONTAINER_DIAGS_KEY_SUFFIX));
|
||||||
batch.delete(bytes(keyPrefix + CONTAINER_LAUNCHED_KEY_SUFFIX));
|
batch.delete(bytes(keyPrefix + CONTAINER_LAUNCHED_KEY_SUFFIX));
|
||||||
batch.delete(bytes(keyPrefix + CONTAINER_QUEUED_KEY_SUFFIX));
|
batch.delete(bytes(keyPrefix + CONTAINER_QUEUED_KEY_SUFFIX));
|
||||||
|
batch.delete(bytes(keyPrefix + CONTAINER_PAUSED_KEY_SUFFIX));
|
||||||
batch.delete(bytes(keyPrefix + CONTAINER_KILLED_KEY_SUFFIX));
|
batch.delete(bytes(keyPrefix + CONTAINER_KILLED_KEY_SUFFIX));
|
||||||
batch.delete(bytes(keyPrefix + CONTAINER_EXIT_CODE_KEY_SUFFIX));
|
batch.delete(bytes(keyPrefix + CONTAINER_EXIT_CODE_KEY_SUFFIX));
|
||||||
List<String> unknownKeysForContainer = containerUnknownKeySuffixes
|
List<String> unknownKeysForContainer = containerUnknownKeySuffixes
|
||||||
|
|
|
@ -78,6 +78,15 @@ public class NMNullStateStoreService extends NMStateStoreService {
|
||||||
public void storeContainerQueued(ContainerId containerId) throws IOException {
|
public void storeContainerQueued(ContainerId containerId) throws IOException {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void storeContainerPaused(ContainerId containerId) throws IOException {
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void removeContainerPaused(ContainerId containerId)
|
||||||
|
throws IOException {
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void storeContainerDiagnostics(ContainerId containerId,
|
public void storeContainerDiagnostics(ContainerId containerId,
|
||||||
StringBuilder diagnostics) throws IOException {
|
StringBuilder diagnostics) throws IOException {
|
||||||
|
|
|
@ -71,7 +71,8 @@ public abstract class NMStateStoreService extends AbstractService {
|
||||||
REQUESTED,
|
REQUESTED,
|
||||||
QUEUED,
|
QUEUED,
|
||||||
LAUNCHED,
|
LAUNCHED,
|
||||||
COMPLETED
|
COMPLETED,
|
||||||
|
PAUSED
|
||||||
}
|
}
|
||||||
|
|
||||||
public static class RecoveredContainerState {
|
public static class RecoveredContainerState {
|
||||||
|
@ -328,9 +329,9 @@ public abstract class NMStateStoreService extends AbstractService {
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Load the state of applications
|
* Load the state of applications.
|
||||||
* @return recovered state for applications
|
* @return recovered state for applications.
|
||||||
* @throws IOException
|
* @throws IOException IO Exception.
|
||||||
*/
|
*/
|
||||||
public abstract RecoveredApplicationsState loadApplicationsState()
|
public abstract RecoveredApplicationsState loadApplicationsState()
|
||||||
throws IOException;
|
throws IOException;
|
||||||
|
@ -380,6 +381,23 @@ public abstract class NMStateStoreService extends AbstractService {
|
||||||
public abstract void storeContainerQueued(ContainerId containerId)
|
public abstract void storeContainerQueued(ContainerId containerId)
|
||||||
throws IOException;
|
throws IOException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Record that a container has been paused at the NM.
|
||||||
|
* @param containerId the container ID.
|
||||||
|
* @throws IOException IO Exception.
|
||||||
|
*/
|
||||||
|
public abstract void storeContainerPaused(ContainerId containerId)
|
||||||
|
throws IOException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Record that a container has been resumed at the NM by removing the
|
||||||
|
* fact that it has be paused.
|
||||||
|
* @param containerId the container ID.
|
||||||
|
* @throws IOException IO Exception.
|
||||||
|
*/
|
||||||
|
public abstract void removeContainerPaused(ContainerId containerId)
|
||||||
|
throws IOException;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Record that a container has been launched
|
* Record that a container has been launched
|
||||||
* @param containerId the container ID
|
* @param containerId the container ID
|
||||||
|
|
|
@ -139,6 +139,19 @@ public class NMMemoryStateStoreService extends NMStateStoreService {
|
||||||
rcs.status = RecoveredContainerStatus.QUEUED;
|
rcs.status = RecoveredContainerStatus.QUEUED;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void storeContainerPaused(ContainerId containerId) throws IOException {
|
||||||
|
RecoveredContainerState rcs = getRecoveredContainerState(containerId);
|
||||||
|
rcs.status = RecoveredContainerStatus.PAUSED;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void removeContainerPaused(ContainerId containerId)
|
||||||
|
throws IOException {
|
||||||
|
RecoveredContainerState rcs = getRecoveredContainerState(containerId);
|
||||||
|
rcs.status = RecoveredContainerStatus.LAUNCHED;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public synchronized void storeContainerDiagnostics(ContainerId containerId,
|
public synchronized void storeContainerDiagnostics(ContainerId containerId,
|
||||||
StringBuilder diagnostics) throws IOException {
|
StringBuilder diagnostics) throws IOException {
|
||||||
|
|
|
@ -286,6 +286,23 @@ public class TestNMLeveldbStateStoreService {
|
||||||
assertEquals(containerReq, rcs.getStartRequest());
|
assertEquals(containerReq, rcs.getStartRequest());
|
||||||
assertEquals(diags.toString(), rcs.getDiagnostics());
|
assertEquals(diags.toString(), rcs.getDiagnostics());
|
||||||
|
|
||||||
|
// pause the container, and verify recovered
|
||||||
|
stateStore.storeContainerPaused(containerId);
|
||||||
|
restartStateStore();
|
||||||
|
recoveredContainers = stateStore.loadContainersState();
|
||||||
|
assertEquals(1, recoveredContainers.size());
|
||||||
|
rcs = recoveredContainers.get(0);
|
||||||
|
assertEquals(RecoveredContainerStatus.PAUSED, rcs.getStatus());
|
||||||
|
assertEquals(ContainerExitStatus.INVALID, rcs.getExitCode());
|
||||||
|
assertEquals(false, rcs.getKilled());
|
||||||
|
assertEquals(containerReq, rcs.getStartRequest());
|
||||||
|
|
||||||
|
// Resume the container
|
||||||
|
stateStore.removeContainerPaused(containerId);
|
||||||
|
restartStateStore();
|
||||||
|
recoveredContainers = stateStore.loadContainersState();
|
||||||
|
assertEquals(1, recoveredContainers.size());
|
||||||
|
|
||||||
// increase the container size, and verify recovered
|
// increase the container size, and verify recovered
|
||||||
stateStore.storeContainerResourceChanged(containerId, 2,
|
stateStore.storeContainerResourceChanged(containerId, 2,
|
||||||
Resource.newInstance(2468, 4));
|
Resource.newInstance(2468, 4));
|
||||||
|
|
Loading…
Reference in New Issue