YARN-3868. Recovery support for container resizing. Contributed by Meng Ding
This commit is contained in:
parent
c3dc1af072
commit
c57eac5dfe
|
@ -218,6 +218,8 @@ Release 2.8.0 - UNRELEASED
|
|||
YARN-1644. RM-NM protocol changes and NodeStatusUpdater implementation to
|
||||
support container resizing. (Meng Ding via jianhe)
|
||||
|
||||
YARN-3868. Recovery support for container resizing. (Meng Ding via jianhe)
|
||||
|
||||
IMPROVEMENTS
|
||||
|
||||
YARN-644. Basic null check is not performed on passed in arguments before
|
||||
|
|
|
@ -346,7 +346,7 @@ public class ContainerManagerImpl extends CompositeService implements
|
|||
Container container = new ContainerImpl(getConfig(), dispatcher,
|
||||
context.getNMStateStore(), req.getContainerLaunchContext(),
|
||||
credentials, metrics, token, rcs.getStatus(), rcs.getExitCode(),
|
||||
rcs.getDiagnostics(), rcs.getKilled());
|
||||
rcs.getDiagnostics(), rcs.getKilled(), rcs.getCapability());
|
||||
context.getContainers().put(containerId, container);
|
||||
dispatcher.getEventHandler().handle(
|
||||
new ApplicationContainerInitEvent(container));
|
||||
|
@ -1101,6 +1101,9 @@ public class ContainerManagerImpl extends CompositeService implements
|
|||
this.readLock.lock();
|
||||
try {
|
||||
if (!serviceStopped) {
|
||||
// Persist container resource change for recovery
|
||||
this.context.getNMStateStore().storeContainerResourceChanged(
|
||||
containerId, targetResource);
|
||||
getContainersMonitor().handle(
|
||||
new ChangeMonitoringContainerResourceEvent(
|
||||
containerId, targetResource));
|
||||
|
|
|
@ -154,13 +154,19 @@ public class ContainerImpl implements Container {
|
|||
Credentials creds, NodeManagerMetrics metrics,
|
||||
ContainerTokenIdentifier containerTokenIdentifier,
|
||||
RecoveredContainerStatus recoveredStatus, int exitCode,
|
||||
String diagnostics, boolean wasKilled) {
|
||||
String diagnostics, boolean wasKilled, Resource recoveredCapability) {
|
||||
this(conf, dispatcher, stateStore, launchContext, creds, metrics,
|
||||
containerTokenIdentifier);
|
||||
this.recoveredStatus = recoveredStatus;
|
||||
this.exitCode = exitCode;
|
||||
this.recoveredAsKilled = wasKilled;
|
||||
this.diagnostics.append(diagnostics);
|
||||
if (recoveredCapability != null
|
||||
&& !this.resource.equals(recoveredCapability)) {
|
||||
// resource capability had been updated before NM was down
|
||||
this.resource = Resource.newInstance(recoveredCapability.getMemory(),
|
||||
recoveredCapability.getVirtualCores());
|
||||
}
|
||||
}
|
||||
|
||||
private static final ContainerDiagnosticsUpdateTransition UPDATE_DIAGNOSTICS_TRANSITION =
|
||||
|
|
|
@ -40,7 +40,10 @@ import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.StartContainerRequestP
|
|||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.api.records.impl.pb.ResourcePBImpl;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.MasterKeyProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.VersionProto;
|
||||
|
@ -99,6 +102,8 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
|
|||
private static final String CONTAINER_REQUEST_KEY_SUFFIX = "/request";
|
||||
private static final String CONTAINER_DIAGS_KEY_SUFFIX = "/diagnostics";
|
||||
private static final String CONTAINER_LAUNCHED_KEY_SUFFIX = "/launched";
|
||||
private static final String CONTAINER_RESOURCE_CHANGED_KEY_SUFFIX =
|
||||
"/resourceChanged";
|
||||
private static final String CONTAINER_KILLED_KEY_SUFFIX = "/killed";
|
||||
private static final String CONTAINER_EXIT_CODE_KEY_SUFFIX = "/exitcode";
|
||||
|
||||
|
@ -230,6 +235,9 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
|
|||
} else if (suffix.equals(CONTAINER_EXIT_CODE_KEY_SUFFIX)) {
|
||||
rcs.status = RecoveredContainerStatus.COMPLETED;
|
||||
rcs.exitCode = Integer.parseInt(asString(entry.getValue()));
|
||||
} else if (suffix.equals(CONTAINER_RESOURCE_CHANGED_KEY_SUFFIX)) {
|
||||
rcs.capability = new ResourcePBImpl(
|
||||
ResourceProto.parseFrom(entry.getValue()));
|
||||
} else {
|
||||
throw new IOException("Unexpected container state key: " + key);
|
||||
}
|
||||
|
@ -274,6 +282,20 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void storeContainerResourceChanged(ContainerId containerId,
|
||||
Resource capability) throws IOException {
|
||||
String key = CONTAINERS_KEY_PREFIX + containerId.toString()
|
||||
+ CONTAINER_RESOURCE_CHANGED_KEY_SUFFIX;
|
||||
try {
|
||||
// New value will overwrite old values for the same key
|
||||
db.put(bytes(key),
|
||||
((ResourcePBImpl) capability).getProto().toByteArray());
|
||||
} catch (DBException e) {
|
||||
throw new IOException(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void storeContainerKilled(ContainerId containerId)
|
||||
throws IOException {
|
||||
|
|
|
@ -27,6 +27,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
|
|||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.ContainerManagerApplicationProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.DeletionServiceDeleteTaskProto;
|
||||
|
@ -87,6 +88,11 @@ public class NMNullStateStoreService extends NMStateStoreService {
|
|||
throws IOException {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void storeContainerResourceChanged(ContainerId containerId,
|
||||
Resource capability) throws IOException {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void storeContainerKilled(ContainerId containerId)
|
||||
throws IOException {
|
||||
|
|
|
@ -34,6 +34,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
|||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.ContainerManagerApplicationProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.DeletionServiceDeleteTaskProto;
|
||||
|
@ -74,6 +75,7 @@ public abstract class NMStateStoreService extends AbstractService {
|
|||
boolean killed = false;
|
||||
String diagnostics = "";
|
||||
StartContainerRequest startRequest;
|
||||
Resource capability;
|
||||
|
||||
public RecoveredContainerStatus getStatus() {
|
||||
return status;
|
||||
|
@ -94,6 +96,10 @@ public abstract class NMStateStoreService extends AbstractService {
|
|||
public StartContainerRequest getStartRequest() {
|
||||
return startRequest;
|
||||
}
|
||||
|
||||
public Resource getCapability() {
|
||||
return capability;
|
||||
}
|
||||
}
|
||||
|
||||
public static class LocalResourceTrackerState {
|
||||
|
@ -283,6 +289,15 @@ public abstract class NMStateStoreService extends AbstractService {
|
|||
public abstract void storeContainerLaunched(ContainerId containerId)
|
||||
throws IOException;
|
||||
|
||||
/**
|
||||
* Record that a container resource has been changed
|
||||
* @param containerId the container ID
|
||||
* @param capability the container resource capability
|
||||
* @throws IOException
|
||||
*/
|
||||
public abstract void storeContainerResourceChanged(ContainerId containerId,
|
||||
Resource capability) throws IOException;
|
||||
|
||||
/**
|
||||
* Record that a container has completed
|
||||
* @param containerId the container ID
|
||||
|
|
|
@ -28,18 +28,30 @@ import static org.mockito.Mockito.never;
|
|||
import static org.mockito.Mockito.spy;
|
||||
import static org.mockito.Mockito.verify;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.io.PrintWriter;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.security.PrivilegedExceptionAction;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileContext;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.UnsupportedFileSystemException;
|
||||
import org.apache.hadoop.io.DataOutputBuffer;
|
||||
import org.apache.hadoop.security.Credentials;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.util.Shell;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.StartContainersResponse;
|
||||
|
@ -48,9 +60,17 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
|||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerStatus;
|
||||
import org.apache.hadoop.yarn.api.records.LocalResource;
|
||||
import org.apache.hadoop.yarn.api.records.LocalResourceType;
|
||||
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
|
||||
import org.apache.hadoop.yarn.api.records.LogAggregationContext;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.api.records.Token;
|
||||
import org.apache.hadoop.yarn.api.records.URL;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
||||
import org.apache.hadoop.yarn.security.NMTokenIdentifier;
|
||||
import org.apache.hadoop.yarn.server.api.records.MasterKey;
|
||||
import org.apache.hadoop.yarn.server.api.records.impl.pb.MasterKeyPBImpl;
|
||||
|
@ -58,6 +78,9 @@ import org.apache.hadoop.yarn.server.nodemanager.CMgrCompletedAppsEvent;
|
|||
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.Context;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.NodeHealthCheckerService;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.NodeManager;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.NodeManager.NMContext;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
|
||||
|
@ -65,6 +88,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Ap
|
|||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationImpl;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationState;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncher;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncherEvent;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService;
|
||||
|
@ -77,18 +101,50 @@ import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
|
|||
import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM;
|
||||
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
|
||||
import org.apache.hadoop.yarn.util.ConverterUtils;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
public class TestContainerManagerRecovery {
|
||||
public class TestContainerManagerRecovery extends BaseContainerManagerTest {
|
||||
|
||||
private NodeManagerMetrics metrics = NodeManagerMetrics.create();
|
||||
public TestContainerManagerRecovery() throws UnsupportedFileSystemException {
|
||||
super();
|
||||
}
|
||||
|
||||
@Override
|
||||
@Before
|
||||
public void setup() throws IOException {
|
||||
localFS.delete(new Path(localDir.getAbsolutePath()), true);
|
||||
localFS.delete(new Path(tmpDir.getAbsolutePath()), true);
|
||||
localFS.delete(new Path(localLogDir.getAbsolutePath()), true);
|
||||
localFS.delete(new Path(remoteLogDir.getAbsolutePath()), true);
|
||||
localDir.mkdir();
|
||||
tmpDir.mkdir();
|
||||
localLogDir.mkdir();
|
||||
remoteLogDir.mkdir();
|
||||
LOG.info("Created localDir in " + localDir.getAbsolutePath());
|
||||
LOG.info("Created tmpDir in " + tmpDir.getAbsolutePath());
|
||||
|
||||
String bindAddress = "0.0.0.0:12345";
|
||||
conf.set(YarnConfiguration.NM_ADDRESS, bindAddress);
|
||||
conf.set(YarnConfiguration.NM_LOCAL_DIRS, localDir.getAbsolutePath());
|
||||
conf.set(YarnConfiguration.NM_LOG_DIRS, localLogDir.getAbsolutePath());
|
||||
conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, remoteLogDir.getAbsolutePath());
|
||||
conf.setLong(YarnConfiguration.NM_LOG_RETAIN_SECONDS, 1);
|
||||
// Default delSrvc
|
||||
delSrvc = createDeletionService();
|
||||
delSrvc.init(conf);
|
||||
exec = createContainerExecutor();
|
||||
dirsHandler = new LocalDirsHandlerService();
|
||||
nodeHealthChecker = new NodeHealthCheckerService(
|
||||
NodeManager.getNodeHealthScriptRunner(conf), dirsHandler);
|
||||
nodeHealthChecker.init(conf);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testApplicationRecovery() throws Exception {
|
||||
YarnConfiguration conf = new YarnConfiguration();
|
||||
conf.setBoolean(YarnConfiguration.NM_RECOVERY_ENABLED, true);
|
||||
conf.setBoolean(YarnConfiguration.NM_RECOVERY_SUPERVISED, true);
|
||||
conf.set(YarnConfiguration.NM_ADDRESS, "localhost:1234");
|
||||
conf.setBoolean(YarnConfiguration.YARN_ACL_ENABLE, true);
|
||||
conf.set(YarnConfiguration.YARN_ADMIN_ACL, "yarn_admin_user");
|
||||
NMStateStoreService stateStore = new NMMemoryStateStoreService();
|
||||
|
@ -233,6 +289,91 @@ public class TestContainerManagerRecovery {
|
|||
cm.stop();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testContainerResizeRecovery() throws Exception {
|
||||
conf.setBoolean(YarnConfiguration.NM_RECOVERY_ENABLED, true);
|
||||
conf.setBoolean(YarnConfiguration.NM_RECOVERY_SUPERVISED, true);
|
||||
NMStateStoreService stateStore = new NMMemoryStateStoreService();
|
||||
stateStore.init(conf);
|
||||
stateStore.start();
|
||||
Context context = createContext(conf, stateStore);
|
||||
ContainerManagerImpl cm = createContainerManager(context, delSrvc);
|
||||
cm.init(conf);
|
||||
cm.start();
|
||||
// add an application by starting a container
|
||||
ApplicationId appId = ApplicationId.newInstance(0, 1);
|
||||
ApplicationAttemptId attemptId =
|
||||
ApplicationAttemptId.newInstance(appId, 1);
|
||||
ContainerId cid = ContainerId.newContainerId(attemptId, 1);
|
||||
Map<String, String> containerEnv = Collections.emptyMap();
|
||||
Map<String, ByteBuffer> serviceData = Collections.emptyMap();
|
||||
Credentials containerCreds = new Credentials();
|
||||
DataOutputBuffer dob = new DataOutputBuffer();
|
||||
containerCreds.writeTokenStorageToStream(dob);
|
||||
ByteBuffer containerTokens = ByteBuffer.wrap(dob.getData(), 0,
|
||||
dob.getLength());
|
||||
Map<ApplicationAccessType, String> acls = Collections.emptyMap();
|
||||
File tmpDir = new File("target",
|
||||
this.getClass().getSimpleName() + "-tmpDir");
|
||||
File scriptFile = Shell.appendScriptExtension(tmpDir, "scriptFile");
|
||||
PrintWriter fileWriter = new PrintWriter(scriptFile);
|
||||
if (Shell.WINDOWS) {
|
||||
fileWriter.println("@ping -n 100 127.0.0.1 >nul");
|
||||
} else {
|
||||
fileWriter.write("\numask 0");
|
||||
fileWriter.write("\nexec sleep 100");
|
||||
}
|
||||
fileWriter.close();
|
||||
FileContext localFS = FileContext.getLocalFSFileContext();
|
||||
URL resource_alpha =
|
||||
ConverterUtils.getYarnUrlFromPath(localFS
|
||||
.makeQualified(new Path(scriptFile.getAbsolutePath())));
|
||||
LocalResource rsrc_alpha = RecordFactoryProvider
|
||||
.getRecordFactory(null).newRecordInstance(LocalResource.class);
|
||||
rsrc_alpha.setResource(resource_alpha);
|
||||
rsrc_alpha.setSize(-1);
|
||||
rsrc_alpha.setVisibility(LocalResourceVisibility.APPLICATION);
|
||||
rsrc_alpha.setType(LocalResourceType.FILE);
|
||||
rsrc_alpha.setTimestamp(scriptFile.lastModified());
|
||||
String destinationFile = "dest_file";
|
||||
Map<String, LocalResource> localResources = new HashMap<>();
|
||||
localResources.put(destinationFile, rsrc_alpha);
|
||||
List<String> commands =
|
||||
Arrays.asList(Shell.getRunScriptCommand(scriptFile));
|
||||
ContainerLaunchContext clc = ContainerLaunchContext.newInstance(
|
||||
localResources, containerEnv, commands, serviceData,
|
||||
containerTokens, acls);
|
||||
StartContainersResponse startResponse = startContainer(
|
||||
context, cm, cid, clc, null);
|
||||
assertTrue(startResponse.getFailedRequests().isEmpty());
|
||||
assertEquals(1, context.getApplications().size());
|
||||
Application app = context.getApplications().get(appId);
|
||||
assertNotNull(app);
|
||||
// make sure the container reaches RUNNING state
|
||||
waitForNMContainerState(cm, cid,
|
||||
org.apache.hadoop.yarn.server.nodemanager
|
||||
.containermanager.container.ContainerState.RUNNING);
|
||||
Resource targetResource = Resource.newInstance(2048, 2);
|
||||
IncreaseContainersResourceResponse increaseResponse =
|
||||
increaseContainersResource(context, cm, cid, targetResource);
|
||||
assertTrue(increaseResponse.getFailedRequests().isEmpty());
|
||||
// check status
|
||||
ContainerStatus containerStatus = getContainerStatus(context, cm, cid);
|
||||
assertEquals(targetResource, containerStatus.getCapability());
|
||||
// restart and verify container is running and recovered
|
||||
// to the correct size
|
||||
cm.stop();
|
||||
context = createContext(conf, stateStore);
|
||||
cm = createContainerManager(context);
|
||||
cm.init(conf);
|
||||
cm.start();
|
||||
assertEquals(1, context.getApplications().size());
|
||||
app = context.getApplications().get(appId);
|
||||
assertNotNull(app);
|
||||
containerStatus = getContainerStatus(context, cm, cid);
|
||||
assertEquals(targetResource, containerStatus.getCapability());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testContainerCleanupOnShutdown() throws Exception {
|
||||
ApplicationId appId = ApplicationId.newInstance(0, 1);
|
||||
|
@ -257,10 +398,8 @@ public class TestContainerManagerRecovery {
|
|||
LogAggregationContext.newInstance("includePattern", "excludePattern");
|
||||
|
||||
// verify containers are stopped on shutdown without recovery
|
||||
YarnConfiguration conf = new YarnConfiguration();
|
||||
conf.setBoolean(YarnConfiguration.NM_RECOVERY_ENABLED, false);
|
||||
conf.setBoolean(YarnConfiguration.NM_RECOVERY_SUPERVISED, false);
|
||||
conf.set(YarnConfiguration.NM_ADDRESS, "localhost:1234");
|
||||
Context context = createContext(conf, new NMNullStateStoreService());
|
||||
ContainerManagerImpl cm = spy(createContainerManager(context));
|
||||
cm.init(conf);
|
||||
|
@ -306,12 +445,36 @@ public class TestContainerManagerRecovery {
|
|||
verify(cm, never()).handle(isA(CMgrCompletedAppsEvent.class));
|
||||
}
|
||||
|
||||
private NMContext createContext(YarnConfiguration conf,
|
||||
private ContainerManagerImpl createContainerManager(Context context,
|
||||
DeletionService delSrvc) {
|
||||
return new ContainerManagerImpl(context, exec, delSrvc,
|
||||
mock(NodeStatusUpdater.class), metrics, dirsHandler) {
|
||||
@Override
|
||||
public void
|
||||
setBlockNewContainerRequests(boolean blockNewContainerRequests) {
|
||||
// do nothing
|
||||
}
|
||||
@Override
|
||||
protected void authorizeGetAndStopContainerRequest(
|
||||
ContainerId containerId, Container container,
|
||||
boolean stopRequest, NMTokenIdentifier identifier)
|
||||
throws YarnException {
|
||||
if(container == null || container.getUser().equals("Fail")){
|
||||
throw new YarnException("Reject this container");
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
private NMContext createContext(Configuration conf,
|
||||
NMStateStoreService stateStore) {
|
||||
NMContext context = new NMContext(new NMContainerTokenSecretManager(
|
||||
conf), new NMTokenSecretManagerInNM(), null,
|
||||
new ApplicationACLsManager(conf), stateStore);
|
||||
|
||||
new ApplicationACLsManager(conf), stateStore){
|
||||
public int getHttpPort() {
|
||||
return HTTP_PORT;
|
||||
}
|
||||
};
|
||||
// simulate registration with RM
|
||||
MasterKey masterKey = new MasterKeyPBImpl();
|
||||
masterKey.setKeyId(123);
|
||||
|
@ -349,6 +512,58 @@ public class TestContainerManagerRecovery {
|
|||
});
|
||||
}
|
||||
|
||||
private IncreaseContainersResourceResponse increaseContainersResource(
|
||||
Context context, final ContainerManagerImpl cm, ContainerId cid,
|
||||
Resource capability) throws Exception {
|
||||
UserGroupInformation user = UserGroupInformation.createRemoteUser(
|
||||
cid.getApplicationAttemptId().toString());
|
||||
// construct container resource increase request
|
||||
final List<Token> increaseTokens = new ArrayList<Token>();
|
||||
// add increase request
|
||||
Token containerToken = TestContainerManager.createContainerToken(
|
||||
cid, 0, context.getNodeId(), user.getShortUserName(),
|
||||
capability, context.getContainerTokenSecretManager(), null);
|
||||
increaseTokens.add(containerToken);
|
||||
final IncreaseContainersResourceRequest increaseRequest =
|
||||
IncreaseContainersResourceRequest.newInstance(increaseTokens);
|
||||
NMTokenIdentifier nmToken = new NMTokenIdentifier(
|
||||
cid.getApplicationAttemptId(), context.getNodeId(),
|
||||
user.getShortUserName(),
|
||||
context.getNMTokenSecretManager().getCurrentKey().getKeyId());
|
||||
user.addTokenIdentifier(nmToken);
|
||||
return user.doAs(
|
||||
new PrivilegedExceptionAction<IncreaseContainersResourceResponse>() {
|
||||
@Override
|
||||
public IncreaseContainersResourceResponse run() throws Exception {
|
||||
return cm.increaseContainersResource(increaseRequest);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
private ContainerStatus getContainerStatus(
|
||||
Context context, final ContainerManagerImpl cm, ContainerId cid)
|
||||
throws Exception {
|
||||
UserGroupInformation user = UserGroupInformation.createRemoteUser(
|
||||
cid.getApplicationAttemptId().toString());
|
||||
NMTokenIdentifier nmToken = new NMTokenIdentifier(
|
||||
cid.getApplicationAttemptId(), context.getNodeId(),
|
||||
user.getShortUserName(),
|
||||
context.getNMTokenSecretManager().getCurrentKey().getKeyId());
|
||||
user.addTokenIdentifier(nmToken);
|
||||
List<ContainerId> containerIds = new ArrayList<>();
|
||||
containerIds.add(cid);
|
||||
final GetContainerStatusesRequest gcsRequest =
|
||||
GetContainerStatusesRequest.newInstance(containerIds);
|
||||
return user.doAs(
|
||||
new PrivilegedExceptionAction<ContainerStatus>() {
|
||||
@Override
|
||||
public ContainerStatus run() throws Exception {
|
||||
return cm.getContainerStatuses(gcsRequest)
|
||||
.getContainerStatuses().get(0);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
private void waitForAppState(Application app, ApplicationState state)
|
||||
throws Exception {
|
||||
final int msecPerSleep = 10;
|
||||
|
|
|
@ -33,6 +33,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
|||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.ContainerManagerApplicationProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.DeletionServiceDeleteTaskProto;
|
||||
|
@ -122,9 +123,10 @@ public class NMMemoryStateStoreService extends NMStateStoreService {
|
|||
rcsCopy.killed = rcs.killed;
|
||||
rcsCopy.diagnostics = rcs.diagnostics;
|
||||
rcsCopy.startRequest = rcs.startRequest;
|
||||
rcsCopy.capability = rcs.capability;
|
||||
result.add(rcsCopy);
|
||||
}
|
||||
return new ArrayList<RecoveredContainerState>();
|
||||
return result;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -152,6 +154,13 @@ public class NMMemoryStateStoreService extends NMStateStoreService {
|
|||
rcs.status = RecoveredContainerStatus.LAUNCHED;
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void storeContainerResourceChanged(
|
||||
ContainerId containerId, Resource capability) throws IOException {
|
||||
RecoveredContainerState rcs = getRecoveredContainerState(containerId);
|
||||
rcs.capability = capability;
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void storeContainerKilled(ContainerId containerId)
|
||||
throws IOException {
|
||||
|
|
|
@ -298,6 +298,17 @@ public class TestNMLeveldbStateStoreService {
|
|||
assertEquals(containerReq, rcs.getStartRequest());
|
||||
assertEquals(diags.toString(), rcs.getDiagnostics());
|
||||
|
||||
// increase the container size, and verify recovered
|
||||
stateStore.storeContainerResourceChanged(containerId, Resource.newInstance(2468, 4));
|
||||
restartStateStore();
|
||||
recoveredContainers = stateStore.loadContainersState();
|
||||
assertEquals(1, recoveredContainers.size());
|
||||
rcs = recoveredContainers.get(0);
|
||||
assertEquals(RecoveredContainerStatus.LAUNCHED, rcs.getStatus());
|
||||
assertEquals(ContainerExitStatus.INVALID, rcs.getExitCode());
|
||||
assertEquals(false, rcs.getKilled());
|
||||
assertEquals(Resource.newInstance(2468, 4), rcs.getCapability());
|
||||
|
||||
// mark the container killed, add some more diags, and verify recovered
|
||||
diags.append("some more diags for container");
|
||||
stateStore.storeContainerDiagnostics(containerId, diags);
|
||||
|
|
Loading…
Reference in New Issue