YARN-2249. Avoided AM release requests being lost on work preserving RM restart. Contributed by Jian He.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1618972 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Zhijie Shen 2014-08-19 20:33:49 +00:00
parent bf9aa34dea
commit f6a778c372
11 changed files with 213 additions and 74 deletions

View File

@ -36,8 +36,8 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
@ -61,6 +61,7 @@ import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
@ -933,4 +934,10 @@ final public class ResourceSchedulerWrapper
return new HashMap<ApplicationId,
SchedulerApplication<SchedulerApplicationAttempt>>();
}
@Override
protected void completedContainer(RMContainer rmContainer,
ContainerStatus containerStatus, RMContainerEventType event) {
// do nothing
}
}

View File

@ -214,6 +214,9 @@ Release 2.6.0 - UNRELEASED
YARN-2409. RM ActiveToStandBy transition missing stoping previous rmDispatcher.
(Rohith via jianhe)
YARN-2249. Avoided AM release requests being lost on work preserving RM
restart. (Jian He via zjshen)
Release 2.5.0 - UNRELEASED
INCOMPATIBLE CHANGES

View File

@ -23,10 +23,14 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
@ -34,18 +38,25 @@ import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger;
import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppMoveEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerFinishedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerRecoverEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
@ -54,6 +65,7 @@ import org.apache.hadoop.yarn.util.resource.Resources;
import com.google.common.util.concurrent.SettableFuture;
@SuppressWarnings("unchecked")
public abstract class AbstractYarnScheduler
<T extends SchedulerApplicationAttempt, N extends SchedulerNode>
@ -72,6 +84,7 @@ public abstract class AbstractYarnScheduler
protected RMContext rmContext;
protected Map<ApplicationId, SchedulerApplication<T>> applications;
protected int nmExpireInterval;
protected final static List<Container> EMPTY_CONTAINER_LIST =
new ArrayList<Container>();
@ -87,6 +100,15 @@ public abstract class AbstractYarnScheduler
super(name);
}
@Override
public void serviceInit(Configuration conf) throws Exception {
nmExpireInterval =
conf.getInt(YarnConfiguration.RM_NM_EXPIRY_INTERVAL_MS,
YarnConfiguration.DEFAULT_RM_NM_EXPIRY_INTERVAL_MS);
createReleaseCache();
super.serviceInit(conf);
}
public synchronized List<Container> getTransferredContainers(
ApplicationAttemptId currentAttempt) {
ApplicationId appId = currentAttempt.getApplicationId();
@ -281,6 +303,19 @@ public abstract class AbstractYarnScheduler
((RMContainerImpl)rmContainer).setAMContainer(true);
}
}
synchronized (schedulerAttempt) {
Set<ContainerId> releases = schedulerAttempt.getPendingRelease();
if (releases.contains(container.getContainerId())) {
// release the container
rmContainer.handle(new RMContainerFinishedEvent(container
.getContainerId(), SchedulerUtils.createAbnormalContainerStatus(
container.getContainerId(), SchedulerUtils.RELEASED_CONTAINER),
RMContainerEventType.RELEASED));
releases.remove(container.getContainerId());
LOG.info(container.getContainerId() + " is released by application.");
}
}
}
}
@ -320,6 +355,62 @@ public abstract class AbstractYarnScheduler
}
}
protected void createReleaseCache() {
// Cleanup the cache after nm expire interval.
new Timer().schedule(new TimerTask() {
@Override
public void run() {
for (SchedulerApplication<T> app : applications.values()) {
T attempt = app.getCurrentAppAttempt();
synchronized (attempt) {
for (ContainerId containerId : attempt.getPendingRelease()) {
RMAuditLogger.logFailure(
app.getUser(),
AuditConstants.RELEASE_CONTAINER,
"Unauthorized access or invalid container",
"Scheduler",
"Trying to release container not owned by app or with invalid id.",
attempt.getApplicationId(), containerId);
}
attempt.getPendingRelease().clear();
}
}
LOG.info("Release request cache is cleaned up");
}
}, nmExpireInterval);
}
// clean up a completed container
protected abstract void completedContainer(RMContainer rmContainer,
ContainerStatus containerStatus, RMContainerEventType event);
protected void releaseContainers(List<ContainerId> containers,
SchedulerApplicationAttempt attempt) {
for (ContainerId containerId : containers) {
RMContainer rmContainer = getRMContainer(containerId);
if (rmContainer == null) {
if (System.currentTimeMillis() - ResourceManager.getClusterTimeStamp()
< nmExpireInterval) {
LOG.info(containerId + " doesn't exist. Add the container"
+ " to the release request cache as it maybe on recovery.");
synchronized (attempt) {
attempt.getPendingRelease().add(containerId);
}
} else {
RMAuditLogger.logFailure(attempt.getUser(),
AuditConstants.RELEASE_CONTAINER,
"Unauthorized access or invalid container", "Scheduler",
"Trying to release container not owned by app or with invalid id.",
attempt.getApplicationId(), containerId);
}
}
completedContainer(rmContainer,
SchedulerUtils.createAbnormalContainerStatus(containerId,
SchedulerUtils.RELEASED_CONTAINER), RMContainerEventType.RELEASED);
}
}
public SchedulerNode getSchedulerNode(NodeId nodeId) {
return nodes.get(nodeId);
}

