YARN-7275. NM Statestore cleanup for Container updates. (Kartheek Muthyala via asuresh)

This commit is contained in:
Arun Suresh 2017-10-16 13:08:52 -07:00
parent 8dbc8909c9
commit a50be1b8f4
14 changed files with 179 additions and 69 deletions

View File

@ -21,6 +21,7 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.ByteString; import com.google.protobuf.ByteString;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.UpdateContainerTokenEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.UpdateContainerTokenEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler.ContainerSchedulerEvent;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceAudience.Private;
@ -367,6 +368,13 @@ public class ContainerManagerImpl extends CompositeService implements
} }
recoverContainer(rcs); recoverContainer(rcs);
} }
//Dispatching the RECOVERY_COMPLETED event through the dispatcher
//so that all the paused, scheduled and queued containers will
//be scheduled for execution on availability of resources.
dispatcher.getEventHandler().handle(
new ContainerSchedulerEvent(null,
ContainerSchedulerEventType.RECOVERY_COMPLETED));
} else { } else {
LOG.info("Not a recoverable state store. Nothing to recover."); LOG.info("Not a recoverable state store. Nothing to recover.");
} }
@ -480,6 +488,7 @@ public class ContainerManagerImpl extends CompositeService implements
Container container = new ContainerImpl(getConfig(), dispatcher, Container container = new ContainerImpl(getConfig(), dispatcher,
launchContext, credentials, metrics, token, context, rcs); launchContext, credentials, metrics, token, context, rcs);
context.getContainers().put(token.getContainerID(), container); context.getContainers().put(token.getContainerID(), container);
containerScheduler.recoverActiveContainer(container, rcs.getStatus());
app.handle(new ApplicationContainerInitEvent(container)); app.handle(new ApplicationContainerInitEvent(container));
} }

View File

@ -46,5 +46,7 @@ public enum ContainerEventType {
CONTAINER_RESUMED, CONTAINER_RESUMED,
// Producer: ContainerScheduler // Producer: ContainerScheduler
CONTAINER_TOKEN_UPDATED CONTAINER_TOKEN_UPDATED,
RECOVER_PAUSED_CONTAINER
} }

View File

