YARN-7240. Add more states and transitions to stabilize the NM Container state machine. (Kartheek Muthyala via asuresh)
(cherry picked from commit df800f6cf3ea663daf4081ebe784808b08d9366d)
This commit is contained in:
parent
ed855e2eef
commit
59453dad8c
@ -20,6 +20,7 @@
|
|||||||
|
|
||||||
import com.google.common.annotations.VisibleForTesting;
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
import com.google.protobuf.ByteString;
|
import com.google.protobuf.ByteString;
|
||||||
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.UpdateContainerTokenEvent;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||||
@ -144,7 +145,6 @@
|
|||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler.ContainerScheduler;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler.ContainerScheduler;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler.ContainerSchedulerEventType;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler.ContainerSchedulerEventType;
|
||||||
|
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler.UpdateContainerSchedulerEvent;
|
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
|
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
|
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredApplicationsState;
|
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredApplicationsState;
|
||||||
@ -1251,29 +1251,6 @@ private void updateContainerInternal(ContainerId containerId,
|
|||||||
+ " [" + containerTokenIdentifier.getVersion() + "]");
|
+ " [" + containerTokenIdentifier.getVersion() + "]");
|
||||||
}
|
}
|
||||||
|
|
||||||
// Check container state
|
|
||||||
org.apache.hadoop.yarn.server.nodemanager.
|
|
||||||
containermanager.container.ContainerState currentState =
|
|
||||||
container.getContainerState();
|
|
||||||
EnumSet<org.apache.hadoop.yarn.server.nodemanager.containermanager
|
|
||||||
.container.ContainerState> allowedStates = EnumSet.of(
|
|
||||||
org.apache.hadoop.yarn.server.nodemanager.containermanager.container
|
|
||||||
.ContainerState.RUNNING,
|
|
||||||
org.apache.hadoop.yarn.server.nodemanager.containermanager.container
|
|
||||||
.ContainerState.SCHEDULED,
|
|
||||||
org.apache.hadoop.yarn.server.nodemanager.containermanager.container
|
|
||||||
.ContainerState.LOCALIZING,
|
|
||||||
org.apache.hadoop.yarn.server.nodemanager.containermanager.container
|
|
||||||
.ContainerState.REINITIALIZING,
|
|
||||||
org.apache.hadoop.yarn.server.nodemanager.containermanager.container
|
|
||||||
.ContainerState.RELAUNCHING);
|
|
||||||
if (!allowedStates.contains(currentState)) {
|
|
||||||
throw RPCUtil.getRemoteException("Container " + containerId.toString()
|
|
||||||
+ " is in " + currentState.name() + " state."
|
|
||||||
+ " Resource can only be changed when a container is in"
|
|
||||||
+ " RUNNING or SCHEDULED state");
|
|
||||||
}
|
|
||||||
|
|
||||||
// Check validity of the target resource.
|
// Check validity of the target resource.
|
||||||
Resource currentResource = container.getResource();
|
Resource currentResource = container.getResource();
|
||||||
ExecutionType currentExecType =
|
ExecutionType currentExecType =
|
||||||
@ -1313,11 +1290,11 @@ private void updateContainerInternal(ContainerId containerId,
|
|||||||
this.readLock.lock();
|
this.readLock.lock();
|
||||||
try {
|
try {
|
||||||
if (!serviceStopped) {
|
if (!serviceStopped) {
|
||||||
// Dispatch message to ContainerScheduler to actually
|
// Dispatch message to Container to actually
|
||||||
// make the change.
|
// make the change.
|
||||||
dispatcher.getEventHandler().handle(new UpdateContainerSchedulerEvent(
|
dispatcher.getEventHandler().handle(new UpdateContainerTokenEvent(
|
||||||
container, containerTokenIdentifier, isResourceChange,
|
container.getContainerId(), containerTokenIdentifier,
|
||||||
isExecTypeUpdate, isIncrease));
|
isResourceChange, isExecTypeUpdate, isIncrease));
|
||||||
} else {
|
} else {
|
||||||
throw new YarnException(
|
throw new YarnException(
|
||||||
"Unable to change container resource as the NodeManager is "
|
"Unable to change container resource as the NodeManager is "
|
||||||
@ -1816,10 +1793,14 @@ private Container preReInitializeOrLocalizeCheck(ContainerId containerId,
|
|||||||
if (container == null) {
|
if (container == null) {
|
||||||
throw new YarnException("Specified " + containerId + " does not exist!");
|
throw new YarnException("Specified " + containerId + " does not exist!");
|
||||||
}
|
}
|
||||||
if (!container.isRunning() || container.isReInitializing()) {
|
if (!container.isRunning() || container.isReInitializing()
|
||||||
|
|| container.getContainerTokenIdentifier().getExecutionType()
|
||||||
|
== ExecutionType.OPPORTUNISTIC) {
|
||||||
throw new YarnException("Cannot perform " + op + " on [" + containerId
|
throw new YarnException("Cannot perform " + op + " on [" + containerId
|
||||||
+ "]. Current state is [" + container.getContainerState() + ", " +
|
+ "]. Current state is [" + container.getContainerState() + ", " +
|
||||||
"isReInitializing=" + container.isReInitializing() + "].");
|
"isReInitializing=" + container.isReInitializing() + "]. Container"
|
||||||
|
+ " Execution Type is [" + container.getContainerTokenIdentifier()
|
||||||
|
.getExecutionType() + "].");
|
||||||
}
|
}
|
||||||
return container;
|
return container;
|
||||||
}
|
}
|
||||||
|
@ -29,6 +29,7 @@ public enum ContainerEventType {
|
|||||||
ROLLBACK_REINIT,
|
ROLLBACK_REINIT,
|
||||||
PAUSE_CONTAINER,
|
PAUSE_CONTAINER,
|
||||||
RESUME_CONTAINER,
|
RESUME_CONTAINER,
|
||||||
|
UPDATE_CONTAINER_TOKEN,
|
||||||
|
|
||||||
// DownloadManager
|
// DownloadManager
|
||||||
CONTAINER_INITED,
|
CONTAINER_INITED,
|
||||||
@ -42,5 +43,8 @@ public enum ContainerEventType {
|
|||||||
CONTAINER_EXITED_WITH_FAILURE,
|
CONTAINER_EXITED_WITH_FAILURE,
|
||||||
CONTAINER_KILLED_ON_REQUEST,
|
CONTAINER_KILLED_ON_REQUEST,
|
||||||
CONTAINER_PAUSED,
|
CONTAINER_PAUSED,
|
||||||
CONTAINER_RESUMED
|
CONTAINER_RESUMED,
|
||||||
|
|
||||||
|
// Producer: ContainerScheduler
|
||||||
|
CONTAINER_TOKEN_UPDATED
|
||||||
}
|
}
|
||||||
|
@ -33,6 +33,8 @@
|
|||||||
import java.util.concurrent.locks.Lock;
|
import java.util.concurrent.locks.Lock;
|
||||||
import java.util.concurrent.locks.ReadWriteLock;
|
import java.util.concurrent.locks.ReadWriteLock;
|
||||||
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||||
|
|
||||||
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler.UpdateContainerSchedulerEvent;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
@ -305,8 +307,8 @@ ContainerEventType.INIT_CONTAINER, new RequestResourcesTransition())
|
|||||||
UPDATE_DIAGNOSTICS_TRANSITION)
|
UPDATE_DIAGNOSTICS_TRANSITION)
|
||||||
.addTransition(ContainerState.NEW, ContainerState.DONE,
|
.addTransition(ContainerState.NEW, ContainerState.DONE,
|
||||||
ContainerEventType.KILL_CONTAINER, new KillOnNewTransition())
|
ContainerEventType.KILL_CONTAINER, new KillOnNewTransition())
|
||||||
.addTransition(ContainerState.NEW, ContainerState.DONE,
|
.addTransition(ContainerState.NEW, ContainerState.NEW,
|
||||||
ContainerEventType.PAUSE_CONTAINER, new KillOnPauseTransition())
|
ContainerEventType.UPDATE_CONTAINER_TOKEN, new UpdateTransition())
|
||||||
|
|
||||||
// From LOCALIZING State
|
// From LOCALIZING State
|
||||||
.addTransition(ContainerState.LOCALIZING,
|
.addTransition(ContainerState.LOCALIZING,
|
||||||
@ -322,8 +324,9 @@ ContainerEventType.RESOURCE_LOCALIZED, new LocalizedTransition())
|
|||||||
.addTransition(ContainerState.LOCALIZING, ContainerState.KILLING,
|
.addTransition(ContainerState.LOCALIZING, ContainerState.KILLING,
|
||||||
ContainerEventType.KILL_CONTAINER,
|
ContainerEventType.KILL_CONTAINER,
|
||||||
new KillBeforeRunningTransition())
|
new KillBeforeRunningTransition())
|
||||||
.addTransition(ContainerState.LOCALIZING, ContainerState.KILLING,
|
.addTransition(ContainerState.LOCALIZING, ContainerState.LOCALIZING,
|
||||||
ContainerEventType.PAUSE_CONTAINER, new KillOnPauseTransition())
|
ContainerEventType.UPDATE_CONTAINER_TOKEN, new UpdateTransition())
|
||||||
|
|
||||||
|
|
||||||
// From LOCALIZATION_FAILED State
|
// From LOCALIZATION_FAILED State
|
||||||
.addTransition(ContainerState.LOCALIZATION_FAILED,
|
.addTransition(ContainerState.LOCALIZATION_FAILED,
|
||||||
@ -348,6 +351,9 @@ ContainerEventType.PAUSE_CONTAINER, new KillOnPauseTransition())
|
|||||||
.addTransition(ContainerState.LOCALIZATION_FAILED,
|
.addTransition(ContainerState.LOCALIZATION_FAILED,
|
||||||
ContainerState.LOCALIZATION_FAILED,
|
ContainerState.LOCALIZATION_FAILED,
|
||||||
ContainerEventType.RESOURCE_FAILED)
|
ContainerEventType.RESOURCE_FAILED)
|
||||||
|
.addTransition(ContainerState.LOCALIZATION_FAILED,
|
||||||
|
ContainerState.LOCALIZATION_FAILED,
|
||||||
|
ContainerEventType.UPDATE_CONTAINER_TOKEN, new UpdateTransition())
|
||||||
|
|
||||||
// From SCHEDULED State
|
// From SCHEDULED State
|
||||||
.addTransition(ContainerState.SCHEDULED, ContainerState.RUNNING,
|
.addTransition(ContainerState.SCHEDULED, ContainerState.RUNNING,
|
||||||
@ -361,6 +367,9 @@ ContainerEventType.CONTAINER_LAUNCHED, new LaunchTransition())
|
|||||||
.addTransition(ContainerState.SCHEDULED, ContainerState.KILLING,
|
.addTransition(ContainerState.SCHEDULED, ContainerState.KILLING,
|
||||||
ContainerEventType.KILL_CONTAINER,
|
ContainerEventType.KILL_CONTAINER,
|
||||||
new KillBeforeRunningTransition())
|
new KillBeforeRunningTransition())
|
||||||
|
.addTransition(ContainerState.SCHEDULED, ContainerState.SCHEDULED,
|
||||||
|
ContainerEventType.UPDATE_CONTAINER_TOKEN,
|
||||||
|
new NotifyContainerSchedulerOfUpdateTransition())
|
||||||
|
|
||||||
// From RUNNING State
|
// From RUNNING State
|
||||||
.addTransition(ContainerState.RUNNING,
|
.addTransition(ContainerState.RUNNING,
|
||||||
@ -373,10 +382,16 @@ ContainerEventType.CONTAINER_LAUNCHED, new LaunchTransition())
|
|||||||
ContainerState.EXITED_WITH_FAILURE),
|
ContainerState.EXITED_WITH_FAILURE),
|
||||||
ContainerEventType.CONTAINER_EXITED_WITH_FAILURE,
|
ContainerEventType.CONTAINER_EXITED_WITH_FAILURE,
|
||||||
new RetryFailureTransition())
|
new RetryFailureTransition())
|
||||||
.addTransition(ContainerState.RUNNING, ContainerState.REINITIALIZING,
|
.addTransition(ContainerState.RUNNING,
|
||||||
|
EnumSet.of(ContainerState.RUNNING,
|
||||||
|
ContainerState.REINITIALIZING,
|
||||||
|
ContainerState.REINITIALIZING_AWAITING_KILL),
|
||||||
ContainerEventType.REINITIALIZE_CONTAINER,
|
ContainerEventType.REINITIALIZE_CONTAINER,
|
||||||
new ReInitializeContainerTransition())
|
new ReInitializeContainerTransition())
|
||||||
.addTransition(ContainerState.RUNNING, ContainerState.REINITIALIZING,
|
.addTransition(ContainerState.RUNNING,
|
||||||
|
EnumSet.of(ContainerState.RUNNING,
|
||||||
|
ContainerState.REINITIALIZING,
|
||||||
|
ContainerState.REINITIALIZING_AWAITING_KILL),
|
||||||
ContainerEventType.ROLLBACK_REINIT,
|
ContainerEventType.ROLLBACK_REINIT,
|
||||||
new RollbackContainerTransition())
|
new RollbackContainerTransition())
|
||||||
.addTransition(ContainerState.RUNNING, ContainerState.RUNNING,
|
.addTransition(ContainerState.RUNNING, ContainerState.RUNNING,
|
||||||
@ -396,8 +411,15 @@ ContainerEventType.KILL_CONTAINER, new KillTransition())
|
|||||||
new KilledExternallyTransition())
|
new KilledExternallyTransition())
|
||||||
.addTransition(ContainerState.RUNNING, ContainerState.PAUSING,
|
.addTransition(ContainerState.RUNNING, ContainerState.PAUSING,
|
||||||
ContainerEventType.PAUSE_CONTAINER, new PauseContainerTransition())
|
ContainerEventType.PAUSE_CONTAINER, new PauseContainerTransition())
|
||||||
|
.addTransition(ContainerState.RUNNING, ContainerState.RUNNING,
|
||||||
|
ContainerEventType.UPDATE_CONTAINER_TOKEN,
|
||||||
|
new NotifyContainerSchedulerOfUpdateTransition())
|
||||||
|
|
||||||
|
|
||||||
// From PAUSING State
|
// From PAUSING State
|
||||||
|
.addTransition(ContainerState.PAUSING, ContainerState.PAUSING,
|
||||||
|
ContainerEventType.RESOURCE_LOCALIZED,
|
||||||
|
new ResourceLocalizedWhileRunningTransition())
|
||||||
.addTransition(ContainerState.PAUSING, ContainerState.KILLING,
|
.addTransition(ContainerState.PAUSING, ContainerState.KILLING,
|
||||||
ContainerEventType.KILL_CONTAINER, new KillTransition())
|
ContainerEventType.KILL_CONTAINER, new KillTransition())
|
||||||
.addTransition(ContainerState.PAUSING, ContainerState.PAUSING,
|
.addTransition(ContainerState.PAUSING, ContainerState.PAUSING,
|
||||||
@ -417,6 +439,12 @@ ContainerEventType.CONTAINER_PAUSED, new PausedContainerTransition())
|
|||||||
.addTransition(ContainerState.PAUSING, ContainerState.EXITED_WITH_FAILURE,
|
.addTransition(ContainerState.PAUSING, ContainerState.EXITED_WITH_FAILURE,
|
||||||
ContainerEventType.CONTAINER_KILLED_ON_REQUEST,
|
ContainerEventType.CONTAINER_KILLED_ON_REQUEST,
|
||||||
new KilledExternallyTransition())
|
new KilledExternallyTransition())
|
||||||
|
.addTransition(ContainerState.PAUSING, ContainerState.PAUSING,
|
||||||
|
ContainerEventType.RESOURCE_LOCALIZED,
|
||||||
|
new ResourceLocalizedWhileRunningTransition())
|
||||||
|
.addTransition(ContainerState.PAUSING, ContainerState.PAUSING,
|
||||||
|
ContainerEventType.UPDATE_CONTAINER_TOKEN,
|
||||||
|
new NotifyContainerSchedulerOfUpdateTransition())
|
||||||
|
|
||||||
// From PAUSED State
|
// From PAUSED State
|
||||||
.addTransition(ContainerState.PAUSED, ContainerState.KILLING,
|
.addTransition(ContainerState.PAUSED, ContainerState.KILLING,
|
||||||
@ -426,6 +454,10 @@ ContainerEventType.KILL_CONTAINER, new KillTransition())
|
|||||||
UPDATE_DIAGNOSTICS_TRANSITION)
|
UPDATE_DIAGNOSTICS_TRANSITION)
|
||||||
.addTransition(ContainerState.PAUSED, ContainerState.PAUSED,
|
.addTransition(ContainerState.PAUSED, ContainerState.PAUSED,
|
||||||
ContainerEventType.PAUSE_CONTAINER)
|
ContainerEventType.PAUSE_CONTAINER)
|
||||||
|
// This can happen during re-initialization.
|
||||||
|
.addTransition(ContainerState.PAUSED, ContainerState.PAUSED,
|
||||||
|
ContainerEventType.RESOURCE_LOCALIZED,
|
||||||
|
new ResourceLocalizedWhileRunningTransition())
|
||||||
.addTransition(ContainerState.PAUSED, ContainerState.RESUMING,
|
.addTransition(ContainerState.PAUSED, ContainerState.RESUMING,
|
||||||
ContainerEventType.RESUME_CONTAINER, new ResumeContainerTransition())
|
ContainerEventType.RESUME_CONTAINER, new ResumeContainerTransition())
|
||||||
// In case something goes wrong then container will exit from the
|
// In case something goes wrong then container will exit from the
|
||||||
@ -441,6 +473,9 @@ ContainerEventType.RESUME_CONTAINER, new ResumeContainerTransition())
|
|||||||
ContainerState.EXITED_WITH_SUCCESS,
|
ContainerState.EXITED_WITH_SUCCESS,
|
||||||
ContainerEventType.CONTAINER_EXITED_WITH_SUCCESS,
|
ContainerEventType.CONTAINER_EXITED_WITH_SUCCESS,
|
||||||
new ExitedWithSuccessTransition(true))
|
new ExitedWithSuccessTransition(true))
|
||||||
|
.addTransition(ContainerState.PAUSED, ContainerState.PAUSED,
|
||||||
|
ContainerEventType.UPDATE_CONTAINER_TOKEN,
|
||||||
|
new NotifyContainerSchedulerOfUpdateTransition())
|
||||||
|
|
||||||
// From RESUMING State
|
// From RESUMING State
|
||||||
.addTransition(ContainerState.RESUMING, ContainerState.KILLING,
|
.addTransition(ContainerState.RESUMING, ContainerState.KILLING,
|
||||||
@ -450,6 +485,10 @@ ContainerEventType.KILL_CONTAINER, new KillTransition())
|
|||||||
.addTransition(ContainerState.RESUMING, ContainerState.RESUMING,
|
.addTransition(ContainerState.RESUMING, ContainerState.RESUMING,
|
||||||
ContainerEventType.UPDATE_DIAGNOSTICS_MSG,
|
ContainerEventType.UPDATE_DIAGNOSTICS_MSG,
|
||||||
UPDATE_DIAGNOSTICS_TRANSITION)
|
UPDATE_DIAGNOSTICS_TRANSITION)
|
||||||
|
// This can happen during re-initialization
|
||||||
|
.addTransition(ContainerState.RESUMING, ContainerState.RESUMING,
|
||||||
|
ContainerEventType.RESOURCE_LOCALIZED,
|
||||||
|
new ResourceLocalizedWhileRunningTransition())
|
||||||
// In case something goes wrong then container will exit from the
|
// In case something goes wrong then container will exit from the
|
||||||
// RESUMING state
|
// RESUMING state
|
||||||
.addTransition(ContainerState.RESUMING,
|
.addTransition(ContainerState.RESUMING,
|
||||||
@ -464,6 +503,10 @@ ContainerEventType.KILL_CONTAINER, new KillTransition())
|
|||||||
ContainerState.EXITED_WITH_SUCCESS,
|
ContainerState.EXITED_WITH_SUCCESS,
|
||||||
ContainerEventType.CONTAINER_EXITED_WITH_SUCCESS,
|
ContainerEventType.CONTAINER_EXITED_WITH_SUCCESS,
|
||||||
new ExitedWithSuccessTransition(true))
|
new ExitedWithSuccessTransition(true))
|
||||||
|
.addTransition(ContainerState.RESUMING, ContainerState.RESUMING,
|
||||||
|
ContainerEventType.UPDATE_CONTAINER_TOKEN,
|
||||||
|
new NotifyContainerSchedulerOfUpdateTransition())
|
||||||
|
// NOTE - We cannot get a PAUSE_CONTAINER while in RESUMING state.
|
||||||
|
|
||||||
// From REINITIALIZING State
|
// From REINITIALIZING State
|
||||||
.addTransition(ContainerState.REINITIALIZING,
|
.addTransition(ContainerState.REINITIALIZING,
|
||||||
@ -475,7 +518,8 @@ ContainerEventType.KILL_CONTAINER, new KillTransition())
|
|||||||
ContainerEventType.CONTAINER_EXITED_WITH_FAILURE,
|
ContainerEventType.CONTAINER_EXITED_WITH_FAILURE,
|
||||||
new ExitedWithFailureTransition(true))
|
new ExitedWithFailureTransition(true))
|
||||||
.addTransition(ContainerState.REINITIALIZING,
|
.addTransition(ContainerState.REINITIALIZING,
|
||||||
ContainerState.REINITIALIZING,
|
EnumSet.of(ContainerState.REINITIALIZING,
|
||||||
|
ContainerState.REINITIALIZING_AWAITING_KILL),
|
||||||
ContainerEventType.RESOURCE_LOCALIZED,
|
ContainerEventType.RESOURCE_LOCALIZED,
|
||||||
new ResourceLocalizedWhileReInitTransition())
|
new ResourceLocalizedWhileReInitTransition())
|
||||||
.addTransition(ContainerState.REINITIALIZING, ContainerState.RUNNING,
|
.addTransition(ContainerState.REINITIALIZING, ContainerState.RUNNING,
|
||||||
@ -487,12 +531,39 @@ ContainerEventType.KILL_CONTAINER, new KillTransition())
|
|||||||
UPDATE_DIAGNOSTICS_TRANSITION)
|
UPDATE_DIAGNOSTICS_TRANSITION)
|
||||||
.addTransition(ContainerState.REINITIALIZING, ContainerState.KILLING,
|
.addTransition(ContainerState.REINITIALIZING, ContainerState.KILLING,
|
||||||
ContainerEventType.KILL_CONTAINER, new KillTransition())
|
ContainerEventType.KILL_CONTAINER, new KillTransition())
|
||||||
.addTransition(ContainerState.REINITIALIZING, ContainerState.KILLING,
|
.addTransition(ContainerState.REINITIALIZING, ContainerState.PAUSING,
|
||||||
ContainerEventType.PAUSE_CONTAINER, new KillOnPauseTransition())
|
ContainerEventType.PAUSE_CONTAINER, new PauseContainerTransition())
|
||||||
.addTransition(ContainerState.REINITIALIZING,
|
.addTransition(ContainerState.REINITIALIZING,
|
||||||
|
ContainerState.REINITIALIZING,
|
||||||
|
ContainerEventType.UPDATE_CONTAINER_TOKEN,
|
||||||
|
new NotifyContainerSchedulerOfUpdateTransition())
|
||||||
|
|
||||||
|
// From REINITIALIZING_AWAITING_KILL State
|
||||||
|
.addTransition(ContainerState.REINITIALIZING_AWAITING_KILL,
|
||||||
|
ContainerState.EXITED_WITH_SUCCESS,
|
||||||
|
ContainerEventType.CONTAINER_EXITED_WITH_SUCCESS,
|
||||||
|
new ExitedWithSuccessTransition(true))
|
||||||
|
.addTransition(ContainerState.REINITIALIZING_AWAITING_KILL,
|
||||||
|
ContainerState.EXITED_WITH_FAILURE,
|
||||||
|
ContainerEventType.CONTAINER_EXITED_WITH_FAILURE,
|
||||||
|
new ExitedWithFailureTransition(true))
|
||||||
|
.addTransition(ContainerState.REINITIALIZING_AWAITING_KILL,
|
||||||
|
ContainerState.REINITIALIZING_AWAITING_KILL,
|
||||||
|
ContainerEventType.UPDATE_DIAGNOSTICS_MSG,
|
||||||
|
UPDATE_DIAGNOSTICS_TRANSITION)
|
||||||
|
.addTransition(ContainerState.REINITIALIZING_AWAITING_KILL,
|
||||||
|
ContainerState.KILLING,
|
||||||
|
ContainerEventType.KILL_CONTAINER, new KillTransition())
|
||||||
|
.addTransition(ContainerState.REINITIALIZING_AWAITING_KILL,
|
||||||
|
ContainerState.SCHEDULED, ContainerEventType.PAUSE_CONTAINER)
|
||||||
|
.addTransition(ContainerState.REINITIALIZING_AWAITING_KILL,
|
||||||
ContainerState.SCHEDULED,
|
ContainerState.SCHEDULED,
|
||||||
ContainerEventType.CONTAINER_KILLED_ON_REQUEST,
|
ContainerEventType.CONTAINER_KILLED_ON_REQUEST,
|
||||||
new KilledForReInitializationTransition())
|
new KilledForReInitializationTransition())
|
||||||
|
.addTransition(ContainerState.REINITIALIZING_AWAITING_KILL,
|
||||||
|
ContainerState.REINITIALIZING_AWAITING_KILL,
|
||||||
|
ContainerEventType.UPDATE_CONTAINER_TOKEN,
|
||||||
|
new NotifyContainerSchedulerOfUpdateTransition())
|
||||||
|
|
||||||
// From RELAUNCHING State
|
// From RELAUNCHING State
|
||||||
.addTransition(ContainerState.RELAUNCHING, ContainerState.RUNNING,
|
.addTransition(ContainerState.RELAUNCHING, ContainerState.RUNNING,
|
||||||
@ -508,6 +579,10 @@ ContainerEventType.CONTAINER_LAUNCHED, new LaunchTransition())
|
|||||||
ContainerEventType.KILL_CONTAINER, new KillTransition())
|
ContainerEventType.KILL_CONTAINER, new KillTransition())
|
||||||
.addTransition(ContainerState.RELAUNCHING, ContainerState.KILLING,
|
.addTransition(ContainerState.RELAUNCHING, ContainerState.KILLING,
|
||||||
ContainerEventType.PAUSE_CONTAINER, new KillOnPauseTransition())
|
ContainerEventType.PAUSE_CONTAINER, new KillOnPauseTransition())
|
||||||
|
.addTransition(ContainerState.RELAUNCHING, ContainerState.RELAUNCHING,
|
||||||
|
ContainerEventType.UPDATE_CONTAINER_TOKEN,
|
||||||
|
new NotifyContainerSchedulerOfUpdateTransition())
|
||||||
|
|
||||||
|
|
||||||
// From CONTAINER_EXITED_WITH_SUCCESS State
|
// From CONTAINER_EXITED_WITH_SUCCESS State
|
||||||
.addTransition(ContainerState.EXITED_WITH_SUCCESS, ContainerState.DONE,
|
.addTransition(ContainerState.EXITED_WITH_SUCCESS, ContainerState.DONE,
|
||||||
@ -521,6 +596,10 @@ ContainerEventType.PAUSE_CONTAINER, new KillOnPauseTransition())
|
|||||||
ContainerState.EXITED_WITH_SUCCESS,
|
ContainerState.EXITED_WITH_SUCCESS,
|
||||||
EnumSet.of(ContainerEventType.KILL_CONTAINER,
|
EnumSet.of(ContainerEventType.KILL_CONTAINER,
|
||||||
ContainerEventType.PAUSE_CONTAINER))
|
ContainerEventType.PAUSE_CONTAINER))
|
||||||
|
// No transition - assuming container is on its way to completion
|
||||||
|
.addTransition(ContainerState.EXITED_WITH_SUCCESS,
|
||||||
|
ContainerState.EXITED_WITH_SUCCESS,
|
||||||
|
ContainerEventType.UPDATE_CONTAINER_TOKEN)
|
||||||
|
|
||||||
// From EXITED_WITH_FAILURE State
|
// From EXITED_WITH_FAILURE State
|
||||||
.addTransition(ContainerState.EXITED_WITH_FAILURE, ContainerState.DONE,
|
.addTransition(ContainerState.EXITED_WITH_FAILURE, ContainerState.DONE,
|
||||||
@ -534,6 +613,10 @@ ContainerEventType.PAUSE_CONTAINER, new KillOnPauseTransition())
|
|||||||
ContainerState.EXITED_WITH_FAILURE,
|
ContainerState.EXITED_WITH_FAILURE,
|
||||||
EnumSet.of(ContainerEventType.KILL_CONTAINER,
|
EnumSet.of(ContainerEventType.KILL_CONTAINER,
|
||||||
ContainerEventType.PAUSE_CONTAINER))
|
ContainerEventType.PAUSE_CONTAINER))
|
||||||
|
// No transition - assuming container is on its way to completion
|
||||||
|
.addTransition(ContainerState.EXITED_WITH_FAILURE,
|
||||||
|
ContainerState.EXITED_WITH_FAILURE,
|
||||||
|
ContainerEventType.UPDATE_CONTAINER_TOKEN)
|
||||||
|
|
||||||
// From KILLING State.
|
// From KILLING State.
|
||||||
.addTransition(ContainerState.KILLING,
|
.addTransition(ContainerState.KILLING,
|
||||||
@ -569,6 +652,9 @@ ContainerEventType.PAUSE_CONTAINER, new KillOnPauseTransition())
|
|||||||
ContainerState.KILLING,
|
ContainerState.KILLING,
|
||||||
EnumSet.of(ContainerEventType.CONTAINER_LAUNCHED,
|
EnumSet.of(ContainerEventType.CONTAINER_LAUNCHED,
|
||||||
ContainerEventType.PAUSE_CONTAINER))
|
ContainerEventType.PAUSE_CONTAINER))
|
||||||
|
// No transition - assuming container is on its way to completion
|
||||||
|
.addTransition(ContainerState.KILLING, ContainerState.KILLING,
|
||||||
|
ContainerEventType.UPDATE_CONTAINER_TOKEN)
|
||||||
|
|
||||||
// From CONTAINER_CLEANEDUP_AFTER_KILL State.
|
// From CONTAINER_CLEANEDUP_AFTER_KILL State.
|
||||||
.addTransition(ContainerState.CONTAINER_CLEANEDUP_AFTER_KILL,
|
.addTransition(ContainerState.CONTAINER_CLEANEDUP_AFTER_KILL,
|
||||||
@ -586,6 +672,10 @@ ContainerEventType.PAUSE_CONTAINER, new KillOnPauseTransition())
|
|||||||
ContainerEventType.CONTAINER_EXITED_WITH_SUCCESS,
|
ContainerEventType.CONTAINER_EXITED_WITH_SUCCESS,
|
||||||
ContainerEventType.CONTAINER_EXITED_WITH_FAILURE,
|
ContainerEventType.CONTAINER_EXITED_WITH_FAILURE,
|
||||||
ContainerEventType.PAUSE_CONTAINER))
|
ContainerEventType.PAUSE_CONTAINER))
|
||||||
|
// No transition - assuming container is on its way to completion
|
||||||
|
.addTransition(ContainerState.CONTAINER_CLEANEDUP_AFTER_KILL,
|
||||||
|
ContainerState.CONTAINER_CLEANEDUP_AFTER_KILL,
|
||||||
|
ContainerEventType.UPDATE_CONTAINER_TOKEN)
|
||||||
|
|
||||||
// From DONE
|
// From DONE
|
||||||
.addTransition(ContainerState.DONE, ContainerState.DONE,
|
.addTransition(ContainerState.DONE, ContainerState.DONE,
|
||||||
@ -603,6 +693,9 @@ ContainerEventType.PAUSE_CONTAINER, new KillOnPauseTransition())
|
|||||||
EnumSet.of(ContainerEventType.RESOURCE_FAILED,
|
EnumSet.of(ContainerEventType.RESOURCE_FAILED,
|
||||||
ContainerEventType.CONTAINER_EXITED_WITH_SUCCESS,
|
ContainerEventType.CONTAINER_EXITED_WITH_SUCCESS,
|
||||||
ContainerEventType.CONTAINER_EXITED_WITH_FAILURE))
|
ContainerEventType.CONTAINER_EXITED_WITH_FAILURE))
|
||||||
|
// No transition - assuming container is on its way to completion
|
||||||
|
.addTransition(ContainerState.DONE, ContainerState.DONE,
|
||||||
|
ContainerEventType.UPDATE_CONTAINER_TOKEN)
|
||||||
|
|
||||||
// create the topology tables
|
// create the topology tables
|
||||||
.installTopology();
|
.installTopology();
|
||||||
@ -622,6 +715,7 @@ public org.apache.hadoop.yarn.api.records.ContainerState getCurrentState() {
|
|||||||
case RUNNING:
|
case RUNNING:
|
||||||
case RELAUNCHING:
|
case RELAUNCHING:
|
||||||
case REINITIALIZING:
|
case REINITIALIZING:
|
||||||
|
case REINITIALIZING_AWAITING_KILL:
|
||||||
case EXITED_WITH_SUCCESS:
|
case EXITED_WITH_SUCCESS:
|
||||||
case EXITED_WITH_FAILURE:
|
case EXITED_WITH_FAILURE:
|
||||||
case KILLING:
|
case KILLING:
|
||||||
@ -925,6 +1019,45 @@ public void transition(ContainerImpl container, ContainerEvent event) {
|
|||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
 * Transition run for a container token update event. It installs the
 * updated token on the container and, when the event reports a resource
 * change, records that change in the NodeManager state store.
 */
static class UpdateTransition extends ContainerTransition {
|
||||||
|
@Override
|
||||||
|
public void transition(
|
||||||
|
ContainerImpl container, ContainerEvent event) {
|
||||||
|
// Unconditional cast: any other event type would fail here with a
|
||||||
// ClassCastException.
UpdateContainerTokenEvent updateEvent = (UpdateContainerTokenEvent)event;
|
||||||
|
// Update the container token
|
||||||
|
container.setContainerTokenIdentifier(updateEvent.getUpdatedToken());
|
||||||
|
// Only resource changes are written to the state store; other updates
// stop after the in-memory token swap above.
if (updateEvent.isResourceChange()) {
|
||||||
|
try {
|
||||||
|
// Persist change in the state store.
|
||||||
|
// NOTE(review): version and resource are read after the token was
// replaced above, so they presumably reflect the updated token --
// confirm against the getters.
container.context.getNMStateStore().storeContainerResourceChanged(
|
||||||
|
container.containerId,
|
||||||
|
container.getContainerTokenIdentifier().getVersion(),
|
||||||
|
container.getResource());
|
||||||
|
// A failed store write is only logged: nothing is rethrown and the
// token set above is not rolled back.
} catch (IOException e) {
|
||||||
|
LOG.warn("Could not store container [" + container.containerId
|
||||||
|
+ "] resource change..", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
 * Token update transition that, after applying the update through
 * UpdateTransition, dispatches an UpdateContainerSchedulerEvent carrying
 * the container, the token it held before the update, and the update event.
 * NOTE(review): the event is presumably consumed by the ContainerScheduler;
 * the handler is not visible here.
 */
static class NotifyContainerSchedulerOfUpdateTransition extends
|
||||||
|
UpdateTransition {
|
||||||
|
@Override
|
||||||
|
public void transition(
|
||||||
|
ContainerImpl container, ContainerEvent event) {
|
||||||
|
|
||||||
|
UpdateContainerTokenEvent updateEvent = (UpdateContainerTokenEvent)event;
|
||||||
|
// Save original token
|
||||||
// It has to be read before super.transition(), which calls
// setContainerTokenIdentifier() with the updated token.
|
ContainerTokenIdentifier originalToken =
|
||||||
|
container.containerTokenIdentifier;
|
||||||
|
// Apply the token update (and the state-store write for resource changes).
super.transition(container, updateEvent);
|
||||||
|
// Hand the pre-update token and the update event to the dispatcher.
container.dispatcher.getEventHandler().handle(
|
||||||
|
new UpdateContainerSchedulerEvent(container,
|
||||||
|
originalToken, updateEvent));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* State transition when a NEW container receives the INIT_CONTAINER
|
* State transition when a NEW container receives the INIT_CONTAINER
|
||||||
* message.
|
* message.
|
||||||
@ -1070,12 +1203,15 @@ public ContainerState transition(ContainerImpl container,
|
|||||||
/**
|
/**
|
||||||
* Transition to start the Re-Initialization process.
|
* Transition to start the Re-Initialization process.
|
||||||
*/
|
*/
|
||||||
static class ReInitializeContainerTransition extends ContainerTransition {
|
static class ReInitializeContainerTransition implements
|
||||||
|
MultipleArcTransition<ContainerImpl, ContainerEvent, ContainerState> {
|
||||||
|
|
||||||
@SuppressWarnings("unchecked")
|
@SuppressWarnings("unchecked")
|
||||||
@Override
|
@Override
|
||||||
public void transition(ContainerImpl container, ContainerEvent event) {
|
public ContainerState transition(
|
||||||
|
ContainerImpl container, ContainerEvent event) {
|
||||||
container.reInitContext = createReInitContext(container, event);
|
container.reInitContext = createReInitContext(container, event);
|
||||||
|
boolean resourcesPresent = false;
|
||||||
try {
|
try {
|
||||||
// 'reInitContext.newResourceSet' can be
|
// 'reInitContext.newResourceSet' can be
|
||||||
// a) current container resourceSet (In case of Restart)
|
// a) current container resourceSet (In case of Restart)
|
||||||
@ -1097,6 +1233,7 @@ public void transition(ContainerImpl container, ContainerEvent event) {
|
|||||||
container.dispatcher.getEventHandler().handle(
|
container.dispatcher.getEventHandler().handle(
|
||||||
new ContainersLauncherEvent(container,
|
new ContainersLauncherEvent(container,
|
||||||
ContainersLauncherEventType.CLEANUP_CONTAINER_FOR_REINIT));
|
ContainersLauncherEventType.CLEANUP_CONTAINER_FOR_REINIT));
|
||||||
|
resourcesPresent = true;
|
||||||
}
|
}
|
||||||
container.metrics.reInitingContainer();
|
container.metrics.reInitingContainer();
|
||||||
NMAuditLogger.logSuccess(container.user,
|
NMAuditLogger.logSuccess(container.user,
|
||||||
@ -1108,7 +1245,11 @@ public void transition(ContainerImpl container, ContainerEvent event) {
|
|||||||
" re-initialization failure..", e);
|
" re-initialization failure..", e);
|
||||||
container.addDiagnostics("Error re-initializing due to" +
|
container.addDiagnostics("Error re-initializing due to" +
|
||||||
"[" + e.getMessage() + "]");
|
"[" + e.getMessage() + "]");
|
||||||
|
return ContainerState.RUNNING;
|
||||||
}
|
}
|
||||||
|
return resourcesPresent ?
|
||||||
|
ContainerState.REINITIALIZING_AWAITING_KILL :
|
||||||
|
ContainerState.REINITIALIZING;
|
||||||
}
|
}
|
||||||
|
|
||||||
protected ReInitializationContext createReInitContext(
|
protected ReInitializationContext createReInitContext(
|
||||||
@ -1160,11 +1301,14 @@ protected ReInitializationContext createReInitContext(ContainerImpl
|
|||||||
* If all dependencies are met, then restart Container with new bits.
|
* If all dependencies are met, then restart Container with new bits.
|
||||||
*/
|
*/
|
||||||
static class ResourceLocalizedWhileReInitTransition
|
static class ResourceLocalizedWhileReInitTransition
|
||||||
extends ContainerTransition {
|
implements MultipleArcTransition
|
||||||
|
<ContainerImpl, ContainerEvent, ContainerState> {
|
||||||
|
|
||||||
|
|
||||||
@SuppressWarnings("unchecked")
|
@SuppressWarnings("unchecked")
|
||||||
@Override
|
@Override
|
||||||
public void transition(ContainerImpl container, ContainerEvent event) {
|
public ContainerState transition(
|
||||||
|
ContainerImpl container, ContainerEvent event) {
|
||||||
ContainerResourceLocalizedEvent rsrcEvent =
|
ContainerResourceLocalizedEvent rsrcEvent =
|
||||||
(ContainerResourceLocalizedEvent) event;
|
(ContainerResourceLocalizedEvent) event;
|
||||||
container.reInitContext.newResourceSet.resourceLocalized(
|
container.reInitContext.newResourceSet.resourceLocalized(
|
||||||
@ -1176,7 +1320,9 @@ public void transition(ContainerImpl container, ContainerEvent event) {
|
|||||||
container.dispatcher.getEventHandler().handle(
|
container.dispatcher.getEventHandler().handle(
|
||||||
new ContainersLauncherEvent(container,
|
new ContainersLauncherEvent(container,
|
||||||
ContainersLauncherEventType.CLEANUP_CONTAINER_FOR_REINIT));
|
ContainersLauncherEventType.CLEANUP_CONTAINER_FOR_REINIT));
|
||||||
|
return ContainerState.REINITIALIZING_AWAITING_KILL;
|
||||||
}
|
}
|
||||||
|
return ContainerState.REINITIALIZING;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -20,7 +20,8 @@
|
|||||||
|
|
||||||
public enum ContainerState {
|
public enum ContainerState {
|
||||||
NEW, LOCALIZING, LOCALIZATION_FAILED, SCHEDULED, RUNNING, RELAUNCHING,
|
NEW, LOCALIZING, LOCALIZATION_FAILED, SCHEDULED, RUNNING, RELAUNCHING,
|
||||||
REINITIALIZING, EXITED_WITH_SUCCESS, EXITED_WITH_FAILURE, KILLING,
|
REINITIALIZING, REINITIALIZING_AWAITING_KILL,
|
||||||
|
EXITED_WITH_SUCCESS, EXITED_WITH_FAILURE, KILLING,
|
||||||
CONTAINER_CLEANEDUP_AFTER_KILL, CONTAINER_RESOURCES_CLEANINGUP, DONE,
|
CONTAINER_CLEANEDUP_AFTER_KILL, CONTAINER_RESOURCES_CLEANINGUP, DONE,
|
||||||
PAUSING, PAUSED, RESUMING
|
PAUSING, PAUSED, RESUMING
|
||||||
}
|
}
|
||||||
|
@ -0,0 +1,86 @@
|
|||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
* <p>
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
* <p>
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.yarn.server.nodemanager.containermanager.container;
|
||||||
|
|
||||||
|
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||||
|
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Update Event consumed by the Container.
|
||||||
|
*/
|
||||||
|
public class UpdateContainerTokenEvent extends ContainerEvent {
|
||||||
|
private final ContainerTokenIdentifier updatedToken;
|
||||||
|
private final boolean isResourceChange;
|
||||||
|
private final boolean isExecTypeUpdate;
|
||||||
|
private final boolean isIncrease;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Create Update event.
|
||||||
|
*
|
||||||
|
* @param cID Container Id.
|
||||||
|
* @param updatedToken Updated Container Token.
|
||||||
|
* @param isResourceChange Is Resource change.
|
||||||
|
* @param isExecTypeUpdate Is ExecutionType Update.
|
||||||
|
* @param isIncrease Is container increase.
|
||||||
|
*/
|
||||||
|
public UpdateContainerTokenEvent(ContainerId cID,
|
||||||
|
ContainerTokenIdentifier updatedToken, boolean isResourceChange,
|
||||||
|
boolean isExecTypeUpdate, boolean isIncrease) {
|
||||||
|
super(cID, ContainerEventType.UPDATE_CONTAINER_TOKEN);
|
||||||
|
this.updatedToken = updatedToken;
|
||||||
|
this.isResourceChange = isResourceChange;
|
||||||
|
this.isExecTypeUpdate = isExecTypeUpdate;
|
||||||
|
this.isIncrease = isIncrease;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Update Container Token.
|
||||||
|
*
|
||||||
|
* @return Container Token.
|
||||||
|
*/
|
||||||
|
public ContainerTokenIdentifier getUpdatedToken() {
|
||||||
|
return updatedToken;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Is this update a ResourceChange.
|
||||||
|
*
|
||||||
|
* @return isResourceChange.
|
||||||
|
*/
|
||||||
|
public boolean isResourceChange() {
|
||||||
|
return isResourceChange;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Is this update an ExecType Update.
|
||||||
|
*
|
||||||
|
* @return isExecTypeUpdate.
|
||||||
|
*/
|
||||||
|
public boolean isExecTypeUpdate() {
|
||||||
|
return isExecTypeUpdate;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Is this a container Increase.
|
||||||
|
*
|
||||||
|
* @return isIncrease.
|
||||||
|
*/
|
||||||
|
public boolean isIncrease() {
|
||||||
|
return isIncrease;
|
||||||
|
}
|
||||||
|
}
|
@ -33,6 +33,7 @@
|
|||||||
import org.apache.hadoop.yarn.server.nodemanager.Context;
|
import org.apache.hadoop.yarn.server.nodemanager.Context;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
|
||||||
|
|
||||||
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerImpl;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor
|
||||||
.ChangeMonitoringContainerResourceEvent;
|
.ChangeMonitoringContainerResourceEvent;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState;
|
||||||
@ -151,7 +152,9 @@ public void handle(ContainerSchedulerEvent event) {
|
|||||||
case SCHEDULE_CONTAINER:
|
case SCHEDULE_CONTAINER:
|
||||||
scheduleContainer(event.getContainer());
|
scheduleContainer(event.getContainer());
|
||||||
break;
|
break;
|
||||||
|
// NOTE: Is sent only after container state has changed to PAUSED...
|
||||||
case CONTAINER_PAUSED:
|
case CONTAINER_PAUSED:
|
||||||
|
// NOTE: Is sent only after container state has changed to DONE...
|
||||||
case CONTAINER_COMPLETED:
|
case CONTAINER_COMPLETED:
|
||||||
onResourcesReclaimed(event.getContainer());
|
onResourcesReclaimed(event.getContainer());
|
||||||
break;
|
break;
|
||||||
@ -180,37 +183,17 @@ private void onUpdateContainer(UpdateContainerSchedulerEvent updateEvent) {
|
|||||||
if (updateEvent.isResourceChange()) {
|
if (updateEvent.isResourceChange()) {
|
||||||
if (runningContainers.containsKey(containerId)) {
|
if (runningContainers.containsKey(containerId)) {
|
||||||
this.utilizationTracker.subtractContainerResource(
|
this.utilizationTracker.subtractContainerResource(
|
||||||
updateEvent.getContainer());
|
new ContainerImpl(getConfig(), null, null, null, null,
|
||||||
updateEvent.getContainer().setContainerTokenIdentifier(
|
updateEvent.getOriginalToken(), context));
|
||||||
updateEvent.getUpdatedToken());
|
|
||||||
this.utilizationTracker.addContainerResources(
|
this.utilizationTracker.addContainerResources(
|
||||||
updateEvent.getContainer());
|
updateEvent.getContainer());
|
||||||
getContainersMonitor().handle(
|
getContainersMonitor().handle(
|
||||||
new ChangeMonitoringContainerResourceEvent(containerId,
|
new ChangeMonitoringContainerResourceEvent(containerId,
|
||||||
updateEvent.getUpdatedToken().getResource()));
|
updateEvent.getUpdatedToken().getResource()));
|
||||||
} else {
|
|
||||||
// Is Queued or localizing..
|
|
||||||
updateEvent.getContainer().setContainerTokenIdentifier(
|
|
||||||
updateEvent.getUpdatedToken());
|
|
||||||
}
|
|
||||||
try {
|
|
||||||
// Persist change in the state store.
|
|
||||||
this.context.getNMStateStore().storeContainerResourceChanged(
|
|
||||||
containerId,
|
|
||||||
updateEvent.getUpdatedToken().getVersion(),
|
|
||||||
updateEvent.getUpdatedToken().getResource());
|
|
||||||
} catch (IOException e) {
|
|
||||||
LOG.warn("Could not store container [" + containerId + "] resource " +
|
|
||||||
"change..", e);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (updateEvent.isExecTypeUpdate()) {
|
if (updateEvent.isExecTypeUpdate()) {
|
||||||
updateEvent.getContainer().setContainerTokenIdentifier(
|
|
||||||
updateEvent.getUpdatedToken());
|
|
||||||
// If this is a running container.. just change the execution type
|
|
||||||
// and be done with it.
|
|
||||||
if (!runningContainers.containsKey(containerId)) {
|
|
||||||
// Promotion or not (Increase signifies either a promotion
|
// Promotion or not (Increase signifies either a promotion
|
||||||
// or container size increase)
|
// or container size increase)
|
||||||
if (updateEvent.isIncrease()) {
|
if (updateEvent.isIncrease()) {
|
||||||
@ -218,10 +201,10 @@ private void onUpdateContainer(UpdateContainerSchedulerEvent updateEvent) {
|
|||||||
if (queuedOpportunisticContainers.remove(containerId) != null) {
|
if (queuedOpportunisticContainers.remove(containerId) != null) {
|
||||||
queuedGuaranteedContainers.put(containerId,
|
queuedGuaranteedContainers.put(containerId,
|
||||||
updateEvent.getContainer());
|
updateEvent.getContainer());
|
||||||
}
|
|
||||||
//Kill/pause opportunistic containers if any to make room for
|
//Kill/pause opportunistic containers if any to make room for
|
||||||
// promotion request
|
// promotion request
|
||||||
reclaimOpportunisticContainerResources(updateEvent.getContainer());
|
reclaimOpportunisticContainerResources(updateEvent.getContainer());
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
// Demotion of queued container.. Should not happen too often
|
// Demotion of queued container.. Should not happen too often
|
||||||
// since you should not find too many queued guaranteed
|
// since you should not find too many queued guaranteed
|
||||||
@ -231,7 +214,7 @@ private void onUpdateContainer(UpdateContainerSchedulerEvent updateEvent) {
|
|||||||
updateEvent.getContainer());
|
updateEvent.getContainer());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
startPendingContainers(maxOppQueueLength <= 0);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -290,6 +273,16 @@ private void onResourcesReclaimed(Container container) {
|
|||||||
queuedGuaranteedContainers.remove(container.getContainerId());
|
queuedGuaranteedContainers.remove(container.getContainerId());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Requeue PAUSED containers
|
||||||
|
if (container.getContainerState() == ContainerState.PAUSED) {
|
||||||
|
if (container.getContainerTokenIdentifier().getExecutionType() ==
|
||||||
|
ExecutionType.GUARANTEED) {
|
||||||
|
queuedGuaranteedContainers.put(container.getContainerId(), container);
|
||||||
|
} else {
|
||||||
|
queuedOpportunisticContainers.put(
|
||||||
|
container.getContainerId(), container);
|
||||||
|
}
|
||||||
|
}
|
||||||
// decrement only if it was a running container
|
// decrement only if it was a running container
|
||||||
Container completedContainer = runningContainers.remove(container
|
Container completedContainer = runningContainers.remove(container
|
||||||
.getContainerId());
|
.getContainerId());
|
||||||
@ -301,7 +294,8 @@ private void onResourcesReclaimed(Container container) {
|
|||||||
ExecutionType.OPPORTUNISTIC) {
|
ExecutionType.OPPORTUNISTIC) {
|
||||||
this.metrics.completeOpportunisticContainer(container.getResource());
|
this.metrics.completeOpportunisticContainer(container.getResource());
|
||||||
}
|
}
|
||||||
startPendingContainers(false);
|
boolean forceStartGuaranteedContainers = (maxOppQueueLength <= 0);
|
||||||
|
startPendingContainers(forceStartGuaranteedContainers);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -311,26 +305,9 @@ private void onResourcesReclaimed(Container container) {
|
|||||||
* container without looking at available resource
|
* container without looking at available resource
|
||||||
*/
|
*/
|
||||||
private void startPendingContainers(boolean forceStartGuaranteedContaieners) {
|
private void startPendingContainers(boolean forceStartGuaranteedContaieners) {
|
||||||
// Start pending guaranteed containers, if resources available.
|
// Start guaranteed containers that are paused, if resources available.
|
||||||
boolean resourcesAvailable = startContainers(
|
boolean resourcesAvailable = startContainers(
|
||||||
queuedGuaranteedContainers.values(), forceStartGuaranteedContaieners);
|
queuedGuaranteedContainers.values(), forceStartGuaranteedContaieners);
|
||||||
// Resume opportunistic containers, if resource available.
|
|
||||||
if (resourcesAvailable) {
|
|
||||||
List<Container> pausedContainers = new ArrayList<Container>();
|
|
||||||
Map<ContainerId, Container> containers =
|
|
||||||
context.getContainers();
|
|
||||||
for (Map.Entry<ContainerId, Container>entry : containers.entrySet()) {
|
|
||||||
ContainerId contId = entry.getKey();
|
|
||||||
// Find containers that were not already started and are in paused state
|
|
||||||
if(false == runningContainers.containsKey(contId)) {
|
|
||||||
if(containers.get(contId).getContainerState()
|
|
||||||
== ContainerState.PAUSED) {
|
|
||||||
pausedContainers.add(containers.get(contId));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
resourcesAvailable = startContainers(pausedContainers, false);
|
|
||||||
}
|
|
||||||
// Start opportunistic containers, if resources available.
|
// Start opportunistic containers, if resources available.
|
||||||
if (resourcesAvailable) {
|
if (resourcesAvailable) {
|
||||||
startContainers(queuedOpportunisticContainers.values(), false);
|
startContainers(queuedOpportunisticContainers.values(), false);
|
||||||
@ -590,6 +567,8 @@ private void shedQueuedOpportunisticContainers() {
|
|||||||
queuedOpportunisticContainers.values().iterator();
|
queuedOpportunisticContainers.values().iterator();
|
||||||
while (containerIter.hasNext()) {
|
while (containerIter.hasNext()) {
|
||||||
Container container = containerIter.next();
|
Container container = containerIter.next();
|
||||||
|
// Do not shed PAUSED containers
|
||||||
|
if (container.getContainerState() != ContainerState.PAUSED) {
|
||||||
if (numAllowed <= 0) {
|
if (numAllowed <= 0) {
|
||||||
container.sendKillEvent(
|
container.sendKillEvent(
|
||||||
ContainerExitStatus.KILLED_BY_CONTAINER_SCHEDULER,
|
ContainerExitStatus.KILLED_BY_CONTAINER_SCHEDULER,
|
||||||
@ -602,6 +581,7 @@ private void shedQueuedOpportunisticContainers() {
|
|||||||
numAllowed--;
|
numAllowed--;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
public ContainersMonitor getContainersMonitor() {
|
public ContainersMonitor getContainersMonitor() {
|
||||||
return this.context.getContainerManager().getContainersMonitor();
|
return this.context.getContainerManager().getContainersMonitor();
|
||||||
|
@ -21,33 +21,37 @@
|
|||||||
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
|
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container
|
||||||
.Container;
|
.Container;
|
||||||
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.UpdateContainerTokenEvent;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Update Event consumed by the {@link ContainerScheduler}.
|
* Update Event consumed by the {@link ContainerScheduler}.
|
||||||
*/
|
*/
|
||||||
public class UpdateContainerSchedulerEvent extends ContainerSchedulerEvent {
|
public class UpdateContainerSchedulerEvent extends ContainerSchedulerEvent {
|
||||||
|
|
||||||
private ContainerTokenIdentifier updatedToken;
|
private final UpdateContainerTokenEvent containerEvent;
|
||||||
private boolean isResourceChange;
|
private final ContainerTokenIdentifier originalToken;
|
||||||
private boolean isExecTypeUpdate;
|
|
||||||
private boolean isIncrease;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Create instance of Event.
|
* Create instance of Event.
|
||||||
*
|
*
|
||||||
* @param originalContainer Original Container.
|
* @param container Container.
|
||||||
* @param updatedToken Updated Container Token.
|
* @param origToken The Original Container Token.
|
||||||
* @param isResourceChange is this a Resource Change.
|
* @param event The Container Event.
|
||||||
* @param isExecTypeUpdate is this an ExecTypeUpdate.
|
|
||||||
* @param isIncrease is this a Container Increase.
|
|
||||||
*/
|
*/
|
||||||
public UpdateContainerSchedulerEvent(Container originalContainer,
|
public UpdateContainerSchedulerEvent(Container container,
|
||||||
ContainerTokenIdentifier updatedToken, boolean isResourceChange,
|
ContainerTokenIdentifier origToken, UpdateContainerTokenEvent event) {
|
||||||
boolean isExecTypeUpdate, boolean isIncrease) {
|
super(container, ContainerSchedulerEventType.UPDATE_CONTAINER);
|
||||||
super(originalContainer, ContainerSchedulerEventType.UPDATE_CONTAINER);
|
this.containerEvent = event;
|
||||||
this.updatedToken = updatedToken;
|
this.originalToken = origToken;
|
||||||
this.isResourceChange = isResourceChange;
|
}
|
||||||
this.isExecTypeUpdate = isExecTypeUpdate;
|
|
||||||
this.isIncrease = isIncrease;
|
/**
|
||||||
|
* Original Token before update.
|
||||||
|
*
|
||||||
|
* @return Container Token.
|
||||||
|
*/
|
||||||
|
public ContainerTokenIdentifier getOriginalToken() {
|
||||||
|
return this.originalToken;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -56,7 +60,7 @@ public UpdateContainerSchedulerEvent(Container originalContainer,
|
|||||||
* @return Container Token.
|
* @return Container Token.
|
||||||
*/
|
*/
|
||||||
public ContainerTokenIdentifier getUpdatedToken() {
|
public ContainerTokenIdentifier getUpdatedToken() {
|
||||||
return updatedToken;
|
return containerEvent.getUpdatedToken();
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -64,7 +68,7 @@ public ContainerTokenIdentifier getUpdatedToken() {
|
|||||||
* @return isResourceChange.
|
* @return isResourceChange.
|
||||||
*/
|
*/
|
||||||
public boolean isResourceChange() {
|
public boolean isResourceChange() {
|
||||||
return isResourceChange;
|
return containerEvent.isResourceChange();
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -72,7 +76,7 @@ public boolean isResourceChange() {
|
|||||||
* @return isExecTypeUpdate.
|
* @return isExecTypeUpdate.
|
||||||
*/
|
*/
|
||||||
public boolean isExecTypeUpdate() {
|
public boolean isExecTypeUpdate() {
|
||||||
return isExecTypeUpdate;
|
return containerEvent.isExecTypeUpdate();
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -80,6 +84,6 @@ public boolean isExecTypeUpdate() {
|
|||||||
* @return isIncrease.
|
* @return isIncrease.
|
||||||
*/
|
*/
|
||||||
public boolean isIncrease() {
|
public boolean isIncrease() {
|
||||||
return isIncrease;
|
return containerEvent.isIncrease();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -208,6 +208,8 @@ public void setup() throws IOException {
|
|||||||
containerManager.init(conf);
|
containerManager.init(conf);
|
||||||
nodeStatusUpdater.start();
|
nodeStatusUpdater.start();
|
||||||
((NMContext)context).setNodeStatusUpdater(nodeStatusUpdater);
|
((NMContext)context).setNodeStatusUpdater(nodeStatusUpdater);
|
||||||
|
((NMContext)context).setContainerStateTransitionListener(
|
||||||
|
new NodeManager.DefaultContainerStateListener());
|
||||||
}
|
}
|
||||||
|
|
||||||
protected ContainerManagerImpl
|
protected ContainerManagerImpl
|
||||||
|
@ -90,12 +90,16 @@
|
|||||||
import org.apache.hadoop.yarn.server.nodemanager.CMgrSignalContainersEvent;
|
import org.apache.hadoop.yarn.server.nodemanager.CMgrSignalContainersEvent;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
|
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.Signal;
|
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.Signal;
|
||||||
|
import org.apache.hadoop.yarn.server.nodemanager.ContainerStateTransitionListener;
|
||||||
|
import org.apache.hadoop.yarn.server.nodemanager.Context;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor;
|
import org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
|
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.NodeManager;
|
import org.apache.hadoop.yarn.server.nodemanager.NodeManager;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.TestAuxServices.ServiceA;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.TestAuxServices.ServiceA;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationState;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationState;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
|
||||||
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEvent;
|
||||||
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEventType;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerImpl;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerImpl;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer;
|
||||||
@ -119,6 +123,41 @@ public TestContainerManager() throws UnsupportedFileSystemException {
|
|||||||
LOG = LoggerFactory.getLogger(TestContainerManager.class);
|
LOG = LoggerFactory.getLogger(TestContainerManager.class);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static class Listener implements ContainerStateTransitionListener {
|
||||||
|
|
||||||
|
private final Map<ContainerId,
|
||||||
|
List<org.apache.hadoop.yarn.server.nodemanager.containermanager.
|
||||||
|
container.ContainerState>> states = new HashMap<>();
|
||||||
|
private final Map<ContainerId, List<ContainerEventType>> events =
|
||||||
|
new HashMap<>();
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void init(Context context) {}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void preTransition(ContainerImpl op,
|
||||||
|
org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
|
||||||
|
ContainerState beforeState,
|
||||||
|
ContainerEvent eventToBeProcessed) {
|
||||||
|
if (!states.containsKey(op.getContainerId())) {
|
||||||
|
states.put(op.getContainerId(), new ArrayList<>());
|
||||||
|
states.get(op.getContainerId()).add(beforeState);
|
||||||
|
events.put(op.getContainerId(), new ArrayList<>());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void postTransition(ContainerImpl op,
|
||||||
|
org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
|
||||||
|
ContainerState beforeState,
|
||||||
|
org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
|
||||||
|
ContainerState afterState,
|
||||||
|
ContainerEvent processedEvent) {
|
||||||
|
states.get(op.getContainerId()).add(afterState);
|
||||||
|
events.get(op.getContainerId()).add(processedEvent.getType());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private boolean delayContainers = false;
|
private boolean delayContainers = false;
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@ -496,6 +535,9 @@ private String[] testContainerReInitSuccess(boolean autoCommit)
|
|||||||
@Test
|
@Test
|
||||||
public void testContainerUpgradeSuccessAutoCommit() throws IOException,
|
public void testContainerUpgradeSuccessAutoCommit() throws IOException,
|
||||||
InterruptedException, YarnException {
|
InterruptedException, YarnException {
|
||||||
|
Listener listener = new Listener();
|
||||||
|
((NodeManager.DefaultContainerStateListener)containerManager.context.
|
||||||
|
getContainerStateTransitionListener()).addListener(listener);
|
||||||
testContainerReInitSuccess(true);
|
testContainerReInitSuccess(true);
|
||||||
// Should not be able to Commit (since already auto committed)
|
// Should not be able to Commit (since already auto committed)
|
||||||
try {
|
try {
|
||||||
@ -504,6 +546,41 @@ public void testContainerUpgradeSuccessAutoCommit() throws IOException,
|
|||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
Assert.assertTrue(e.getMessage().contains("Nothing to Commit"));
|
Assert.assertTrue(e.getMessage().contains("Nothing to Commit"));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
List<org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
|
||||||
|
ContainerState> containerStates =
|
||||||
|
listener.states.get(createContainerId(0));
|
||||||
|
Assert.assertEquals(Arrays.asList(
|
||||||
|
org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
|
||||||
|
ContainerState.NEW,
|
||||||
|
org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
|
||||||
|
ContainerState.LOCALIZING,
|
||||||
|
org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
|
||||||
|
ContainerState.SCHEDULED,
|
||||||
|
org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
|
||||||
|
ContainerState.RUNNING,
|
||||||
|
org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
|
||||||
|
ContainerState.REINITIALIZING,
|
||||||
|
org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
|
||||||
|
ContainerState.REINITIALIZING_AWAITING_KILL,
|
||||||
|
org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
|
||||||
|
ContainerState.REINITIALIZING_AWAITING_KILL,
|
||||||
|
org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
|
||||||
|
ContainerState.SCHEDULED,
|
||||||
|
org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
|
||||||
|
ContainerState.RUNNING), containerStates);
|
||||||
|
|
||||||
|
List<ContainerEventType> containerEventTypes =
|
||||||
|
listener.events.get(createContainerId(0));
|
||||||
|
Assert.assertEquals(Arrays.asList(
|
||||||
|
ContainerEventType.INIT_CONTAINER,
|
||||||
|
ContainerEventType.RESOURCE_LOCALIZED,
|
||||||
|
ContainerEventType.CONTAINER_LAUNCHED,
|
||||||
|
ContainerEventType.REINITIALIZE_CONTAINER,
|
||||||
|
ContainerEventType.RESOURCE_LOCALIZED,
|
||||||
|
ContainerEventType.UPDATE_DIAGNOSTICS_MSG,
|
||||||
|
ContainerEventType.CONTAINER_KILLED_ON_REQUEST,
|
||||||
|
ContainerEventType.CONTAINER_LAUNCHED), containerEventTypes);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
@ -524,6 +601,9 @@ public void testContainerUpgradeSuccessExplicitCommit() throws IOException,
|
|||||||
@Test
|
@Test
|
||||||
public void testContainerUpgradeSuccessExplicitRollback() throws IOException,
|
public void testContainerUpgradeSuccessExplicitRollback() throws IOException,
|
||||||
InterruptedException, YarnException {
|
InterruptedException, YarnException {
|
||||||
|
Listener listener = new Listener();
|
||||||
|
((NodeManager.DefaultContainerStateListener)containerManager.context.
|
||||||
|
getContainerStateTransitionListener()).addListener(listener);
|
||||||
String[] pids = testContainerReInitSuccess(false);
|
String[] pids = testContainerReInitSuccess(false);
|
||||||
|
|
||||||
// Test that the container can be Restarted after the successful upgrrade.
|
// Test that the container can be Restarted after the successful upgrrade.
|
||||||
@ -575,6 +655,67 @@ public void testContainerUpgradeSuccessExplicitRollback() throws IOException,
|
|||||||
|
|
||||||
Assert.assertNotEquals("The Rolled-back process should be a different pid",
|
Assert.assertNotEquals("The Rolled-back process should be a different pid",
|
||||||
pids[0], rolledBackPid);
|
pids[0], rolledBackPid);
|
||||||
|
|
||||||
|
List<org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
|
||||||
|
ContainerState> containerStates =
|
||||||
|
listener.states.get(createContainerId(0));
|
||||||
|
Assert.assertEquals(Arrays.asList(
|
||||||
|
org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
|
||||||
|
ContainerState.NEW,
|
||||||
|
org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
|
||||||
|
ContainerState.LOCALIZING,
|
||||||
|
org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
|
||||||
|
ContainerState.SCHEDULED,
|
||||||
|
org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
|
||||||
|
ContainerState.RUNNING,
|
||||||
|
org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
|
||||||
|
ContainerState.REINITIALIZING,
|
||||||
|
org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
|
||||||
|
ContainerState.REINITIALIZING_AWAITING_KILL,
|
||||||
|
org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
|
||||||
|
ContainerState.REINITIALIZING_AWAITING_KILL,
|
||||||
|
org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
|
||||||
|
ContainerState.SCHEDULED,
|
||||||
|
org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
|
||||||
|
ContainerState.RUNNING,
|
||||||
|
// This is the successful restart
|
||||||
|
org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
|
||||||
|
ContainerState.REINITIALIZING_AWAITING_KILL,
|
||||||
|
org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
|
||||||
|
ContainerState.REINITIALIZING_AWAITING_KILL,
|
||||||
|
org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
|
||||||
|
ContainerState.SCHEDULED,
|
||||||
|
org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
|
||||||
|
ContainerState.RUNNING,
|
||||||
|
// This is the rollback
|
||||||
|
org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
|
||||||
|
ContainerState.REINITIALIZING_AWAITING_KILL,
|
||||||
|
org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
|
||||||
|
ContainerState.REINITIALIZING_AWAITING_KILL,
|
||||||
|
org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
|
||||||
|
ContainerState.SCHEDULED,
|
||||||
|
org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
|
||||||
|
ContainerState.RUNNING), containerStates);
|
||||||
|
|
||||||
|
List<ContainerEventType> containerEventTypes =
|
||||||
|
listener.events.get(createContainerId(0));
|
||||||
|
Assert.assertEquals(Arrays.asList(
|
||||||
|
ContainerEventType.INIT_CONTAINER,
|
||||||
|
ContainerEventType.RESOURCE_LOCALIZED,
|
||||||
|
ContainerEventType.CONTAINER_LAUNCHED,
|
||||||
|
ContainerEventType.REINITIALIZE_CONTAINER,
|
||||||
|
ContainerEventType.RESOURCE_LOCALIZED,
|
||||||
|
ContainerEventType.UPDATE_DIAGNOSTICS_MSG,
|
||||||
|
ContainerEventType.CONTAINER_KILLED_ON_REQUEST,
|
||||||
|
ContainerEventType.CONTAINER_LAUNCHED,
|
||||||
|
ContainerEventType.REINITIALIZE_CONTAINER,
|
||||||
|
ContainerEventType.UPDATE_DIAGNOSTICS_MSG,
|
||||||
|
ContainerEventType.CONTAINER_KILLED_ON_REQUEST,
|
||||||
|
ContainerEventType.CONTAINER_LAUNCHED,
|
||||||
|
ContainerEventType.ROLLBACK_REINIT,
|
||||||
|
ContainerEventType.UPDATE_DIAGNOSTICS_MSG,
|
||||||
|
ContainerEventType.CONTAINER_KILLED_ON_REQUEST,
|
||||||
|
ContainerEventType.CONTAINER_LAUNCHED), containerEventTypes);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
@ -584,6 +725,9 @@ public void testContainerUpgradeLocalizationFailure() throws IOException,
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
containerManager.start();
|
containerManager.start();
|
||||||
|
Listener listener = new Listener();
|
||||||
|
((NodeManager.DefaultContainerStateListener)containerManager.context.
|
||||||
|
getContainerStateTransitionListener()).addListener(listener);
|
||||||
// ////// Construct the Container-id
|
// ////// Construct the Container-id
|
||||||
ContainerId cId = createContainerId(0);
|
ContainerId cId = createContainerId(0);
|
||||||
File oldStartFile = new File(tmpDir, "start_file_o.txt").getAbsoluteFile();
|
File oldStartFile = new File(tmpDir, "start_file_o.txt").getAbsoluteFile();
|
||||||
@ -598,6 +742,32 @@ public void testContainerUpgradeLocalizationFailure() throws IOException,
|
|||||||
// since upgrade was terminated..
|
// since upgrade was terminated..
|
||||||
Assert.assertTrue("Process is NOT alive!",
|
Assert.assertTrue("Process is NOT alive!",
|
||||||
DefaultContainerExecutor.containerIsAlive(pid));
|
DefaultContainerExecutor.containerIsAlive(pid));
|
||||||
|
|
||||||
|
List<org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
|
||||||
|
ContainerState> containerStates =
|
||||||
|
listener.states.get(createContainerId(0));
|
||||||
|
Assert.assertEquals(Arrays.asList(
|
||||||
|
org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
|
||||||
|
ContainerState.NEW,
|
||||||
|
org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
|
||||||
|
ContainerState.LOCALIZING,
|
||||||
|
org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
|
||||||
|
ContainerState.SCHEDULED,
|
||||||
|
org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
|
||||||
|
ContainerState.RUNNING,
|
||||||
|
org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
|
||||||
|
ContainerState.REINITIALIZING,
|
||||||
|
org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
|
||||||
|
ContainerState.RUNNING), containerStates);
|
||||||
|
|
||||||
|
List<ContainerEventType> containerEventTypes =
|
||||||
|
listener.events.get(createContainerId(0));
|
||||||
|
Assert.assertEquals(Arrays.asList(
|
||||||
|
ContainerEventType.INIT_CONTAINER,
|
||||||
|
ContainerEventType.RESOURCE_LOCALIZED,
|
||||||
|
ContainerEventType.CONTAINER_LAUNCHED,
|
||||||
|
ContainerEventType.REINITIALIZE_CONTAINER,
|
||||||
|
ContainerEventType.RESOURCE_FAILED), containerEventTypes);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
@ -632,6 +802,9 @@ public void testContainerUpgradeRollbackDueToFailure() throws IOException,
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
containerManager.start();
|
containerManager.start();
|
||||||
|
Listener listener = new Listener();
|
||||||
|
((NodeManager.DefaultContainerStateListener)containerManager.context.
|
||||||
|
getContainerStateTransitionListener()).addListener(listener);
|
||||||
// ////// Construct the Container-id
|
// ////// Construct the Container-id
|
||||||
ContainerId cId = createContainerId(0);
|
ContainerId cId = createContainerId(0);
|
||||||
File oldStartFile = new File(tmpDir, "start_file_o.txt").getAbsoluteFile();
|
File oldStartFile = new File(tmpDir, "start_file_o.txt").getAbsoluteFile();
|
||||||
@ -666,6 +839,50 @@ public void testContainerUpgradeRollbackDueToFailure() throws IOException,
|
|||||||
|
|
||||||
Assert.assertNotEquals("The Rolled-back process should be a different pid",
|
Assert.assertNotEquals("The Rolled-back process should be a different pid",
|
||||||
pid, rolledBackPid);
|
pid, rolledBackPid);
|
||||||
|
|
||||||
|
List<org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
|
||||||
|
ContainerState> containerStates =
|
||||||
|
listener.states.get(createContainerId(0));
|
||||||
|
Assert.assertEquals(Arrays.asList(
|
||||||
|
org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
|
||||||
|
ContainerState.NEW,
|
||||||
|
org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
|
||||||
|
ContainerState.LOCALIZING,
|
||||||
|
org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
|
||||||
|
ContainerState.SCHEDULED,
|
||||||
|
org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
|
||||||
|
ContainerState.RUNNING,
|
||||||
|
org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
|
||||||
|
ContainerState.REINITIALIZING,
|
||||||
|
org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
|
||||||
|
ContainerState.REINITIALIZING_AWAITING_KILL,
|
||||||
|
org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
|
||||||
|
ContainerState.REINITIALIZING_AWAITING_KILL,
|
||||||
|
org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
|
||||||
|
ContainerState.SCHEDULED,
|
||||||
|
org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
|
||||||
|
ContainerState.RUNNING,
|
||||||
|
org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
|
||||||
|
ContainerState.RUNNING,
|
||||||
|
org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
|
||||||
|
ContainerState.SCHEDULED,
|
||||||
|
org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
|
||||||
|
ContainerState.RUNNING), containerStates);
|
||||||
|
|
||||||
|
List<ContainerEventType> containerEventTypes =
|
||||||
|
listener.events.get(createContainerId(0));
|
||||||
|
Assert.assertEquals(Arrays.asList(
|
||||||
|
ContainerEventType.INIT_CONTAINER,
|
||||||
|
ContainerEventType.RESOURCE_LOCALIZED,
|
||||||
|
ContainerEventType.CONTAINER_LAUNCHED,
|
||||||
|
ContainerEventType.REINITIALIZE_CONTAINER,
|
||||||
|
ContainerEventType.RESOURCE_LOCALIZED,
|
||||||
|
ContainerEventType.UPDATE_DIAGNOSTICS_MSG,
|
||||||
|
ContainerEventType.CONTAINER_KILLED_ON_REQUEST,
|
||||||
|
ContainerEventType.CONTAINER_LAUNCHED,
|
||||||
|
ContainerEventType.UPDATE_DIAGNOSTICS_MSG,
|
||||||
|
ContainerEventType.CONTAINER_EXITED_WITH_FAILURE,
|
||||||
|
ContainerEventType.CONTAINER_LAUNCHED), containerEventTypes);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -1582,16 +1799,12 @@ public void testIncreaseContainerResourceWithInvalidRequests() throws Exception
|
|||||||
containerManager.updateContainer(updateRequest);
|
containerManager.updateContainer(updateRequest);
|
||||||
// Check response
|
// Check response
|
||||||
Assert.assertEquals(
|
Assert.assertEquals(
|
||||||
0, updateResponse.getSuccessfullyUpdatedContainers().size());
|
1, updateResponse.getSuccessfullyUpdatedContainers().size());
|
||||||
Assert.assertEquals(2, updateResponse.getFailedRequests().size());
|
Assert.assertEquals(1, updateResponse.getFailedRequests().size());
|
||||||
for (Map.Entry<ContainerId, SerializedException> entry : updateResponse
|
for (Map.Entry<ContainerId, SerializedException> entry : updateResponse
|
||||||
.getFailedRequests().entrySet()) {
|
.getFailedRequests().entrySet()) {
|
||||||
Assert.assertNotNull("Failed message", entry.getValue().getMessage());
|
Assert.assertNotNull("Failed message", entry.getValue().getMessage());
|
||||||
if (cId0.equals(entry.getKey())) {
|
if (cId7.equals(entry.getKey())) {
|
||||||
Assert.assertTrue(entry.getValue().getMessage()
|
|
||||||
.contains("Resource can only be changed when a "
|
|
||||||
+ "container is in RUNNING or SCHEDULED state"));
|
|
||||||
} else if (cId7.equals(entry.getKey())) {
|
|
||||||
Assert.assertTrue(entry.getValue().getMessage()
|
Assert.assertTrue(entry.getValue().getMessage()
|
||||||
.contains("Container " + cId7.toString()
|
.contains("Container " + cId7.toString()
|
||||||
+ " is not handled by this NodeManager"));
|
+ " is not handled by this NodeManager"));
|
||||||
|
@ -23,6 +23,7 @@
|
|||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
import java.util.concurrent.ConcurrentMap;
|
import java.util.concurrent.ConcurrentMap;
|
||||||
|
|
||||||
@ -47,11 +48,17 @@
|
|||||||
import org.apache.hadoop.yarn.security.NMTokenIdentifier;
|
import org.apache.hadoop.yarn.security.NMTokenIdentifier;
|
||||||
import org.apache.hadoop.yarn.server.api.records.ContainerQueuingLimit;
|
import org.apache.hadoop.yarn.server.api.records.ContainerQueuingLimit;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
|
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
|
||||||
|
import org.apache.hadoop.yarn.server.nodemanager.ContainerStateTransitionListener;
|
||||||
|
import org.apache.hadoop.yarn.server.nodemanager.Context;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor;
|
import org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
|
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
|
||||||
|
import org.apache.hadoop.yarn.server.nodemanager.NodeManager;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.BaseContainerManagerTest;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.BaseContainerManagerTest;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
|
||||||
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEvent;
|
||||||
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEventType;
|
||||||
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerImpl;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitor;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitor;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl;
|
||||||
@ -76,6 +83,40 @@ public TestContainerSchedulerQueuing() throws UnsupportedFileSystemException {
|
|||||||
LOG = LoggerFactory.getLogger(TestContainerSchedulerQueuing.class);
|
LOG = LoggerFactory.getLogger(TestContainerSchedulerQueuing.class);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static class Listener implements ContainerStateTransitionListener {
|
||||||
|
|
||||||
|
private final Map<ContainerId,
|
||||||
|
List<ContainerState>> states = new HashMap<>();
|
||||||
|
private final Map<ContainerId, List<ContainerEventType>> events =
|
||||||
|
new HashMap<>();
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void init(Context context) {}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void preTransition(ContainerImpl op,
|
||||||
|
org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
|
||||||
|
ContainerState beforeState,
|
||||||
|
ContainerEvent eventToBeProcessed) {
|
||||||
|
if (!states.containsKey(op.getContainerId())) {
|
||||||
|
states.put(op.getContainerId(), new ArrayList<>());
|
||||||
|
states.get(op.getContainerId()).add(beforeState);
|
||||||
|
events.put(op.getContainerId(), new ArrayList<>());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void postTransition(ContainerImpl op,
|
||||||
|
org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
|
||||||
|
ContainerState beforeState,
|
||||||
|
org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
|
||||||
|
ContainerState afterState,
|
||||||
|
ContainerEvent processedEvent) {
|
||||||
|
states.get(op.getContainerId()).add(afterState);
|
||||||
|
events.get(op.getContainerId()).add(processedEvent.getType());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private boolean delayContainers = true;
|
private boolean delayContainers = true;
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@ -542,6 +583,10 @@ public void testPauseOpportunisticForGuaranteedContainer() throws Exception {
|
|||||||
containerManager.start();
|
containerManager.start();
|
||||||
containerManager.getContainerScheduler().
|
containerManager.getContainerScheduler().
|
||||||
setUsePauseEventForPreemption(true);
|
setUsePauseEventForPreemption(true);
|
||||||
|
|
||||||
|
Listener listener = new Listener();
|
||||||
|
((NodeManager.DefaultContainerStateListener)containerManager.getContext().
|
||||||
|
getContainerStateTransitionListener()).addListener(listener);
|
||||||
ContainerLaunchContext containerLaunchContext =
|
ContainerLaunchContext containerLaunchContext =
|
||||||
recordFactory.newRecordInstance(ContainerLaunchContext.class);
|
recordFactory.newRecordInstance(ContainerLaunchContext.class);
|
||||||
|
|
||||||
@ -606,6 +651,39 @@ public void testPauseOpportunisticForGuaranteedContainer() throws Exception {
|
|||||||
// starts running
|
// starts running
|
||||||
BaseContainerManagerTest.waitForNMContainerState(containerManager,
|
BaseContainerManagerTest.waitForNMContainerState(containerManager,
|
||||||
createContainerId(0), ContainerState.DONE, 40);
|
createContainerId(0), ContainerState.DONE, 40);
|
||||||
|
|
||||||
|
List<org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
|
||||||
|
ContainerState> containerStates =
|
||||||
|
listener.states.get(createContainerId(0));
|
||||||
|
Assert.assertEquals(Arrays.asList(
|
||||||
|
org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
|
||||||
|
ContainerState.NEW,
|
||||||
|
org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
|
||||||
|
ContainerState.SCHEDULED,
|
||||||
|
org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
|
||||||
|
ContainerState.RUNNING,
|
||||||
|
org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
|
||||||
|
ContainerState.PAUSING,
|
||||||
|
org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
|
||||||
|
ContainerState.PAUSED,
|
||||||
|
org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
|
||||||
|
ContainerState.RESUMING,
|
||||||
|
org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
|
||||||
|
ContainerState.RUNNING,
|
||||||
|
org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
|
||||||
|
ContainerState.EXITED_WITH_SUCCESS,
|
||||||
|
org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
|
||||||
|
ContainerState.DONE), containerStates);
|
||||||
|
List<ContainerEventType> containerEventTypes =
|
||||||
|
listener.events.get(createContainerId(0));
|
||||||
|
Assert.assertEquals(Arrays.asList(ContainerEventType.INIT_CONTAINER,
|
||||||
|
ContainerEventType.CONTAINER_LAUNCHED,
|
||||||
|
ContainerEventType.PAUSE_CONTAINER,
|
||||||
|
ContainerEventType.CONTAINER_PAUSED,
|
||||||
|
ContainerEventType.RESUME_CONTAINER,
|
||||||
|
ContainerEventType.CONTAINER_RESUMED,
|
||||||
|
ContainerEventType.CONTAINER_EXITED_WITH_SUCCESS,
|
||||||
|
ContainerEventType.CONTAINER_RESOURCES_CLEANEDUP), containerEventTypes);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -1068,6 +1146,9 @@ public void testStopQueuedContainer() throws Exception {
|
|||||||
@Test
|
@Test
|
||||||
public void testPromotionOfOpportunisticContainers() throws Exception {
|
public void testPromotionOfOpportunisticContainers() throws Exception {
|
||||||
containerManager.start();
|
containerManager.start();
|
||||||
|
Listener listener = new Listener();
|
||||||
|
((NodeManager.DefaultContainerStateListener)containerManager.getContext().
|
||||||
|
getContainerStateTransitionListener()).addListener(listener);
|
||||||
|
|
||||||
ContainerLaunchContext containerLaunchContext =
|
ContainerLaunchContext containerLaunchContext =
|
||||||
recordFactory.newRecordInstance(ContainerLaunchContext.class);
|
recordFactory.newRecordInstance(ContainerLaunchContext.class);
|
||||||
@ -1150,6 +1231,7 @@ public void testPromotionOfOpportunisticContainers() throws Exception {
|
|||||||
containerStatuses = containerManager
|
containerStatuses = containerManager
|
||||||
.getContainerStatuses(statRequest).getContainerStatuses();
|
.getContainerStatuses(statRequest).getContainerStatuses();
|
||||||
Assert.assertEquals(1, containerStatuses.size());
|
Assert.assertEquals(1, containerStatuses.size());
|
||||||
|
|
||||||
for (ContainerStatus status : containerStatuses) {
|
for (ContainerStatus status : containerStatuses) {
|
||||||
if (org.apache.hadoop.yarn.api.records.ContainerState.RUNNING ==
|
if (org.apache.hadoop.yarn.api.records.ContainerState.RUNNING ==
|
||||||
status.getState()) {
|
status.getState()) {
|
||||||
@ -1160,6 +1242,25 @@ public void testPromotionOfOpportunisticContainers() throws Exception {
|
|||||||
|
|
||||||
// Ensure no containers are queued.
|
// Ensure no containers are queued.
|
||||||
Assert.assertEquals(0, containerScheduler.getNumQueuedContainers());
|
Assert.assertEquals(0, containerScheduler.getNumQueuedContainers());
|
||||||
|
|
||||||
|
List<org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
|
||||||
|
ContainerState> containerStates =
|
||||||
|
listener.states.get(createContainerId(1));
|
||||||
|
Assert.assertEquals(Arrays.asList(
|
||||||
|
org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
|
||||||
|
ContainerState.NEW,
|
||||||
|
org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
|
||||||
|
ContainerState.SCHEDULED,
|
||||||
|
org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
|
||||||
|
ContainerState.SCHEDULED,
|
||||||
|
org.apache.hadoop.yarn.server.nodemanager.containermanager.container.
|
||||||
|
ContainerState.RUNNING), containerStates);
|
||||||
|
List<ContainerEventType> containerEventTypes =
|
||||||
|
listener.events.get(createContainerId(1));
|
||||||
|
Assert.assertEquals(Arrays.asList(
|
||||||
|
ContainerEventType.INIT_CONTAINER,
|
||||||
|
ContainerEventType.UPDATE_CONTAINER_TOKEN,
|
||||||
|
ContainerEventType.CONTAINER_LAUNCHED), containerEventTypes);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
Loading…
x
Reference in New Issue
Block a user