View File

@ -17,13 +17,14 @@
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
import com.google.common.base.Preconditions;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -41,7 +42,6 @@ import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
@ -53,6 +53,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerStat
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent;
import org.apache.hadoop.yarn.util.resource.Resources;
import com.google.common.base.Preconditions;
import com.google.common.collect.HashMultiset;
import com.google.common.collect.Multiset;
@ -87,6 +88,13 @@ public class SchedulerApplicationAttempt {
protected List<RMContainer> newlyAllocatedContainers =
new ArrayList<RMContainer>();
// This pendingRelease is used in work-preserving recovery scenario to keep
// track of the AM's outstanding release requests. RM on recovery could
// receive the release request form AM before it receives the container status
// from NM for recovery. In this case, the to-be-recovered containers reported
// by NM should not be recovered.
private Set<ContainerId> pendingRelease = null;
/**
* Count how many times the application has been given an opportunity
* to schedule a task at each priority. Each time the scheduler
@ -114,7 +122,7 @@ public class SchedulerApplicationAttempt {
new AppSchedulingInfo(applicationAttemptId, user, queue,
activeUsersManager, rmContext.getEpoch());
this.queue = queue;
this.pendingRelease = new HashSet<ContainerId>();
if (rmContext.getRMApps() != null &&
rmContext.getRMApps()
.containsKey(applicationAttemptId.getApplicationId())) {
@ -163,6 +171,10 @@ public class SchedulerApplicationAttempt {
return appSchedulingInfo.getResourceRequests(priority);
}
public Set<ContainerId> getPendingRelease() {
return this.pendingRelease;
}
public int getNewContainerId() {
return appSchedulingInfo.getNewContainerId();
}

View File

@ -54,8 +54,6 @@ import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger;
import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.*;
@ -199,7 +197,7 @@ public class CapacityScheduler extends
private static final long DEFAULT_ASYNC_SCHEDULER_INTERVAL = 5;
private boolean overrideWithQueueMappings = false;
private List<QueueMapping> mappings = new ArrayList<QueueMapping>();
private List<QueueMapping> mappings = null;
private Groups groups;
@VisibleForTesting
@ -789,21 +787,7 @@ public class CapacityScheduler extends
getMinimumResourceCapability(), maximumAllocation);
// Release containers
for (ContainerId releasedContainerId : release) {
RMContainer rmContainer = getRMContainer(releasedContainerId);
if (rmContainer == null) {
RMAuditLogger.logFailure(application.getUser(),
AuditConstants.RELEASE_CONTAINER,
"Unauthorized access or invalid container", "CapacityScheduler",
"Trying to release container not owned by app or with invalid id",
application.getApplicationId(), releasedContainerId);
}
completedContainer(rmContainer,
SchedulerUtils.createAbnormalContainerStatus(
releasedContainerId,
SchedulerUtils.RELEASED_CONTAINER),
RMContainerEventType.RELEASED);
}
releaseContainers(release, application);
synchronized (application) {
@ -1098,7 +1082,8 @@ public class CapacityScheduler extends
}
@Lock(CapacityScheduler.class)
private synchronized void completedContainer(RMContainer rmContainer,
@Override
protected synchronized void completedContainer(RMContainer rmContainer,
ContainerStatus containerStatus, RMContainerEventType event) {
if (rmContainer == null) {
LOG.info("Null container completed...");

View File

@ -49,8 +49,6 @@ import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger;
import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceWeights;
@ -810,7 +808,8 @@ public class FairScheduler extends
/**
* Clean up a completed container.
*/
private synchronized void completedContainer(RMContainer rmContainer,
@Override
protected synchronized void completedContainer(RMContainer rmContainer,
ContainerStatus containerStatus, RMContainerEventType event) {
if (rmContainer == null) {
LOG.info("Null container completed...");
@ -913,21 +912,7 @@ public class FairScheduler extends
}
// Release containers
for (ContainerId releasedContainerId : release) {
RMContainer rmContainer = getRMContainer(releasedContainerId);
if (rmContainer == null) {
RMAuditLogger.logFailure(application.getUser(),
AuditConstants.RELEASE_CONTAINER,
"Unauthorized access or invalid container", "FairScheduler",
"Trying to release container not owned by app or with invalid id",
application.getApplicationId(), releasedContainerId);
}
completedContainer(rmContainer,
SchedulerUtils.createAbnormalContainerStatus(
releasedContainerId,
SchedulerUtils.RELEASED_CONTAINER),
RMContainerEventType.RELEASED);
}
releaseContainers(release, application);
synchronized (application) {
if (!ask.isEmpty()) {

View File

@ -52,8 +52,6 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger;
import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
@ -89,7 +87,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSc
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.server.utils.Lock;
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
@ -295,21 +292,7 @@ public class FifoScheduler extends
clusterResource, minimumAllocation, maximumAllocation);
// Release containers
for (ContainerId releasedContainer : release) {
RMContainer rmContainer = getRMContainer(releasedContainer);
if (rmContainer == null) {
RMAuditLogger.logFailure(application.getUser(),
AuditConstants.RELEASE_CONTAINER,
"Unauthorized access or invalid container", "FifoScheduler",
"Trying to release container not owned by app or with invalid id",
application.getApplicationId(), releasedContainer);
}
containerCompleted(rmContainer,
SchedulerUtils.createAbnormalContainerStatus(
releasedContainer,
SchedulerUtils.RELEASED_CONTAINER),
RMContainerEventType.RELEASED);
}
releaseContainers(release, application);
synchronized (application) {
@ -443,7 +426,7 @@ public class FifoScheduler extends
LOG.info("Skip killing " + container.getContainerId());
continue;
}
containerCompleted(container,
completedContainer(container,
SchedulerUtils.createAbnormalContainerStatus(
container.getContainerId(), SchedulerUtils.COMPLETED_APPLICATION),
RMContainerEventType.KILL);
@ -717,7 +700,7 @@ public class FifoScheduler extends
for (ContainerStatus completedContainer : completedContainers) {
ContainerId containerId = completedContainer.getContainerId();
LOG.debug("Container FINISHED: " + containerId);
containerCompleted(getRMContainer(containerId),
completedContainer(getRMContainer(containerId),
completedContainer, RMContainerEventType.FINISHED);
}
@ -818,7 +801,7 @@ public class FifoScheduler extends
ContainerExpiredSchedulerEvent containerExpiredEvent =
(ContainerExpiredSchedulerEvent) event;
ContainerId containerid = containerExpiredEvent.getContainerId();
containerCompleted(getRMContainer(containerid),
completedContainer(getRMContainer(containerid),
SchedulerUtils.createAbnormalContainerStatus(
containerid,
SchedulerUtils.EXPIRED_CONTAINER),
@ -831,7 +814,8 @@ public class FifoScheduler extends
}
@Lock(FifoScheduler.class)
private synchronized void containerCompleted(RMContainer rmContainer,
@Override
protected synchronized void completedContainer(RMContainer rmContainer,
ContainerStatus containerStatus, RMContainerEventType event) {
if (rmContainer == null) {
LOG.info("Null container completed...");
@ -881,7 +865,7 @@ public class FifoScheduler extends
}
// Kill running containers
for(RMContainer container : node.getRunningContainers()) {
containerCompleted(container,
completedContainer(container,
SchedulerUtils.createAbnormalContainerStatus(
container.getContainerId(),
SchedulerUtils.LOST_CONTAINER),

View File

@ -49,7 +49,7 @@ public class MockAM {
private volatile int responseId = 0;
private final ApplicationAttemptId attemptId;
private final RMContext context;
private RMContext context;
private ApplicationMasterProtocol amRMProtocol;
private final List<ResourceRequest> requests = new ArrayList<ResourceRequest>();
@ -61,8 +61,10 @@ public class MockAM {
this.amRMProtocol = amRMProtocol;
this.attemptId = attemptId;
}
void setAMRMProtocol(ApplicationMasterProtocol amRMProtocol) {
public void setAMRMProtocol(ApplicationMasterProtocol amRMProtocol,
RMContext context) {
this.context = context;
this.amRMProtocol = amRMProtocol;
}

View File

@ -171,7 +171,6 @@ public class TestApplicationMasterService {
RMAppAttempt attempt1 = app1.getCurrentAppAttempt();
MockAM am1 = rm.sendAMLaunched(attempt1.getAppAttemptId());
am1.registerAppAttempt();
am1.setAMRMProtocol(rm.getApplicationMasterService());
AllocateRequestPBImpl allocateRequest = new AllocateRequestPBImpl();
List<ContainerId> release = new ArrayList<ContainerId>();

View File

@ -289,7 +289,7 @@ public class TestRMRestart {
// verify old AM is not accepted
// change running AM to talk to new RM
am1.setAMRMProtocol(rm2.getApplicationMasterService());
am1.setAMRMProtocol(rm2.getApplicationMasterService(), rm2.getRMContext());
AllocateResponse allocResponse = am1.allocate(
new ArrayList<ResourceRequest>(),
new ArrayList<ContainerId>());
@ -1663,7 +1663,7 @@ public class TestRMRestart {
nm1.setResourceTrackerService(rm2.getResourceTrackerService());
// recover app
RMApp loadedApp1 = rm2.getRMContext().getRMApps().get(app1.getApplicationId());
am1.setAMRMProtocol(rm2.getApplicationMasterService());
am1.setAMRMProtocol(rm2.getApplicationMasterService(), rm2.getRMContext());
am1.allocate(new ArrayList<ResourceRequest>(), new ArrayList<ContainerId>());
nm1.nodeHeartbeat(true);
nm1 = new MockNM("127.0.0.1:1234", 15120, rm2.getResourceTrackerService());

View File

@ -33,10 +33,13 @@ import java.util.Set;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
@ -72,6 +75,9 @@ import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import com.google.common.base.Supplier;
@SuppressWarnings({"rawtypes", "unchecked"})
@RunWith(value = Parameterized.class)
public class TestWorkPreservingRMRestart {
@ -572,8 +578,8 @@ public class TestWorkPreservingRMRestart {
rm2.waitForState(app0.getApplicationId(), RMAppState.ACCEPTED);
rm2.waitForState(am0.getApplicationAttemptId(), RMAppAttemptState.LAUNCHED);
am0.setAMRMProtocol(rm2.getApplicationMasterService());
am0.registerAppAttempt(false);
am0.setAMRMProtocol(rm2.getApplicationMasterService(), rm2.getRMContext());
am0.registerAppAttempt(true);
rm2.waitForState(app0.getApplicationId(), RMAppState.RUNNING);
rm2.waitForState(am0.getApplicationAttemptId(), RMAppAttemptState.RUNNING);
@ -646,6 +652,69 @@ public class TestWorkPreservingRMRestart {
waitForNumContainersToRecover(2, rm2, am0.getApplicationAttemptId());
}
// Test if RM on recovery receives the container release request from AM
// before it receives the container status reported by NM for recovery. this
// container should not be recovered.
@Test (timeout = 30000)
public void testReleasedContainerNotRecovered() throws Exception {
MemoryRMStateStore memStore = new MemoryRMStateStore();
memStore.init(conf);
rm1 = new MockRM(conf, memStore);
MockNM nm1 = new MockNM("h1:1234", 15120, rm1.getResourceTrackerService());
nm1.registerNode();
rm1.start();
RMApp app1 = rm1.submitApp(1024);
final MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
// Re-start RM
conf.setInt(YarnConfiguration.RM_NM_EXPIRY_INTERVAL_MS, 8000);
rm2 = new MockRM(conf, memStore);
rm2.start();
nm1.setResourceTrackerService(rm2.getResourceTrackerService());
rm2.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED);
am1.setAMRMProtocol(rm2.getApplicationMasterService(), rm2.getRMContext());
am1.registerAppAttempt(true);
// try to release a container before the container is actually recovered.
final ContainerId runningContainer =
ContainerId.newInstance(am1.getApplicationAttemptId(), 2);
am1.allocate(null, Arrays.asList(runningContainer));
// send container statuses to recover the containers
List<NMContainerStatus> containerStatuses =
createNMContainerStatusForApp(am1);
nm1.registerNode(containerStatuses, null);
// only the am container should be recovered.
waitForNumContainersToRecover(1, rm2, am1.getApplicationAttemptId());
final AbstractYarnScheduler scheduler =
(AbstractYarnScheduler) rm2.getResourceScheduler();
// cached release request is cleaned.
// assertFalse(scheduler.getPendingRelease().contains(runningContainer));
AllocateResponse response = am1.allocate(null, null);
// AM gets notified of the completed container.
boolean receivedCompletedContainer = false;
for (ContainerStatus status : response.getCompletedContainersStatuses()) {
if (status.getContainerId().equals(runningContainer)) {
receivedCompletedContainer = true;
}
}
assertTrue(receivedCompletedContainer);
GenericTestUtils.waitFor(new Supplier<Boolean>() {
public Boolean get() {
// release cache is cleaned up and previous running container is not
// recovered
return scheduler.getApplicationAttempt(am1.getApplicationAttemptId())
.getPendingRelease().isEmpty()
&& scheduler.getRMContainer(runningContainer) == null;
}
}, 1000, 20000);
}
private void asserteMetrics(QueueMetrics qm, int appsSubmitted,
int appsPending, int appsRunning, int appsCompleted,
int allocatedContainers, int availableMB, int availableVirtualCores,
@ -661,7 +730,7 @@ public class TestWorkPreservingRMRestart {
assertEquals(allocatedVirtualCores, qm.getAllocatedVirtualCores());
}
private void waitForNumContainersToRecover(int num, MockRM rm,
public static void waitForNumContainersToRecover(int num, MockRM rm,
ApplicationAttemptId attemptId) throws Exception {
AbstractYarnScheduler scheduler =
(AbstractYarnScheduler) rm.getResourceScheduler();
@ -674,7 +743,9 @@ public class TestWorkPreservingRMRestart {
attempt = scheduler.getApplicationAttempt(attemptId);
}
while (attempt.getLiveContainers().size() < num) {
System.out.println("Wait for " + num + " containers to recover.");
System.out.println("Wait for " + num
+ " containers to recover. currently: "
+ attempt.getLiveContainers().size());
Thread.sleep(200);
}
}