@ -362,6 +362,9 @@ public class ContainerImpl implements Container {
// From SCHEDULED State // From SCHEDULED State
.addTransition(ContainerState.SCHEDULED, ContainerState.RUNNING, .addTransition(ContainerState.SCHEDULED, ContainerState.RUNNING,
ContainerEventType.CONTAINER_LAUNCHED, new LaunchTransition()) ContainerEventType.CONTAINER_LAUNCHED, new LaunchTransition())
.addTransition(ContainerState.SCHEDULED, ContainerState.PAUSED,
ContainerEventType.RECOVER_PAUSED_CONTAINER,
new RecoveredContainerTransition())
.addTransition(ContainerState.SCHEDULED, ContainerState.EXITED_WITH_FAILURE, .addTransition(ContainerState.SCHEDULED, ContainerState.EXITED_WITH_FAILURE,
ContainerEventType.CONTAINER_EXITED_WITH_FAILURE, ContainerEventType.CONTAINER_EXITED_WITH_FAILURE,
new ExitedWithFailureTransition(true)) new ExitedWithFailureTransition(true))
@ -952,7 +955,10 @@ public class ContainerImpl implements Container {
if (recoveredStatus == RecoveredContainerStatus.LAUNCHED) { if (recoveredStatus == RecoveredContainerStatus.LAUNCHED) {
// try to recover a container that was previously launched // try to recover a container that was previously launched
launcherEvent = ContainersLauncherEventType.RECOVER_CONTAINER; launcherEvent = ContainersLauncherEventType.RECOVER_CONTAINER;
} else if (recoveredStatus == RecoveredContainerStatus.PAUSED) {
launcherEvent = ContainersLauncherEventType.RECOVER_PAUSED_CONTAINER;
} }
containerLaunchStartTime = clock.getTime(); containerLaunchStartTime = clock.getTime();
dispatcher.getEventHandler().handle( dispatcher.getEventHandler().handle(
new ContainersLauncherEvent(this, launcherEvent)); new ContainersLauncherEvent(this, launcherEvent));
@ -963,9 +969,6 @@ public class ContainerImpl implements Container {
@SuppressWarnings("unchecked") // dispatcher not typed @SuppressWarnings("unchecked") // dispatcher not typed
private void sendScheduleEvent() { private void sendScheduleEvent() {
if (recoveredStatus == RecoveredContainerStatus.PAUSED) { if (recoveredStatus == RecoveredContainerStatus.PAUSED) {
// Recovery is not supported for paused container so we raise the
// launch event which will proceed to kill the paused container instead
// of raising the schedule event.
ContainersLauncherEventType launcherEvent; ContainersLauncherEventType launcherEvent;
launcherEvent = ContainersLauncherEventType.RECOVER_PAUSED_CONTAINER; launcherEvent = ContainersLauncherEventType.RECOVER_PAUSED_CONTAINER;
dispatcher.getEventHandler() dispatcher.getEventHandler()
@ -1060,17 +1063,15 @@ public class ContainerImpl implements Container {
UpdateContainerTokenEvent updateEvent = (UpdateContainerTokenEvent)event; UpdateContainerTokenEvent updateEvent = (UpdateContainerTokenEvent)event;
// Update the container token // Update the container token
container.setContainerTokenIdentifier(updateEvent.getUpdatedToken()); container.setContainerTokenIdentifier(updateEvent.getUpdatedToken());
if (updateEvent.isResourceChange()) {
try { try {
// Persist change in the state store. // Persist change in the state store.
container.context.getNMStateStore().storeContainerResourceChanged( container.context.getNMStateStore()
container.containerId, .storeContainerUpdateToken(container.containerId,
container.getContainerTokenIdentifier().getVersion(), container.getContainerTokenIdentifier());
container.getResource()); } catch (IOException e) {
} catch (IOException e) { LOG.warn("Could not store container [" + container.containerId
LOG.warn("Could not store container [" + container.containerId + "] update..", e);
+ "] resource change..", e);
}
} }
} }
} }
@ -1115,6 +1116,8 @@ public class ContainerImpl implements Container {
if (container.recoveredStatus == RecoveredContainerStatus.COMPLETED) { if (container.recoveredStatus == RecoveredContainerStatus.COMPLETED) {
container.sendFinishedEvents(); container.sendFinishedEvents();
return ContainerState.DONE; return ContainerState.DONE;
} else if (container.recoveredStatus == RecoveredContainerStatus.QUEUED) {
return ContainerState.SCHEDULED;
} else if (container.recoveredAsKilled && } else if (container.recoveredAsKilled &&
container.recoveredStatus == RecoveredContainerStatus.REQUESTED) { container.recoveredStatus == RecoveredContainerStatus.REQUESTED) {
// container was killed but never launched // container was killed but never launched
@ -1473,6 +1476,18 @@ public class ContainerImpl implements Container {
} }
} }
/**
* Transition from SCHEDULED state to PAUSED state on recovery.
* Registered in the container state machine for the
* RECOVER_PAUSED_CONTAINER event.
*/
static class RecoveredContainerTransition extends ContainerTransition {
@SuppressWarnings("unchecked")
@Override
public void transition(ContainerImpl container, ContainerEvent event) {
// Request that monitoring be started for the recovered container.
container.sendContainerMonitorStartEvent();
// Mark the container as having been launched.
// NOTE(review): presumably because the paused process already exists
// from before the restart -- confirm against the users of wasLaunched.
container.wasLaunched = true;
}
}
/** /**
* Transition from RUNNING or KILLING state to * Transition from RUNNING or KILLING state to
* EXITED_WITH_SUCCESS state upon EXITED_WITH_SUCCESS message. * EXITED_WITH_SUCCESS state upon EXITED_WITH_SUCCESS message.

View File

@ -140,8 +140,6 @@ public class ContainersLauncher extends AbstractService
running.put(containerId, launch); running.put(containerId, launch);
break; break;
case RECOVER_PAUSED_CONTAINER: case RECOVER_PAUSED_CONTAINER:
// Recovery for paused containers is not supported, thus here
// we locate any paused containers, and terminate them.
app = context.getApplications().get( app = context.getApplications().get(
containerId.getApplicationAttemptId().getApplicationId()); containerId.getApplicationAttemptId().getApplicationId());
launch = new RecoverPausedContainerLaunch(context, getConfig(), launch = new RecoverPausedContainerLaunch(context, getConfig(),

View File

@ -30,7 +30,7 @@ import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl; import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.*; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.*;
import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerSignalContext; import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerReacquisitionContext;
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
@ -66,6 +66,8 @@ public class RecoverPausedContainerLaunch extends ContainerLaunch {
containerId.getApplicationAttemptId().getApplicationId().toString(); containerId.getApplicationAttemptId().getApplicationId().toString();
String containerIdStr = containerId.toString(); String containerIdStr = containerId.toString();
dispatcher.getEventHandler().handle(new ContainerEvent(containerId,
ContainerEventType.RECOVER_PAUSED_CONTAINER));
boolean notInterrupted = true; boolean notInterrupted = true;
try { try {
File pidFile = locatePidFile(appIdStr, containerIdStr); File pidFile = locatePidFile(appIdStr, containerIdStr);
@ -73,16 +75,17 @@ public class RecoverPausedContainerLaunch extends ContainerLaunch {
String pidPathStr = pidFile.getPath(); String pidPathStr = pidFile.getPath();
pidFilePath = new Path(pidPathStr); pidFilePath = new Path(pidPathStr);
exec.activateContainer(containerId, pidFilePath); exec.activateContainer(containerId, pidFilePath);
exec.signalContainer(new ContainerSignalContext.Builder() retCode = exec.reacquireContainer(
.setContainer(container) new ContainerReacquisitionContext.Builder()
.setUser(container.getUser()) .setContainer(container)
.setSignal(ContainerExecutor.Signal.KILL) .setUser(container.getUser())
.build()); .setContainerId(containerId)
.build());
} else { } else {
LOG.warn("Unable to locate pid file for container " + containerIdStr); LOG.warn("Unable to locate pid file for container " + containerIdStr);
} }
} catch (InterruptedIOException e) { } catch (InterruptedException | InterruptedIOException e) {
LOG.warn("Interrupted while waiting for exit code from " + containerId); LOG.warn("Interrupted while waiting for exit code from " + containerId);
notInterrupted = false; notInterrupted = false;
} catch (IOException e) { } catch (IOException e) {
@ -100,14 +103,21 @@ public class RecoverPausedContainerLaunch extends ContainerLaunch {
} }
} }
LOG.warn("Recovered container exited with a non-zero exit code " if (retCode != 0) {
+ retCode); LOG.warn("Recovered container exited with a non-zero exit code "
this.dispatcher.getEventHandler().handle(new ContainerExitEvent( + retCode);
containerId, this.dispatcher.getEventHandler().handle(new ContainerExitEvent(
ContainerEventType.CONTAINER_EXITED_WITH_FAILURE, retCode, containerId,
"Container exited with a non-zero exit code " + retCode)); ContainerEventType.CONTAINER_EXITED_WITH_FAILURE, retCode,
"Container exited with a non-zero exit code " + retCode));
return retCode;
}
return retCode; LOG.info("Recovered container " + containerId + " succeeded");
dispatcher.getEventHandler().handle(
new ContainerEvent(containerId,
ContainerEventType.CONTAINER_EXITED_WITH_SUCCESS));
return 0;
} }
private File locatePidFile(String appIdStr, String containerIdStr) { private File locatePidFile(String appIdStr, String containerIdStr) {

View File

@ -72,7 +72,7 @@ public class RecoveredContainerLaunch extends ContainerLaunch {
String containerIdStr = containerId.toString(); String containerIdStr = containerId.toString();
dispatcher.getEventHandler().handle(new ContainerEvent(containerId, dispatcher.getEventHandler().handle(new ContainerEvent(containerId,
ContainerEventType.CONTAINER_LAUNCHED)); ContainerEventType.RECOVER_PAUSED_CONTAINER));
boolean notInterrupted = true; boolean notInterrupted = true;
try { try {

View File

@ -41,6 +41,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.Contai
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics; import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredContainerStatus;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -168,6 +169,8 @@ public class ContainerScheduler extends AbstractService implements
case SHED_QUEUED_CONTAINERS: case SHED_QUEUED_CONTAINERS:
shedQueuedOpportunisticContainers(); shedQueuedOpportunisticContainers();
break; break;
case RECOVERY_COMPLETED:
startPendingContainers(maxOppQueueLength <= 0);
default: default:
LOG.error("Unknown event arrived at ContainerScheduler: " LOG.error("Unknown event arrived at ContainerScheduler: "
+ event.toString()); + event.toString());
@ -218,6 +221,34 @@ public class ContainerScheduler extends AbstractService implements
} }
} }
/**
 * Populates the auxiliary data structures used by the ContainerScheduler
 * with a container found during recovery.
 *
 * <p>A container recovered as QUEUED or PAUSED is placed in the guaranteed
 * or the opportunistic queue, depending on the execution type carried by its
 * token identifier; an unrecognized execution type is only logged. A
 * container recovered as LAUNCHED is added to the running containers and its
 * resources are added to the utilization tracker. Any other status leaves
 * the scheduler untouched.
 *
 * @param container container recovered
 * @param rcs Recovered Container status
 */
public void recoverActiveContainer(Container container,
    RecoveredContainerStatus rcs) {
  // Read before the status is inspected, so the token identifier is
  // dereferenced for every recovered container regardless of its status.
  ExecutionType recoveredType =
      container.getContainerTokenIdentifier().getExecutionType();

  if (rcs == RecoveredContainerStatus.LAUNCHED) {
    runningContainers.put(container.getContainerId(), container);
    utilizationTracker.addContainerResources(container);
    return;
  }

  boolean waitingToRun = rcs == RecoveredContainerStatus.QUEUED
      || rcs == RecoveredContainerStatus.PAUSED;
  if (!waitingToRun) {
    // Nothing to track for any other recovered status.
    return;
  }

  if (recoveredType == ExecutionType.GUARANTEED) {
    queuedGuaranteedContainers.put(container.getContainerId(), container);
    return;
  }
  if (recoveredType == ExecutionType.OPPORTUNISTIC) {
    queuedOpportunisticContainers.put(container.getContainerId(), container);
    return;
  }
  LOG.error("UnKnown execution type received " + container.getContainerId()
      + ", execType " + recoveredType);
}
/** /**
* Return number of queued containers. * Return number of queued containers.
* @return Number of queued containers. * @return Number of queued containers.

View File

@ -27,5 +27,6 @@ public enum ContainerSchedulerEventType {
UPDATE_CONTAINER, UPDATE_CONTAINER,
// Producer: Node HB response - RM has asked to shed the queue // Producer: Node HB response - RM has asked to shed the queue
SHED_QUEUED_CONTAINERS, SHED_QUEUED_CONTAINERS,
CONTAINER_PAUSED CONTAINER_PAUSED,
RECOVERY_COMPLETED
} }

View File

@ -20,6 +20,9 @@ package org.apache.hadoop.yarn.server.nodemanager.recovery;
import static org.fusesource.leveldbjni.JniDBFactory.asString; import static org.fusesource.leveldbjni.JniDBFactory.asString;
import static org.fusesource.leveldbjni.JniDBFactory.bytes; import static org.fusesource.leveldbjni.JniDBFactory.bytes;
import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.io.File; import java.io.File;
@ -47,11 +50,8 @@ import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.StartContainerRequestP
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils;
import org.apache.hadoop.yarn.api.records.impl.pb.ResourcePBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ResourcePBImpl;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProto;
import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto; import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.MasterKeyProto; import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.MasterKeyProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.VersionProto; import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.VersionProto;
@ -60,6 +60,7 @@ import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.Deletion
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LocalizedResourceProto; import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LocalizedResourceProto;
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LogDeleterProto; import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LogDeleterProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.StartContainerRequestProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.StartContainerRequestProto;
import org.apache.hadoop.yarn.proto.YarnSecurityTokenProtos.ContainerTokenIdentifierProto;
import org.apache.hadoop.yarn.server.api.records.MasterKey; import org.apache.hadoop.yarn.server.api.records.MasterKey;
import org.apache.hadoop.yarn.server.api.records.impl.pb.MasterKeyPBImpl; import org.apache.hadoop.yarn.server.api.records.impl.pb.MasterKeyPBImpl;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ResourceMappings; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ResourceMappings;
@ -120,8 +121,8 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
private static final String CONTAINER_LAUNCHED_KEY_SUFFIX = "/launched"; private static final String CONTAINER_LAUNCHED_KEY_SUFFIX = "/launched";
private static final String CONTAINER_QUEUED_KEY_SUFFIX = "/queued"; private static final String CONTAINER_QUEUED_KEY_SUFFIX = "/queued";
private static final String CONTAINER_PAUSED_KEY_SUFFIX = "/paused"; private static final String CONTAINER_PAUSED_KEY_SUFFIX = "/paused";
private static final String CONTAINER_RESOURCE_CHANGED_KEY_SUFFIX = private static final String CONTAINER_UPDATE_TOKEN_SUFFIX =
"/resourceChanged"; "/updateToken";
private static final String CONTAINER_KILLED_KEY_SUFFIX = "/killed"; private static final String CONTAINER_KILLED_KEY_SUFFIX = "/killed";
private static final String CONTAINER_EXIT_CODE_KEY_SUFFIX = "/exitcode"; private static final String CONTAINER_EXIT_CODE_KEY_SUFFIX = "/exitcode";
private static final String CONTAINER_REMAIN_RETRIES_KEY_SUFFIX = private static final String CONTAINER_REMAIN_RETRIES_KEY_SUFFIX =
@ -290,9 +291,17 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
} else if (suffix.equals(CONTAINER_EXIT_CODE_KEY_SUFFIX)) { } else if (suffix.equals(CONTAINER_EXIT_CODE_KEY_SUFFIX)) {
rcs.status = RecoveredContainerStatus.COMPLETED; rcs.status = RecoveredContainerStatus.COMPLETED;
rcs.exitCode = Integer.parseInt(asString(entry.getValue())); rcs.exitCode = Integer.parseInt(asString(entry.getValue()));
} else if (suffix.equals(CONTAINER_RESOURCE_CHANGED_KEY_SUFFIX)) { } else if (suffix.equals(CONTAINER_UPDATE_TOKEN_SUFFIX)) {
rcs.capability = new ResourcePBImpl( ContainerTokenIdentifierProto tokenIdentifierProto =
ResourceProto.parseFrom(entry.getValue())); ContainerTokenIdentifierProto.parseFrom(entry.getValue());
Token currentToken = rcs.getStartRequest().getContainerToken();
Token updatedToken = Token
.newInstance(tokenIdentifierProto.toByteArray(),
ContainerTokenIdentifier.KIND.toString(),
currentToken.getPassword().array(), currentToken.getService());
rcs.startRequest.setContainerToken(updatedToken);
rcs.capability = new ResourcePBImpl(tokenIdentifierProto.getResource());
rcs.version = tokenIdentifierProto.getVersion();
} else if (suffix.equals(CONTAINER_REMAIN_RETRIES_KEY_SUFFIX)) { } else if (suffix.equals(CONTAINER_REMAIN_RETRIES_KEY_SUFFIX)) {
rcs.setRemainingRetryAttempts( rcs.setRemainingRetryAttempts(
Integer.parseInt(asString(entry.getValue()))); Integer.parseInt(asString(entry.getValue())));
@ -374,6 +383,21 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
} }
} }
/**
 * Deletes the queued-state entry of a container from the leveldb store.
 *
 * @param containerId the container whose queued entry is removed
 * @throws IOException if the underlying database delete fails
 */
private void removeContainerQueued(ContainerId containerId)
    throws IOException {
  if (LOG.isDebugEnabled()) {
    LOG.debug("removeContainerQueued: containerId=" + containerId);
  }

  final String queuedEntryKey = CONTAINERS_KEY_PREFIX
      + containerId.toString() + CONTAINER_QUEUED_KEY_SUFFIX;
  try {
    db.delete(bytes(queuedEntryKey));
  } catch (DBException dbFailure) {
    // Surface store failures through the checked type used by this service.
    throw new IOException(dbFailure);
  }
}
@Override @Override
public void storeContainerPaused(ContainerId containerId) throws IOException { public void storeContainerPaused(ContainerId containerId) throws IOException {
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
@ -429,6 +453,8 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
LOG.debug("storeContainerLaunched: containerId=" + containerId); LOG.debug("storeContainerLaunched: containerId=" + containerId);
} }
// Removing the container if queued for backward compatibility reasons
removeContainerQueued(containerId);
String key = CONTAINERS_KEY_PREFIX + containerId.toString() String key = CONTAINERS_KEY_PREFIX + containerId.toString()
+ CONTAINER_LAUNCHED_KEY_SUFFIX; + CONTAINER_LAUNCHED_KEY_SUFFIX;
try { try {
@ -439,24 +465,25 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
} }
@Override @Override
public void storeContainerResourceChanged(ContainerId containerId, public void storeContainerUpdateToken(ContainerId containerId,
int containerVersion, Resource capability) throws IOException { ContainerTokenIdentifier containerTokenIdentifier) throws IOException {
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("storeContainerResourceChanged: containerId=" + containerId LOG.debug("storeContainerUpdateToken: containerId=" + containerId);
+ ", capability=" + capability);
} }
String keyResChng = CONTAINERS_KEY_PREFIX + containerId.toString() String keyUpdateToken = CONTAINERS_KEY_PREFIX + containerId.toString()
+ CONTAINER_RESOURCE_CHANGED_KEY_SUFFIX; + CONTAINER_UPDATE_TOKEN_SUFFIX;
String keyVersion = CONTAINERS_KEY_PREFIX + containerId.toString() String keyVersion = CONTAINERS_KEY_PREFIX + containerId.toString()
+ CONTAINER_VERSION_KEY_SUFFIX; + CONTAINER_VERSION_KEY_SUFFIX;
try { try {
WriteBatch batch = db.createWriteBatch(); WriteBatch batch = db.createWriteBatch();
try { try {
// New value will overwrite old values for the same key // New value will overwrite old values for the same key
batch.put(bytes(keyResChng), batch.put(bytes(keyUpdateToken),
ProtoUtils.convertToProtoFormat(capability).toByteArray()); containerTokenIdentifier.getProto().toByteArray());
batch.put(bytes(keyVersion), bytes(Integer.toString(containerVersion))); batch.put(bytes(keyVersion),
bytes(Integer.toString(containerTokenIdentifier.getVersion())));
db.write(batch); db.write(batch);
} finally { } finally {
batch.close(); batch.close();
@ -552,6 +579,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
batch.delete(bytes(keyPrefix + CONTAINER_PAUSED_KEY_SUFFIX)); batch.delete(bytes(keyPrefix + CONTAINER_PAUSED_KEY_SUFFIX));
batch.delete(bytes(keyPrefix + CONTAINER_KILLED_KEY_SUFFIX)); batch.delete(bytes(keyPrefix + CONTAINER_KILLED_KEY_SUFFIX));
batch.delete(bytes(keyPrefix + CONTAINER_EXIT_CODE_KEY_SUFFIX)); batch.delete(bytes(keyPrefix + CONTAINER_EXIT_CODE_KEY_SUFFIX));
batch.delete(bytes(keyPrefix + CONTAINER_UPDATE_TOKEN_SUFFIX));
List<String> unknownKeysForContainer = containerUnknownKeySuffixes List<String> unknownKeysForContainer = containerUnknownKeySuffixes
.removeAll(containerId); .removeAll(containerId);
for (String unknownKeySuffix : unknownKeysForContainer) { for (String unknownKeySuffix : unknownKeysForContainer) {

View File

@ -28,12 +28,12 @@ import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto; import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto;
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.ContainerManagerApplicationProto; import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.ContainerManagerApplicationProto;
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.DeletionServiceDeleteTaskProto; import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.DeletionServiceDeleteTaskProto;
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LocalizedResourceProto; import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LocalizedResourceProto;
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LogDeleterProto; import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LogDeleterProto;
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
import org.apache.hadoop.yarn.server.api.records.MasterKey; import org.apache.hadoop.yarn.server.api.records.MasterKey;
// The state store to use when state isn't being stored // The state store to use when state isn't being stored
@ -99,8 +99,8 @@ public class NMNullStateStoreService extends NMStateStoreService {
} }
@Override @Override
public void storeContainerResourceChanged(ContainerId containerId, public void storeContainerUpdateToken(ContainerId containerId,
int version, Resource capability) throws IOException { ContainerTokenIdentifier containerTokenIdentifier) throws IOException {
} }
@Override @Override

View File

@ -42,6 +42,7 @@ import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.Containe
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.DeletionServiceDeleteTaskProto; import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.DeletionServiceDeleteTaskProto;
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LocalizedResourceProto; import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LocalizedResourceProto;
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LogDeleterProto; import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LogDeleterProto;
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
import org.apache.hadoop.yarn.server.api.records.MasterKey; import org.apache.hadoop.yarn.server.api.records.MasterKey;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ResourceMappings; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ResourceMappings;
@ -429,14 +430,13 @@ public abstract class NMStateStoreService extends AbstractService {
throws IOException; throws IOException;
/** /**
* Record that a container resource has been changed * Record that a container has been updated
* @param containerId the container ID * @param containerId the container ID
* @param containerVersion the container version * @param containerTokenIdentifier container token identifier
* @param capability the container resource capability
* @throws IOException * @throws IOException
*/ */
public abstract void storeContainerResourceChanged(ContainerId containerId, public abstract void storeContainerUpdateToken(ContainerId containerId,
int containerVersion, Resource capability) throws IOException; ContainerTokenIdentifier containerTokenIdentifier) throws IOException;
/** /**
* Record that a container has completed * Record that a container has completed

View File

@ -405,6 +405,7 @@ public class TestContainerManagerRecovery extends BaseContainerManagerTest {
stateStore.start(); stateStore.start();
context = createContext(conf, stateStore); context = createContext(conf, stateStore);
ContainerManagerImpl cm = createContainerManager(context, delSrvc); ContainerManagerImpl cm = createContainerManager(context, delSrvc);
((NMContext) context).setContainerManager(cm);
cm.init(conf); cm.init(conf);
cm.start(); cm.start();
// add an application by starting a container // add an application by starting a container
@ -430,6 +431,7 @@ public class TestContainerManagerRecovery extends BaseContainerManagerTest {
cm.stop(); cm.stop();
context = createContext(conf, stateStore); context = createContext(conf, stateStore);
cm = createContainerManager(context); cm = createContainerManager(context);
((NMContext) context).setContainerManager(cm);
cm.init(conf); cm.init(conf);
cm.start(); cm.start();
assertEquals(1, context.getApplications().size()); assertEquals(1, context.getApplications().size());
@ -448,6 +450,7 @@ public class TestContainerManagerRecovery extends BaseContainerManagerTest {
stateStore.start(); stateStore.start();
context = createContext(conf, stateStore); context = createContext(conf, stateStore);
ContainerManagerImpl cm = createContainerManager(context, delSrvc); ContainerManagerImpl cm = createContainerManager(context, delSrvc);
((NMContext) context).setContainerManager(cm);
cm.init(conf); cm.init(conf);
cm.start(); cm.start();
@ -473,6 +476,7 @@ public class TestContainerManagerRecovery extends BaseContainerManagerTest {
cm.stop(); cm.stop();
context = createContext(conf, stateStore); context = createContext(conf, stateStore);
cm = createContainerManager(context); cm = createContainerManager(context);
((NMContext) context).setContainerManager(cm);
cm.init(conf); cm.init(conf);
cm.start(); cm.start();
assertEquals(1, context.getApplications().size()); assertEquals(1, context.getApplications().size());

View File

@ -33,12 +33,13 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerExitStatus; import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto; import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto;
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.ContainerManagerApplicationProto; import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.ContainerManagerApplicationProto;
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.DeletionServiceDeleteTaskProto; import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.DeletionServiceDeleteTaskProto;
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LocalizedResourceProto; import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LocalizedResourceProto;
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LogDeleterProto; import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LogDeleterProto;
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
import org.apache.hadoop.yarn.server.api.records.MasterKey; import org.apache.hadoop.yarn.server.api.records.MasterKey;
import org.apache.hadoop.yarn.server.api.records.impl.pb.MasterKeyPBImpl; import org.apache.hadoop.yarn.server.api.records.impl.pb.MasterKeyPBImpl;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ResourceMappings; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ResourceMappings;
@ -175,12 +176,17 @@ public class NMMemoryStateStoreService extends NMStateStoreService {
} }
@Override @Override
public synchronized void storeContainerResourceChanged( public void storeContainerUpdateToken(ContainerId containerId,
ContainerId containerId, int version, Resource capability) ContainerTokenIdentifier containerTokenIdentifier) throws IOException {
throws IOException {
RecoveredContainerState rcs = getRecoveredContainerState(containerId); RecoveredContainerState rcs = getRecoveredContainerState(containerId);
rcs.capability = capability; rcs.capability = containerTokenIdentifier.getResource();
rcs.version = version; rcs.version = containerTokenIdentifier.getVersion();
Token currentToken = rcs.getStartRequest().getContainerToken();
Token updatedToken = Token
.newInstance(containerTokenIdentifier.getBytes(),
ContainerTokenIdentifier.KIND.toString(),
currentToken.getPassword().array(), currentToken.getService());
rcs.startRequest.setContainerToken(updatedToken);
} }
@Override @Override

View File

@ -307,13 +307,18 @@ public class TestNMLeveldbStateStoreService {
assertEquals(1, recoveredContainers.size()); assertEquals(1, recoveredContainers.size());
// increase the container size, and verify recovered // increase the container size, and verify recovered
stateStore.storeContainerResourceChanged(containerId, 2, ContainerTokenIdentifier updateTokenIdentifier =
Resource.newInstance(2468, 4)); new ContainerTokenIdentifier(containerId, "host", "user",
Resource.newInstance(2468, 4), 9876543210L, 42, 2468,
Priority.newInstance(7), 13579);
stateStore
.storeContainerUpdateToken(containerId, updateTokenIdentifier);
restartStateStore(); restartStateStore();
recoveredContainers = stateStore.loadContainersState(); recoveredContainers = stateStore.loadContainersState();
assertEquals(1, recoveredContainers.size()); assertEquals(1, recoveredContainers.size());
rcs = recoveredContainers.get(0); rcs = recoveredContainers.get(0);
assertEquals(2, rcs.getVersion()); assertEquals(0, rcs.getVersion());
assertEquals(RecoveredContainerStatus.LAUNCHED, rcs.getStatus()); assertEquals(RecoveredContainerStatus.LAUNCHED, rcs.getStatus());
assertEquals(ContainerExitStatus.INVALID, rcs.getExitCode()); assertEquals(ContainerExitStatus.INVALID, rcs.getExitCode());
assertEquals(false, rcs.getKilled()); assertEquals(false, rcs.getKilled());
@ -330,7 +335,9 @@ public class TestNMLeveldbStateStoreService {
assertEquals(RecoveredContainerStatus.LAUNCHED, rcs.getStatus()); assertEquals(RecoveredContainerStatus.LAUNCHED, rcs.getStatus());
assertEquals(ContainerExitStatus.INVALID, rcs.getExitCode()); assertEquals(ContainerExitStatus.INVALID, rcs.getExitCode());
assertTrue(rcs.getKilled()); assertTrue(rcs.getKilled());
assertEquals(containerReq, rcs.getStartRequest()); ContainerTokenIdentifier tokenReadFromRequest = BuilderUtils
.newContainerTokenIdentifier(rcs.getStartRequest().getContainerToken());
assertEquals(updateTokenIdentifier, tokenReadFromRequest);
assertEquals(diags.toString(), rcs.getDiagnostics()); assertEquals(diags.toString(), rcs.getDiagnostics());
// add yet more diags, mark container completed, and verify recovered // add yet more diags, mark container completed, and verify recovered
@ -344,7 +351,6 @@ public class TestNMLeveldbStateStoreService {
assertEquals(RecoveredContainerStatus.COMPLETED, rcs.getStatus()); assertEquals(RecoveredContainerStatus.COMPLETED, rcs.getStatus());
assertEquals(21, rcs.getExitCode()); assertEquals(21, rcs.getExitCode());
assertTrue(rcs.getKilled()); assertTrue(rcs.getKilled());
assertEquals(containerReq, rcs.getStartRequest());
assertEquals(diags.toString(), rcs.getDiagnostics()); assertEquals(diags.toString(), rcs.getDiagnostics());
// store remainingRetryAttempts, workDir and logDir // store remainingRetryAttempts, workDir and logDir