From c57eac5dfe277845ab4522a1188023a73ee41539 Mon Sep 17 00:00:00 2001 From: Jian He Date: Thu, 20 Aug 2015 21:18:23 -0700 Subject: [PATCH] YARN-3868. Recovery support for container resizing. Contributed by Meng Ding --- hadoop-yarn-project/CHANGES.txt | 2 + .../ContainerManagerImpl.java | 5 +- .../container/ContainerImpl.java | 8 +- .../recovery/NMLeveldbStateStoreService.java | 22 ++ .../recovery/NMNullStateStoreService.java | 6 + .../recovery/NMStateStoreService.java | 15 ++ .../TestContainerManagerRecovery.java | 233 +++++++++++++++++- .../recovery/NMMemoryStateStoreService.java | 11 +- .../TestNMLeveldbStateStoreService.java | 11 + 9 files changed, 301 insertions(+), 12 deletions(-) diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 1872b1a44c2..d2aafa0f05d 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -218,6 +218,8 @@ Release 2.8.0 - UNRELEASED YARN-1644. RM-NM protocol changes and NodeStatusUpdater implementation to support container resizing. (Meng Ding via jianhe) + YARN-3868. Recovery support for container resizing. (Meng Ding via jianhe) + IMPROVEMENTS YARN-644. Basic null check is not performed on passed in arguments before diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java index 868d8d3489f..39d2983fbc1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java @@ -346,7 +346,7 @@ private void recoverContainer(RecoveredContainerState rcs) Container container = new ContainerImpl(getConfig(), dispatcher, context.getNMStateStore(), req.getContainerLaunchContext(), credentials, metrics, token, rcs.getStatus(), rcs.getExitCode(), - rcs.getDiagnostics(), rcs.getKilled()); + rcs.getDiagnostics(), rcs.getKilled(), rcs.getCapability()); context.getContainers().put(containerId, container); dispatcher.getEventHandler().handle( new ApplicationContainerInitEvent(container)); @@ -1101,6 +1101,9 @@ private void changeContainerResourceInternal( this.readLock.lock(); try { if (!serviceStopped) { + // Persist container resource change for recovery + this.context.getNMStateStore().storeContainerResourceChanged( + containerId, targetResource); getContainersMonitor().handle( new ChangeMonitoringContainerResourceEvent( containerId, targetResource)); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java index 5c61a9295c3..eff2188c933 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java @@ -154,13 +154,19 @@ public ContainerImpl(Configuration conf, Dispatcher dispatcher, Credentials creds, NodeManagerMetrics metrics, ContainerTokenIdentifier containerTokenIdentifier, RecoveredContainerStatus recoveredStatus, int exitCode, - String diagnostics, boolean wasKilled) { + String diagnostics, boolean wasKilled, Resource recoveredCapability) { this(conf, dispatcher, stateStore, launchContext, creds, metrics, containerTokenIdentifier); this.recoveredStatus = recoveredStatus; this.exitCode = exitCode; this.recoveredAsKilled = wasKilled; this.diagnostics.append(diagnostics); + if (recoveredCapability != null + && !this.resource.equals(recoveredCapability)) { + // resource capability had been updated before NM was down + this.resource = Resource.newInstance(recoveredCapability.getMemory(), + recoveredCapability.getVirtualCores()); + } } private static final ContainerDiagnosticsUpdateTransition UPDATE_DIAGNOSTICS_TRANSITION = diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java index df5818222fe..89c71bb8907 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMLeveldbStateStoreService.java @@ -40,7 +40,10 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.impl.pb.ResourcePBImpl; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProto; import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto; import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.MasterKeyProto; import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.VersionProto; @@ -99,6 +102,8 @@ public class NMLeveldbStateStoreService extends NMStateStoreService { private static final String CONTAINER_REQUEST_KEY_SUFFIX = "/request"; private static final String CONTAINER_DIAGS_KEY_SUFFIX = "/diagnostics"; private static final String CONTAINER_LAUNCHED_KEY_SUFFIX = "/launched"; + private static final String CONTAINER_RESOURCE_CHANGED_KEY_SUFFIX = + "/resourceChanged"; private static final String CONTAINER_KILLED_KEY_SUFFIX = "/killed"; private static final String CONTAINER_EXIT_CODE_KEY_SUFFIX = "/exitcode"; @@ -230,6 +235,9 @@ private RecoveredContainerState loadContainerState(ContainerId containerId, } else if (suffix.equals(CONTAINER_EXIT_CODE_KEY_SUFFIX)) { rcs.status = RecoveredContainerStatus.COMPLETED; rcs.exitCode = Integer.parseInt(asString(entry.getValue())); + } else if (suffix.equals(CONTAINER_RESOURCE_CHANGED_KEY_SUFFIX)) { + rcs.capability = new ResourcePBImpl( + ResourceProto.parseFrom(entry.getValue())); } else { throw new IOException("Unexpected container state key: " + key); } @@ -274,6 +282,20 @@ public void storeContainerLaunched(ContainerId containerId) } } + @Override + public void storeContainerResourceChanged(ContainerId containerId, + Resource capability) throws IOException { + String key = CONTAINERS_KEY_PREFIX + containerId.toString() + + CONTAINER_RESOURCE_CHANGED_KEY_SUFFIX; + try { + // New value will overwrite old values for the same key + db.put(bytes(key), + ((ResourcePBImpl) capability).getProto().toByteArray()); + } catch (DBException e) { + throw new IOException(e); + } + } + @Override public void storeContainerKilled(ContainerId containerId) throws IOException { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java index ab49543c403..d5dce9bb2ee 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMNullStateStoreService.java @@ -27,6 +27,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto; import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.ContainerManagerApplicationProto; import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.DeletionServiceDeleteTaskProto; @@ -87,6 +88,11 @@ public void storeContainerLaunched(ContainerId containerId) throws IOException { } + @Override + public void storeContainerResourceChanged(ContainerId containerId, + Resource capability) throws IOException { + } + @Override public void storeContainerKilled(ContainerId containerId) throws IOException { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java index fa663495bc9..e8ccf541cf6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMStateStoreService.java @@ -34,6 +34,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerExitStatus; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto; import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.ContainerManagerApplicationProto; import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.DeletionServiceDeleteTaskProto; @@ -74,6 +75,7 @@ public static class RecoveredContainerState { boolean killed = false; String diagnostics = ""; StartContainerRequest startRequest; + Resource capability; public RecoveredContainerStatus getStatus() { return status; @@ -94,6 +96,10 @@ public String getDiagnostics() { public StartContainerRequest getStartRequest() { return startRequest; } + + public Resource getCapability() { + return capability; + } } public static class LocalResourceTrackerState { @@ -283,6 +289,15 @@ public abstract void storeContainer(ContainerId containerId, public abstract void storeContainerLaunched(ContainerId containerId) throws IOException; + /** + * Record that a container resource has been changed + * @param containerId the container ID + * @param capability the container resource capability + * @throws IOException + */ + public abstract void storeContainerResourceChanged(ContainerId containerId, + Resource capability) throws IOException; + /** * Record that a container has completed * @param containerId the container ID diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java index 4d0aacd14e1..43f1b29c831 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java @@ -28,18 +28,30 @@ import static org.mockito.Mockito.spy; import static org.mockito.Mockito.verify; +import java.io.File; +import java.io.IOException; +import java.io.PrintWriter; import java.nio.ByteBuffer; import java.security.PrivilegedExceptionAction; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileContext; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.UnsupportedFileSystemException; import org.apache.hadoop.io.DataOutputBuffer; import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.util.Shell; +import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse; +import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceRequest; +import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceResponse; import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest; import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest; import org.apache.hadoop.yarn.api.protocolrecords.StartContainersResponse; @@ -48,9 +60,17 @@ import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; +import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.LocalResource; +import org.apache.hadoop.yarn.api.records.LocalResourceType; +import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; import org.apache.hadoop.yarn.api.records.LogAggregationContext; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.Token; +import org.apache.hadoop.yarn.api.records.URL; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.security.NMTokenIdentifier; import org.apache.hadoop.yarn.server.api.records.MasterKey; import org.apache.hadoop.yarn.server.api.records.impl.pb.MasterKeyPBImpl; @@ -58,6 +78,9 @@ import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor; import org.apache.hadoop.yarn.server.nodemanager.Context; import org.apache.hadoop.yarn.server.nodemanager.DeletionService; +import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService; +import org.apache.hadoop.yarn.server.nodemanager.NodeHealthCheckerService; +import org.apache.hadoop.yarn.server.nodemanager.NodeManager; import org.apache.hadoop.yarn.server.nodemanager.NodeManager.NMContext; import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application; @@ -65,6 +88,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationImpl; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationState; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncher; import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncherEvent; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService; @@ -77,18 +101,50 @@ import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager; import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM; import org.apache.hadoop.yarn.server.security.ApplicationACLsManager; +import org.apache.hadoop.yarn.util.ConverterUtils; +import org.junit.Before; import org.junit.Test; -public class TestContainerManagerRecovery { +public class TestContainerManagerRecovery extends BaseContainerManagerTest { - private NodeManagerMetrics metrics = NodeManagerMetrics.create(); + public TestContainerManagerRecovery() throws UnsupportedFileSystemException { + super(); + } + + @Override + @Before + public void setup() throws IOException { + localFS.delete(new Path(localDir.getAbsolutePath()), true); + localFS.delete(new Path(tmpDir.getAbsolutePath()), true); + localFS.delete(new Path(localLogDir.getAbsolutePath()), true); + localFS.delete(new Path(remoteLogDir.getAbsolutePath()), true); + localDir.mkdir(); + tmpDir.mkdir(); + localLogDir.mkdir(); + remoteLogDir.mkdir(); + LOG.info("Created localDir in " + localDir.getAbsolutePath()); + LOG.info("Created tmpDir in " + tmpDir.getAbsolutePath()); + + String bindAddress = "0.0.0.0:12345"; + conf.set(YarnConfiguration.NM_ADDRESS, bindAddress); + conf.set(YarnConfiguration.NM_LOCAL_DIRS, localDir.getAbsolutePath()); + conf.set(YarnConfiguration.NM_LOG_DIRS, localLogDir.getAbsolutePath()); + conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, remoteLogDir.getAbsolutePath()); + conf.setLong(YarnConfiguration.NM_LOG_RETAIN_SECONDS, 1); + // Default delSrvc + delSrvc = createDeletionService(); + delSrvc.init(conf); + exec = createContainerExecutor(); + dirsHandler = new LocalDirsHandlerService(); + nodeHealthChecker = new NodeHealthCheckerService( + NodeManager.getNodeHealthScriptRunner(conf), dirsHandler); + nodeHealthChecker.init(conf); + } @Test public void testApplicationRecovery() throws Exception { - YarnConfiguration conf = new YarnConfiguration(); conf.setBoolean(YarnConfiguration.NM_RECOVERY_ENABLED, true); conf.setBoolean(YarnConfiguration.NM_RECOVERY_SUPERVISED, true); - conf.set(YarnConfiguration.NM_ADDRESS, "localhost:1234"); conf.setBoolean(YarnConfiguration.YARN_ACL_ENABLE, true); conf.set(YarnConfiguration.YARN_ADMIN_ACL, "yarn_admin_user"); NMStateStoreService stateStore = new NMMemoryStateStoreService(); @@ -233,6 +289,91 @@ public void testApplicationRecovery() throws Exception { cm.stop(); } + @Test + public void testContainerResizeRecovery() throws Exception { + conf.setBoolean(YarnConfiguration.NM_RECOVERY_ENABLED, true); + conf.setBoolean(YarnConfiguration.NM_RECOVERY_SUPERVISED, true); + NMStateStoreService stateStore = new NMMemoryStateStoreService(); + stateStore.init(conf); + stateStore.start(); + Context context = createContext(conf, stateStore); + ContainerManagerImpl cm = createContainerManager(context, delSrvc); + cm.init(conf); + cm.start(); + // add an application by starting a container + ApplicationId appId = ApplicationId.newInstance(0, 1); + ApplicationAttemptId attemptId = + ApplicationAttemptId.newInstance(appId, 1); + ContainerId cid = ContainerId.newContainerId(attemptId, 1); + Map containerEnv = Collections.emptyMap(); + Map serviceData = Collections.emptyMap(); + Credentials containerCreds = new Credentials(); + DataOutputBuffer dob = new DataOutputBuffer(); + containerCreds.writeTokenStorageToStream(dob); + ByteBuffer containerTokens = ByteBuffer.wrap(dob.getData(), 0, + dob.getLength()); + Map acls = Collections.emptyMap(); + File tmpDir = new File("target", + this.getClass().getSimpleName() + "-tmpDir"); + File scriptFile = Shell.appendScriptExtension(tmpDir, "scriptFile"); + PrintWriter fileWriter = new PrintWriter(scriptFile); + if (Shell.WINDOWS) { + fileWriter.println("@ping -n 100 127.0.0.1 >nul"); + } else { + fileWriter.write("\numask 0"); + fileWriter.write("\nexec sleep 100"); + } + fileWriter.close(); + FileContext localFS = FileContext.getLocalFSFileContext(); + URL resource_alpha = + ConverterUtils.getYarnUrlFromPath(localFS + .makeQualified(new Path(scriptFile.getAbsolutePath()))); + LocalResource rsrc_alpha = RecordFactoryProvider + .getRecordFactory(null).newRecordInstance(LocalResource.class); + rsrc_alpha.setResource(resource_alpha); + rsrc_alpha.setSize(-1); + rsrc_alpha.setVisibility(LocalResourceVisibility.APPLICATION); + rsrc_alpha.setType(LocalResourceType.FILE); + rsrc_alpha.setTimestamp(scriptFile.lastModified()); + String destinationFile = "dest_file"; + Map localResources = new HashMap<>(); + localResources.put(destinationFile, rsrc_alpha); + List commands = + Arrays.asList(Shell.getRunScriptCommand(scriptFile)); + ContainerLaunchContext clc = ContainerLaunchContext.newInstance( + localResources, containerEnv, commands, serviceData, + containerTokens, acls); + StartContainersResponse startResponse = startContainer( + context, cm, cid, clc, null); + assertTrue(startResponse.getFailedRequests().isEmpty()); + assertEquals(1, context.getApplications().size()); + Application app = context.getApplications().get(appId); + assertNotNull(app); + // make sure the container reaches RUNNING state + waitForNMContainerState(cm, cid, + org.apache.hadoop.yarn.server.nodemanager + .containermanager.container.ContainerState.RUNNING); + Resource targetResource = Resource.newInstance(2048, 2); + IncreaseContainersResourceResponse increaseResponse = + increaseContainersResource(context, cm, cid, targetResource); + assertTrue(increaseResponse.getFailedRequests().isEmpty()); + // check status + ContainerStatus containerStatus = getContainerStatus(context, cm, cid); + assertEquals(targetResource, containerStatus.getCapability()); + // restart and verify container is running and recovered + // to the correct size + cm.stop(); + context = createContext(conf, stateStore); + cm = createContainerManager(context); + cm.init(conf); + cm.start(); + assertEquals(1, context.getApplications().size()); + app = context.getApplications().get(appId); + assertNotNull(app); + containerStatus = getContainerStatus(context, cm, cid); + assertEquals(targetResource, containerStatus.getCapability()); + } + @Test public void testContainerCleanupOnShutdown() throws Exception { ApplicationId appId = ApplicationId.newInstance(0, 1); @@ -257,10 +398,8 @@ public void testContainerCleanupOnShutdown() throws Exception { LogAggregationContext.newInstance("includePattern", "excludePattern"); // verify containers are stopped on shutdown without recovery - YarnConfiguration conf = new YarnConfiguration(); conf.setBoolean(YarnConfiguration.NM_RECOVERY_ENABLED, false); conf.setBoolean(YarnConfiguration.NM_RECOVERY_SUPERVISED, false); - conf.set(YarnConfiguration.NM_ADDRESS, "localhost:1234"); Context context = createContext(conf, new NMNullStateStoreService()); ContainerManagerImpl cm = spy(createContainerManager(context)); cm.init(conf); @@ -306,12 +445,36 @@ public void testContainerCleanupOnShutdown() throws Exception { verify(cm, never()).handle(isA(CMgrCompletedAppsEvent.class)); } - private NMContext createContext(YarnConfiguration conf, + private ContainerManagerImpl createContainerManager(Context context, + DeletionService delSrvc) { + return new ContainerManagerImpl(context, exec, delSrvc, + mock(NodeStatusUpdater.class), metrics, dirsHandler) { + @Override + public void + setBlockNewContainerRequests(boolean blockNewContainerRequests) { + // do nothing + } + @Override + protected void authorizeGetAndStopContainerRequest( + ContainerId containerId, Container container, + boolean stopRequest, NMTokenIdentifier identifier) + throws YarnException { + if(container == null || container.getUser().equals("Fail")){ + throw new YarnException("Reject this container"); + } + } + }; + } + + private NMContext createContext(Configuration conf, NMStateStoreService stateStore) { NMContext context = new NMContext(new NMContainerTokenSecretManager( conf), new NMTokenSecretManagerInNM(), null, - new ApplicationACLsManager(conf), stateStore); - + new ApplicationACLsManager(conf), stateStore){ + public int getHttpPort() { + return HTTP_PORT; + } + }; // simulate registration with RM MasterKey masterKey = new MasterKeyPBImpl(); masterKey.setKeyId(123); @@ -349,6 +512,58 @@ public StartContainersResponse run() throws Exception { }); } + private IncreaseContainersResourceResponse increaseContainersResource( + Context context, final ContainerManagerImpl cm, ContainerId cid, + Resource capability) throws Exception { + UserGroupInformation user = UserGroupInformation.createRemoteUser( + cid.getApplicationAttemptId().toString()); + // construct container resource increase request + final List increaseTokens = new ArrayList(); + // add increase request + Token containerToken = TestContainerManager.createContainerToken( + cid, 0, context.getNodeId(), user.getShortUserName(), + capability, context.getContainerTokenSecretManager(), null); + increaseTokens.add(containerToken); + final IncreaseContainersResourceRequest increaseRequest = + IncreaseContainersResourceRequest.newInstance(increaseTokens); + NMTokenIdentifier nmToken = new NMTokenIdentifier( + cid.getApplicationAttemptId(), context.getNodeId(), + user.getShortUserName(), + context.getNMTokenSecretManager().getCurrentKey().getKeyId()); + user.addTokenIdentifier(nmToken); + return user.doAs( + new PrivilegedExceptionAction() { + @Override + public IncreaseContainersResourceResponse run() throws Exception { + return cm.increaseContainersResource(increaseRequest); + } + }); + } + + private ContainerStatus getContainerStatus( + Context context, final ContainerManagerImpl cm, ContainerId cid) + throws Exception { + UserGroupInformation user = UserGroupInformation.createRemoteUser( + cid.getApplicationAttemptId().toString()); + NMTokenIdentifier nmToken = new NMTokenIdentifier( + cid.getApplicationAttemptId(), context.getNodeId(), + user.getShortUserName(), + context.getNMTokenSecretManager().getCurrentKey().getKeyId()); + user.addTokenIdentifier(nmToken); + List containerIds = new ArrayList<>(); + containerIds.add(cid); + final GetContainerStatusesRequest gcsRequest = + GetContainerStatusesRequest.newInstance(containerIds); + return user.doAs( + new PrivilegedExceptionAction() { + @Override + public ContainerStatus run() throws Exception { + return cm.getContainerStatuses(gcsRequest) + .getContainerStatuses().get(0); + } + }); + } + private void waitForAppState(Application app, ApplicationState state) throws Exception { final int msecPerSleep = 10; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java index e0487e7f033..a1c95ab03b9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/NMMemoryStateStoreService.java @@ -33,6 +33,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerExitStatus; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto; import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.ContainerManagerApplicationProto; import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.DeletionServiceDeleteTaskProto; @@ -122,9 +123,10 @@ public synchronized List loadContainersState() rcsCopy.killed = rcs.killed; rcsCopy.diagnostics = rcs.diagnostics; rcsCopy.startRequest = rcs.startRequest; + rcsCopy.capability = rcs.capability; result.add(rcsCopy); } - return new ArrayList(); + return result; } @Override @@ -152,6 +154,13 @@ public synchronized void storeContainerLaunched(ContainerId containerId) rcs.status = RecoveredContainerStatus.LAUNCHED; } + @Override + public synchronized void storeContainerResourceChanged( + ContainerId containerId, Resource capability) throws IOException { + RecoveredContainerState rcs = getRecoveredContainerState(containerId); + rcs.capability = capability; + } + @Override public synchronized void storeContainerKilled(ContainerId containerId) throws IOException { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java index 180442499c3..08b49e75383 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/recovery/TestNMLeveldbStateStoreService.java @@ -298,6 +298,17 @@ public void testContainerStorage() throws IOException { assertEquals(containerReq, rcs.getStartRequest()); assertEquals(diags.toString(), rcs.getDiagnostics()); + // increase the container size, and verify recovered + stateStore.storeContainerResourceChanged(containerId, Resource.newInstance(2468, 4)); + restartStateStore(); + recoveredContainers = stateStore.loadContainersState(); + assertEquals(1, recoveredContainers.size()); + rcs = recoveredContainers.get(0); + assertEquals(RecoveredContainerStatus.LAUNCHED, rcs.getStatus()); + assertEquals(ContainerExitStatus.INVALID, rcs.getExitCode()); + assertEquals(false, rcs.getKilled()); + assertEquals(Resource.newInstance(2468, 4), rcs.getCapability()); + // mark the container killed, add some more diags, and verify recovered diags.append("some more diags for container"); stateStore.storeContainerDiagnostics(containerId, diags);