Revert "YARN-5049. Extend NMStateStore to save queued container information. (Konstantinos Karanasos via asuresh)"

This reverts commit 307cda70db.
This commit is contained in:
Arun Suresh 2016-08-24 10:23:06 -07:00
parent d19e29bd68
commit bd348d20b2
7 changed files with 11 additions and 110 deletions

View File

@ -337,6 +337,7 @@ public class ContainerManagerImpl extends CompositeService implements
app.handle(new ApplicationInitEvent(appId, acls, logAggregationContext)); app.handle(new ApplicationInitEvent(appId, acls, logAggregationContext));
} }
@SuppressWarnings("unchecked")
private void recoverContainer(RecoveredContainerState rcs) private void recoverContainer(RecoveredContainerState rcs)
throws IOException { throws IOException {
StartContainerRequest req = rcs.getStartRequest(); StartContainerRequest req = rcs.getStartRequest();
@ -351,7 +352,14 @@ public class ContainerManagerImpl extends CompositeService implements
+ " with exit code " + rcs.getExitCode()); + " with exit code " + rcs.getExitCode());
if (context.getApplications().containsKey(appId)) { if (context.getApplications().containsKey(appId)) {
recoverActiveContainer(launchContext, token, rcs); Credentials credentials =
YarnServerSecurityUtils.parseCredentials(launchContext);
Container container = new ContainerImpl(getConfig(), dispatcher,
req.getContainerLaunchContext(),
credentials, metrics, token, context, rcs);
context.getContainers().put(containerId, container);
dispatcher.getEventHandler().handle(
new ApplicationContainerInitEvent(container));
} else { } else {
if (rcs.getStatus() != RecoveredContainerStatus.COMPLETED) { if (rcs.getStatus() != RecoveredContainerStatus.COMPLETED) {
LOG.warn(containerId + " has no corresponding application!"); LOG.warn(containerId + " has no corresponding application!");
@ -361,22 +369,6 @@ public class ContainerManagerImpl extends CompositeService implements
} }
} }
/**
* Recover a running container.
*/
@SuppressWarnings("unchecked")
protected void recoverActiveContainer(
ContainerLaunchContext launchContext, ContainerTokenIdentifier token,
RecoveredContainerState rcs) throws IOException {
Credentials credentials = YarnServerSecurityUtils.parseCredentials(
launchContext);
Container container = new ContainerImpl(getConfig(), dispatcher,
launchContext, credentials, metrics, token, context, rcs);
context.getContainers().put(token.getContainerID(), container);
dispatcher.getEventHandler().handle(new ApplicationContainerInitEvent(
container));
}
private void waitForRecoveredContainers() throws InterruptedException { private void waitForRecoveredContainers() throws InterruptedException {
final int sleepMsec = 100; final int sleepMsec = 100;
int waitIterations = 100; int waitIterations = 100;

View File

@ -35,7 +35,6 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest; import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
import org.apache.hadoop.yarn.api.records.ContainerExitStatus; import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.ExecutionType; import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
@ -58,8 +57,6 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Ap
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl.ProcessTreeInfo; import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl.ProcessTreeInfo;
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics; import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredContainerState;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredContainerStatus;
import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -128,8 +125,6 @@ public class QueuingContainerManagerImpl extends ContainerManagerImpl {
startAllocatedContainer(allocatedContInfo); startAllocatedContainer(allocatedContInfo);
} else { } else {
ContainerId cIdToStart = containerTokenIdentifier.getContainerID(); ContainerId cIdToStart = containerTokenIdentifier.getContainerID();
this.context.getNMStateStore().storeContainer(cIdToStart, request);
this.context.getNMStateStore().storeContainerQueued(cIdToStart);
LOG.info("No available resources for container {} to start its execution " LOG.info("No available resources for container {} to start its execution "
+ "immediately.", cIdToStart); + "immediately.", cIdToStart);
if (allocatedContInfo.getExecutionType() == ExecutionType.GUARANTEED) { if (allocatedContInfo.getExecutionType() == ExecutionType.GUARANTEED) {
@ -164,7 +159,6 @@ public class QueuingContainerManagerImpl extends ContainerManagerImpl {
this.context.getQueuingContext().getKilledQueuedContainers().put( this.context.getQueuingContext().getKilledQueuedContainers().put(
containerTokenId, containerTokenId,
"Queued container request removed by ApplicationMaster."); "Queued container request removed by ApplicationMaster.");
this.context.getNMStateStore().storeContainerKilled(containerID);
} else { } else {
// The container started execution in the meanwhile. // The container started execution in the meanwhile.
try { try {
@ -474,38 +468,6 @@ public class QueuingContainerManagerImpl extends ContainerManagerImpl {
return super.getContainerStatusInternal(containerID, nmTokenIdentifier); return super.getContainerStatusInternal(containerID, nmTokenIdentifier);
} }
/**
* Recover running or queued container.
*/
@Override
protected void recoverActiveContainer(
ContainerLaunchContext launchContext, ContainerTokenIdentifier token,
RecoveredContainerState rcs) throws IOException {
if (rcs.getStatus() ==
RecoveredContainerStatus.QUEUED && !rcs.getKilled()) {
LOG.info(token.getContainerID()
+ "will be added to the queued containers.");
AllocatedContainerInfo allocatedContInfo = new AllocatedContainerInfo(
token, rcs.getStartRequest(), token.getExecutionType(),
token.getResource(), getConfig());
this.context.getQueuingContext().getQueuedContainers().put(
token.getContainerID(), token);
if (allocatedContInfo.getExecutionType() == ExecutionType.GUARANTEED) {
queuedGuaranteedContainers.add(allocatedContInfo);
// Kill running opportunistic containers to make space for
// guaranteed container.
killOpportunisticContainers(allocatedContInfo);
} else {
queuedOpportunisticContainers.add(allocatedContInfo);
}
} else {
super.recoverActiveContainer(launchContext, token, rcs);
}
}
@VisibleForTesting @VisibleForTesting
public int getNumAllocatedGuaranteedContainers() { public int getNumAllocatedGuaranteedContainers() {
return allocatedGuaranteedContainers.size(); return allocatedGuaranteedContainers.size();

View File

@ -80,7 +80,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
private static final String DB_SCHEMA_VERSION_KEY = "nm-schema-version"; private static final String DB_SCHEMA_VERSION_KEY = "nm-schema-version";
private static final Version CURRENT_VERSION_INFO = Version private static final Version CURRENT_VERSION_INFO = Version
.newInstance(2, 0); .newInstance(1, 0);
private static final String DELETION_TASK_KEY_PREFIX = private static final String DELETION_TASK_KEY_PREFIX =
"DeletionService/deltask_"; "DeletionService/deltask_";
@ -106,7 +106,6 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
private static final String CONTAINER_REQUEST_KEY_SUFFIX = "/request"; private static final String CONTAINER_REQUEST_KEY_SUFFIX = "/request";
private static final String CONTAINER_DIAGS_KEY_SUFFIX = "/diagnostics"; private static final String CONTAINER_DIAGS_KEY_SUFFIX = "/diagnostics";
private static final String CONTAINER_LAUNCHED_KEY_SUFFIX = "/launched"; private static final String CONTAINER_LAUNCHED_KEY_SUFFIX = "/launched";
private static final String CONTAINER_QUEUED_KEY_SUFFIX = "/queued";
private static final String CONTAINER_RESOURCE_CHANGED_KEY_SUFFIX = private static final String CONTAINER_RESOURCE_CHANGED_KEY_SUFFIX =
"/resourceChanged"; "/resourceChanged";
private static final String CONTAINER_KILLED_KEY_SUFFIX = "/killed"; private static final String CONTAINER_KILLED_KEY_SUFFIX = "/killed";
@ -240,13 +239,8 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
StartContainerRequestProto.parseFrom(entry.getValue())); StartContainerRequestProto.parseFrom(entry.getValue()));
} else if (suffix.equals(CONTAINER_DIAGS_KEY_SUFFIX)) { } else if (suffix.equals(CONTAINER_DIAGS_KEY_SUFFIX)) {
rcs.diagnostics = asString(entry.getValue()); rcs.diagnostics = asString(entry.getValue());
} else if (suffix.equals(CONTAINER_QUEUED_KEY_SUFFIX)) {
if (rcs.status == RecoveredContainerStatus.REQUESTED) {
rcs.status = RecoveredContainerStatus.QUEUED;
}
} else if (suffix.equals(CONTAINER_LAUNCHED_KEY_SUFFIX)) { } else if (suffix.equals(CONTAINER_LAUNCHED_KEY_SUFFIX)) {
if ((rcs.status == RecoveredContainerStatus.REQUESTED) if (rcs.status == RecoveredContainerStatus.REQUESTED) {
|| (rcs.status == RecoveredContainerStatus.QUEUED)) {
rcs.status = RecoveredContainerStatus.LAUNCHED; rcs.status = RecoveredContainerStatus.LAUNCHED;
} }
} else if (suffix.equals(CONTAINER_KILLED_KEY_SUFFIX)) { } else if (suffix.equals(CONTAINER_KILLED_KEY_SUFFIX)) {
@ -289,21 +283,6 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
} }
} }
@Override
public void storeContainerQueued(ContainerId containerId) throws IOException {
if (LOG.isDebugEnabled()) {
LOG.debug("storeContainerQueued: containerId=" + containerId);
}
String key = CONTAINERS_KEY_PREFIX + containerId.toString()
+ CONTAINER_QUEUED_KEY_SUFFIX;
try {
db.put(bytes(key), EMPTY_VALUE);
} catch (DBException e) {
throw new IOException(e);
}
}
@Override @Override
public void storeContainerDiagnostics(ContainerId containerId, public void storeContainerDiagnostics(ContainerId containerId,
StringBuilder diagnostics) throws IOException { StringBuilder diagnostics) throws IOException {
@ -438,7 +417,6 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
batch.delete(bytes(keyPrefix + CONTAINER_REQUEST_KEY_SUFFIX)); batch.delete(bytes(keyPrefix + CONTAINER_REQUEST_KEY_SUFFIX));
batch.delete(bytes(keyPrefix + CONTAINER_DIAGS_KEY_SUFFIX)); batch.delete(bytes(keyPrefix + CONTAINER_DIAGS_KEY_SUFFIX));
batch.delete(bytes(keyPrefix + CONTAINER_LAUNCHED_KEY_SUFFIX)); batch.delete(bytes(keyPrefix + CONTAINER_LAUNCHED_KEY_SUFFIX));
batch.delete(bytes(keyPrefix + CONTAINER_QUEUED_KEY_SUFFIX));
batch.delete(bytes(keyPrefix + CONTAINER_KILLED_KEY_SUFFIX)); batch.delete(bytes(keyPrefix + CONTAINER_KILLED_KEY_SUFFIX));
batch.delete(bytes(keyPrefix + CONTAINER_EXIT_CODE_KEY_SUFFIX)); batch.delete(bytes(keyPrefix + CONTAINER_EXIT_CODE_KEY_SUFFIX));
db.write(batch); db.write(batch);

View File

@ -74,10 +74,6 @@ public class NMNullStateStoreService extends NMStateStoreService {
StartContainerRequest startRequest) throws IOException { StartContainerRequest startRequest) throws IOException {
} }
@Override
public void storeContainerQueued(ContainerId containerId) throws IOException {
}
@Override @Override
public void storeContainerDiagnostics(ContainerId containerId, public void storeContainerDiagnostics(ContainerId containerId,
StringBuilder diagnostics) throws IOException { StringBuilder diagnostics) throws IOException {

View File

@ -62,7 +62,6 @@ public abstract class NMStateStoreService extends AbstractService {
public enum RecoveredContainerStatus { public enum RecoveredContainerStatus {
REQUESTED, REQUESTED,
QUEUED,
LAUNCHED, LAUNCHED,
COMPLETED COMPLETED
} }
@ -312,14 +311,6 @@ public abstract class NMStateStoreService extends AbstractService {
public abstract void storeContainer(ContainerId containerId, public abstract void storeContainer(ContainerId containerId,
StartContainerRequest startRequest) throws IOException; StartContainerRequest startRequest) throws IOException;
/**
* Record that a container has been queued at the NM
* @param containerId the container ID
* @throws IOException
*/
public abstract void storeContainerQueued(ContainerId containerId)
throws IOException;
/** /**
* Record that a container has been launched * Record that a container has been launched
* @param containerId the container ID * @param containerId the container ID

View File

@ -131,12 +131,6 @@ public class NMMemoryStateStoreService extends NMStateStoreService {
containerStates.put(containerId, rcs); containerStates.put(containerId, rcs);
} }
@Override
public void storeContainerQueued(ContainerId containerId) throws IOException {
RecoveredContainerState rcs = getRecoveredContainerState(containerId);
rcs.status = RecoveredContainerStatus.QUEUED;
}
@Override @Override
public synchronized void storeContainerDiagnostics(ContainerId containerId, public synchronized void storeContainerDiagnostics(ContainerId containerId,
StringBuilder diagnostics) throws IOException { StringBuilder diagnostics) throws IOException {

View File

@ -280,18 +280,6 @@ public class TestNMLeveldbStateStoreService {
// check whether the new container record is discarded // check whether the new container record is discarded
assertEquals(1, recoveredContainers.size()); assertEquals(1, recoveredContainers.size());
// queue the container, and verify recovered
stateStore.storeContainerQueued(containerId);
restartStateStore();
recoveredContainers = stateStore.loadContainersState();
assertEquals(1, recoveredContainers.size());
rcs = recoveredContainers.get(0);
assertEquals(RecoveredContainerStatus.QUEUED, rcs.getStatus());
assertEquals(ContainerExitStatus.INVALID, rcs.getExitCode());
assertEquals(false, rcs.getKilled());
assertEquals(containerReq, rcs.getStartRequest());
assertTrue(rcs.getDiagnostics().isEmpty());
// launch the container, add some diagnostics, and verify recovered // launch the container, add some diagnostics, and verify recovered
StringBuilder diags = new StringBuilder(); StringBuilder diags = new StringBuilder();
stateStore.storeContainerLaunched(containerId); stateStore.storeContainerLaunched(containerId);