From bd348d20b2fec8dfaeadb5a8e2f6c1f3afdceb82 Mon Sep 17 00:00:00 2001 From: Arun Suresh Date: Wed, 24 Aug 2016 10:23:06 -0700 Subject: [PATCH] Revert "YARN-5049. Extend NMStateStore to save queued container information. (Konstantinos Karanasos via asuresh)" This reverts commit 307cda70dbde6a8e377794b395a0d958603df5d7. --- .../ContainerManagerImpl.java | 26 +++++-------- .../queuing/QueuingContainerManagerImpl.java | 38 ------------------- .../recovery/NMLeveldbStateStoreService.java | 26 +------------ .../recovery/NMNullStateStoreService.java | 4 -- .../recovery/NMStateStoreService.java | 9 ----- .../recovery/NMMemoryStateStoreService.java | 6 --- .../TestNMLeveldbStateStoreService.java | 12 ------ 7 files changed, 11 insertions(+), 110 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java index 1e3b854969a..94700f35e75 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java @@ -337,6 +337,7 @@ public class ContainerManagerImpl extends CompositeService implements app.handle(new ApplicationInitEvent(appId, acls, logAggregationContext)); } + @SuppressWarnings("unchecked") private void recoverContainer(RecoveredContainerState rcs) throws IOException { StartContainerRequest req = rcs.getStartRequest(); @@ -351,7 +352,14 @@ public class ContainerManagerImpl extends CompositeService implements + " with exit code " + rcs.getExitCode()); if (context.getApplications().containsKey(appId)) { - recoverActiveContainer(launchContext, token, rcs); + Credentials credentials = + YarnServerSecurityUtils.parseCredentials(launchContext); + Container container = new ContainerImpl(getConfig(), dispatcher, + req.getContainerLaunchContext(), + credentials, metrics, token, context, rcs); + context.getContainers().put(containerId, container); + dispatcher.getEventHandler().handle( + new ApplicationContainerInitEvent(container)); } else { if (rcs.getStatus() != RecoveredContainerStatus.COMPLETED) { LOG.warn(containerId + " has no corresponding application!"); @@ -361,22 +369,6 @@ public class ContainerManagerImpl extends CompositeService implements } } - /** - * Recover a running container. - */ - @SuppressWarnings("unchecked") - protected void recoverActiveContainer( - ContainerLaunchContext launchContext, ContainerTokenIdentifier token, - RecoveredContainerState rcs) throws IOException { - Credentials credentials = YarnServerSecurityUtils.parseCredentials( - launchContext); - Container container = new ContainerImpl(getConfig(), dispatcher, - launchContext, credentials, metrics, token, context, rcs); - context.getContainers().put(token.getContainerID(), container); - dispatcher.getEventHandler().handle(new ApplicationContainerInitEvent( - container)); - } - private void waitForRecoveredContainers() throws InterruptedException { final int sleepMsec = 100; int waitIterations = 100; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/queuing/QueuingContainerManagerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/queuing/QueuingContainerManagerImpl.java index 38b1b07287b..e8f14f1feb5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/queuing/QueuingContainerManagerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/queuing/QueuingContainerManagerImpl.java @@ -35,7 +35,6 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest; import org.apache.hadoop.yarn.api.records.ContainerExitStatus; import org.apache.hadoop.yarn.api.records.ContainerId; -import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.ExecutionType; import org.apache.hadoop.yarn.api.records.Resource; @@ -58,8 +57,6 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Ap import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl.ProcessTreeInfo; import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics; -import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredContainerState; -import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredContainerStatus; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -128,8 +125,6 @@ public class QueuingContainerManagerImpl extends ContainerManagerImpl { startAllocatedContainer(allocatedContInfo); } else { ContainerId cIdToStart = containerTokenIdentifier.getContainerID(); - this.context.getNMStateStore().storeContainer(cIdToStart, request); - this.context.getNMStateStore().storeContainerQueued(cIdToStart); LOG.info("No available resources for container {} to start its execution " + "immediately.", cIdToStart); if (allocatedContInfo.getExecutionType() == ExecutionType.GUARANTEED) { @@ -164,7 +159,6 @@ public class QueuingContainerManagerImpl extends ContainerManagerImpl { this.context.getQueuingContext().getKilledQueuedContainers().put( containerTokenId, "Queued container request removed by ApplicationMaster."); - this.context.getNMStateStore().storeContainerKilled(containerID); } else { // The container started execution in the meanwhile. try { @@ -474,38 +468,6 @@ public class QueuingContainerManagerImpl extends ContainerManagerImpl { return super.getContainerStatusInternal(containerID, nmTokenIdentifier); } - /** - * Recover running or queued container. - */ - @Override - protected void recoverActiveContainer( - ContainerLaunchContext launchContext, ContainerTokenIdentifier token, - RecoveredContainerState rcs) throws IOException { - if (rcs.getStatus() == - RecoveredContainerStatus.QUEUED && !rcs.getKilled()) { - LOG.info(token.getContainerID() - + "will be added to the queued containers."); - - AllocatedContainerInfo allocatedContInfo = new AllocatedContainerInfo( - token, rcs.getStartRequest(), token.getExecutionType(), - token.getResource(), getConfig()); - - this.context.getQueuingContext().getQueuedContainers().put( - token.getContainerID(), token); - - if (allocatedContInfo.getExecutionType() == ExecutionType.GUARANTEED) { - queuedGuaranteedContainers.add(allocatedContInfo); - // Kill running opportunistic containers to make space for - // guaranteed container. - killOpportunisticContainers(allocatedContInfo); - } else { - queuedOpportunisticContainers.add(allocatedContInfo); - } - } else { - super.recoverActiveContainer(launchContext, token, rcs); - } - } - @VisibleForTesting public int getNumAllocatedGuaranteedContainers() { return allocatedGuaranteedContainers.size(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java index 5fe27134954..e418bf98413 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java @@ -80,7 +80,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService { private static final String DB_SCHEMA_VERSION_KEY = "nm-schema-version"; private static final Version CURRENT_VERSION_INFO = Version - .newInstance(2, 0); + .newInstance(1, 0); private static final String DELETION_TASK_KEY_PREFIX = "DeletionService/deltask_"; @@ -106,7 +106,6 @@ public class NMLeveldbStateStoreService extends NMStateStoreService { private static final String CONTAINER_REQUEST_KEY_SUFFIX = "/request"; private static final String CONTAINER_DIAGS_KEY_SUFFIX = "/diagnostics"; private static final String CONTAINER_LAUNCHED_KEY_SUFFIX = "/launched"; - private static final String CONTAINER_QUEUED_KEY_SUFFIX = "/queued"; private static final String CONTAINER_RESOURCE_CHANGED_KEY_SUFFIX = "/resourceChanged"; private static final String CONTAINER_KILLED_KEY_SUFFIX = "/killed"; @@ -240,13 +239,8 @@ public class NMLeveldbStateStoreService extends NMStateStoreService { StartContainerRequestProto.parseFrom(entry.getValue())); } else if (suffix.equals(CONTAINER_DIAGS_KEY_SUFFIX)) { rcs.diagnostics = asString(entry.getValue()); - } else if (suffix.equals(CONTAINER_QUEUED_KEY_SUFFIX)) { - if (rcs.status == RecoveredContainerStatus.REQUESTED) { - rcs.status = RecoveredContainerStatus.QUEUED; - } } else if (suffix.equals(CONTAINER_LAUNCHED_KEY_SUFFIX)) { - if ((rcs.status == RecoveredContainerStatus.REQUESTED) - || (rcs.status == RecoveredContainerStatus.QUEUED)) { + if (rcs.status == RecoveredContainerStatus.REQUESTED) { rcs.status = RecoveredContainerStatus.LAUNCHED; } } else if (suffix.equals(CONTAINER_KILLED_KEY_SUFFIX)) { @@ -289,21 +283,6 @@ public class NMLeveldbStateStoreService extends NMStateStoreService { } } - @Override - public void storeContainerQueued(ContainerId containerId) throws IOException { - if (LOG.isDebugEnabled()) { - LOG.debug("storeContainerQueued: containerId=" + containerId); - } - - String key = CONTAINERS_KEY_PREFIX + containerId.toString() - + CONTAINER_QUEUED_KEY_SUFFIX; - try { - db.put(bytes(key), EMPTY_VALUE); - } catch (DBException e) { - throw new IOException(e); - } - } - @Override public void storeContainerDiagnostics(ContainerId containerId, StringBuilder diagnostics) throws IOException { @@ -438,7 +417,6 @@ public class NMLeveldbStateStoreService extends NMStateStoreService { batch.delete(bytes(keyPrefix + CONTAINER_REQUEST_KEY_SUFFIX)); batch.delete(bytes(keyPrefix + CONTAINER_DIAGS_KEY_SUFFIX)); batch.delete(bytes(keyPrefix + CONTAINER_LAUNCHED_KEY_SUFFIX)); - batch.delete(bytes(keyPrefix + CONTAINER_QUEUED_KEY_SUFFIX)); batch.delete(bytes(keyPrefix + CONTAINER_KILLED_KEY_SUFFIX)); batch.delete(bytes(keyPrefix + CONTAINER_EXIT_CODE_KEY_SUFFIX)); db.write(batch); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java index 112095e1be7..08b80e961a5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java @@ -74,10 +74,6 @@ public class NMNullStateStoreService extends NMStateStoreService { StartContainerRequest startRequest) throws IOException { } - @Override - public void storeContainerQueued(ContainerId containerId) throws IOException { - } - @Override public void storeContainerDiagnostics(ContainerId containerId, StringBuilder diagnostics) throws IOException { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java index 57f35a47b40..ccf1e709d99 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java @@ -62,7 +62,6 @@ public abstract class NMStateStoreService extends AbstractService { public enum RecoveredContainerStatus { REQUESTED, - QUEUED, LAUNCHED, COMPLETED } @@ -312,14 +311,6 @@ public abstract class NMStateStoreService extends AbstractService { public abstract void storeContainer(ContainerId containerId, StartContainerRequest startRequest) throws IOException; - /** - * Record that a container has been queued at the NM - * @param containerId the container ID - * @throws IOException - */ - public abstract void storeContainerQueued(ContainerId containerId) - throws IOException; - /** * Record that a container has been launched * @param containerId the container ID diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java index 3c5edc06797..46522453ff2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java @@ -131,12 +131,6 @@ public class NMMemoryStateStoreService extends NMStateStoreService { containerStates.put(containerId, rcs); } - @Override - public void storeContainerQueued(ContainerId containerId) throws IOException { - RecoveredContainerState rcs = getRecoveredContainerState(containerId); - rcs.status = RecoveredContainerStatus.QUEUED; - } - @Override public synchronized void storeContainerDiagnostics(ContainerId containerId, StringBuilder diagnostics) throws IOException { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java index d254e4be197..524597799c8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java @@ -280,18 +280,6 @@ public class TestNMLeveldbStateStoreService { // check whether the new container record is discarded assertEquals(1, recoveredContainers.size()); - // queue the container, and verify recovered - stateStore.storeContainerQueued(containerId); - restartStateStore(); - recoveredContainers = stateStore.loadContainersState(); - assertEquals(1, recoveredContainers.size()); - rcs = recoveredContainers.get(0); - assertEquals(RecoveredContainerStatus.QUEUED, rcs.getStatus()); - assertEquals(ContainerExitStatus.INVALID, rcs.getExitCode()); - assertEquals(false, rcs.getKilled()); - assertEquals(containerReq, rcs.getStartRequest()); - assertTrue(rcs.getDiagnostics().isEmpty()); - // launch the container, add some diagnostics, and verify recovered StringBuilder diags = new StringBuilder(); stateStore.storeContainerLaunched(containerId);