diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateResponse.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateResponse.java index 9b254ae9072..98346ce861f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateResponse.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateResponse.java @@ -372,6 +372,44 @@ public abstract class AllocateResponse { public void setUpdateErrors(List updateErrors) { } + /** + * Get the list of running containers as viewed by + * ResourceManager from previous application attempts which + * have not been reported to the Application Master yet. + *
+ * These containers were recovered by the RM after the application master + * had already registered. This may happen after RM restart when some NMs get + * delayed in connecting to the RM and reporting the active containers. + * Since they were not reported in the registration + * response, they are reported in the response to the AM heartbeat. + * + * @return the list of running containers as viewed by + * ResourceManager from previous application attempts. + */ + @Public + @Unstable + public abstract List getContainersFromPreviousAttempts(); + + /** + * Set the list of running containers as viewed by + * ResourceManager from previous application attempts which have + * not been reported to the Application Master yet. + *
+ * These containers were recovered by the RM after the application master + * had already registered. This may happen after RM restart when some NMs get + * delayed in connecting to the RM and reporting the active containers. + * Since they were not reported in the registration + * response, they are reported in the response to the AM heartbeat. + * + * @param containersFromPreviousAttempt + * the list of running containers as viewed by + * ResourceManager from previous application attempts. + */ + @Private + @Unstable + public abstract void setContainersFromPreviousAttempts( + List containersFromPreviousAttempt); + @Private @Unstable public static AllocateResponseBuilder newBuilder() { @@ -589,6 +627,22 @@ public abstract class AllocateResponse { return this; } + /** + * Set the containersFromPreviousAttempt of the response. + * @see AllocateResponse#setContainersFromPreviousAttempts(List) + * @param containersFromPreviousAttempt + * containersFromPreviousAttempt of the response + * @return {@link AllocateResponseBuilder} + */ + @Private + @Unstable + public AllocateResponseBuilder containersFromPreviousAttempt( + List containersFromPreviousAttempt) { + allocateResponse.setContainersFromPreviousAttempts( + containersFromPreviousAttempt); + return this; + } + /** * Return generated {@link AllocateResponse} object. * @return {@link AllocateResponse} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto index 7146b9937d6..4e97c7442d8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto @@ -117,6 +117,7 @@ message AllocateResponseProto { optional CollectorInfoProto collector_info = 14; repeated UpdateContainerErrorProto update_errors = 15; repeated UpdatedContainerProto updated_containers = 16; + repeated ContainerProto containers_from_previous_attempts = 17; } enum SchedulerResourceTypes { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java index ff35da81eb0..5ca1e73a5fc 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java @@ -74,6 +74,7 @@ public class AllocateResponsePBImpl extends AllocateResponse { Resource limit; private List allocatedContainers = null; + private List containersFromPreviousAttempts = null; private List nmTokens = null; private List completedContainersStatuses = null; private List updatedContainers = null; @@ -172,6 +173,12 @@ public class AllocateResponsePBImpl extends AllocateResponse { if (this.appPriority != null) { builder.setApplicationPriority(convertToProtoFormat(this.appPriority)); } + if (this.containersFromPreviousAttempts != null) { + builder.clearContainersFromPreviousAttempts(); + Iterable iterable = + getContainerProtoIterable(this.containersFromPreviousAttempts); + builder.addAllContainersFromPreviousAttempts(iterable); + } } private synchronized void mergeLocalToProto() { @@ -447,6 +454,23 @@ public class AllocateResponsePBImpl extends AllocateResponse { this.appPriority = priority; } + @Override + public synchronized List getContainersFromPreviousAttempts() { + initContainersFromPreviousAttemptsList(); + return this.containersFromPreviousAttempts; + } + + @Override + public synchronized void setContainersFromPreviousAttempts( + final List containers) { + if (containers == null) { + return; + } + initContainersFromPreviousAttemptsList(); + containersFromPreviousAttempts.clear(); + containersFromPreviousAttempts.addAll(containers); + } + private synchronized void initLocalUpdatedContainerList() { if (this.updatedContainers != null) { return; @@ -491,6 +515,19 @@ public class AllocateResponsePBImpl extends AllocateResponse { } } + private synchronized void initContainersFromPreviousAttemptsList() { + if (this.containersFromPreviousAttempts != null) { + return; + } + AllocateResponseProtoOrBuilder p = viaProto ? proto : builder; + List list = p.getContainersFromPreviousAttemptsList(); + containersFromPreviousAttempts = new ArrayList<>(); + + for (ContainerProto c : list) { + containersFromPreviousAttempts.add(convertFromProtoFormat(c)); + } + } + private synchronized void initLocalNewNMTokenList() { if (nmTokens != null) { return; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DefaultAMSProcessor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DefaultAMSProcessor.java index 4c2531ed271..713947fe5b7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DefaultAMSProcessor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DefaultAMSProcessor.java @@ -326,6 +326,9 @@ final class DefaultAMSProcessor implements ApplicationMasterServiceProcessor { // Set application priority response.setApplicationPriority(app .getApplicationPriority()); + + response.setContainersFromPreviousAttempts( + allocation.getPreviousAttemptContainers()); } private void handleNodeUpdates(RMApp app, AllocateResponse allocateResponse) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java index 7308fd8b0ff..e818dabdef1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java @@ -245,8 +245,8 @@ public abstract class AbstractYarnScheduler if (app == null) { return containerList; } - Collection liveContainers = - app.getCurrentAppAttempt().getLiveContainers(); + Collection liveContainers = app.getCurrentAppAttempt() + .pullContainersToTransfer(); ContainerId amContainerId = null; // For UAM, amContainer would be null if (rmContext.getRMApps().get(appId).getCurrentAppAttempt() diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Allocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Allocation.java index 43eadab88a6..768afded5bd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Allocation.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Allocation.java @@ -38,6 +38,7 @@ public class Allocation { final List decreasedContainers; final List promotedContainers; final List demotedContainers; + private final List previousAttemptContainers; private Resource resourceLimit; @@ -52,7 +53,7 @@ public class Allocation { Set strictContainers, Set fungibleContainers, List fungibleResources, List nmTokens) { this(containers, resourceLimit,strictContainers, fungibleContainers, - fungibleResources, nmTokens, null, null, null, null); + fungibleResources, nmTokens, null, null, null, null, null); } public Allocation(List containers, Resource resourceLimit, @@ -61,14 +62,15 @@ public class Allocation { List increasedContainers, List decreasedContainer) { this(containers, resourceLimit,strictContainers, fungibleContainers, fungibleResources, nmTokens, increasedContainers, decreasedContainer, - null, null); + null, null, null); } public Allocation(List containers, Resource resourceLimit, Set strictContainers, Set fungibleContainers, List fungibleResources, List nmTokens, List increasedContainers, List decreasedContainer, - List promotedContainers, List demotedContainer) { + List promotedContainers, List demotedContainer, + List previousAttemptContainers) { this.containers = containers; this.resourceLimit = resourceLimit; this.strictContainers = strictContainers; @@ -79,6 +81,7 @@ public class Allocation { this.decreasedContainers = decreasedContainer; this.promotedContainers = promotedContainers; this.demotedContainers = demotedContainer; + this.previousAttemptContainers = previousAttemptContainers; } public List getContainers() { @@ -121,6 +124,10 @@ public class Allocation { return demotedContainers; } + public List getPreviousAttemptContainers() { + return previousAttemptContainers; + } + @VisibleForTesting public void setResourceLimit(Resource resource) { this.resourceLimit = resource; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java index 346bd2094b5..65673c9f05c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java @@ -145,6 +145,11 @@ public class SchedulerApplicationAttempt implements SchedulableEntity { protected List updateContainerErrors = new ArrayList<>(); + //Keeps track of recovered containers from previous attempt which haven't + //been reported to the AM. + private List recoveredPreviousAttemptContainers = + new ArrayList<>(); + // This pendingRelease is used in work-preserving recovery scenario to keep // track of the AM's outstanding release requests. RM on recovery could // receive the release request form AM before it receives the container status @@ -361,6 +366,13 @@ public class SchedulerApplicationAttempt implements SchedulableEntity { ContainerId id, RMContainer rmContainer) { try { writeLock.lock(); + if (!getApplicationAttemptId().equals( + rmContainer.getApplicationAttemptId()) && + !liveContainers.containsKey(id)) { + LOG.info("recovered container " + id + + " from previous attempt " + rmContainer.getApplicationAttemptId()); + recoveredPreviousAttemptContainers.add(rmContainer.getContainer()); + } liveContainers.put(id, rmContainer); if (rmContainer.getExecutionType() == ExecutionType.OPPORTUNISTIC) { this.attemptOpportunisticResourceUsage.incUsed( @@ -714,6 +726,42 @@ public class SchedulerApplicationAttempt implements SchedulableEntity { } } + /** + * Called when AM registers. These containers are reported to the AM in the + * + * RegisterApplicationMasterResponse#containersFromPreviousAttempts + * . + */ + List pullContainersToTransfer() { + try { + writeLock.lock(); + recoveredPreviousAttemptContainers.clear(); + return new ArrayList<>(liveContainers.values()); + } finally { + writeLock.unlock(); + } + } + + /** + * Called when AM heartbeats. These containers were recovered by the RM after + * the AM had registered. They are reported to the AM in the + * AllocateResponse#containersFromPreviousAttempts. + */ + public List pullPreviousAttemptContainers() { + try { + writeLock.lock(); + if (recoveredPreviousAttemptContainers.isEmpty()) { + return null; + } + List returnContainerList = new ArrayList<> + (recoveredPreviousAttemptContainers); + recoveredPreviousAttemptContainers.clear(); + return returnContainerList; + } finally { + writeLock.unlock(); + } + } + // Create container token and update NMToken altogether, if either of them fails for // some reason like DNS unavailable, do not return this container and keep it // in the newlyAllocatedContainers waiting to be refetched. diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java index e9bee148c4b..93d51d8af31 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java @@ -702,6 +702,8 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt { ResourceRequest rr = ResourceRequest.newBuilder() .priority(Priority.UNDEFINED).resourceName(ResourceRequest.ANY) .capability(minimumAllocation).numContainers(numCont).build(); + List previousAttemptContainers = + pullPreviousAttemptContainers(); List newlyAllocatedContainers = pullNewlyAllocatedContainers(); List newlyIncreasedContainers = pullNewlyIncreasedContainers(); List newlyDecreasedContainers = pullNewlyDecreasedContainers(); @@ -713,7 +715,8 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt { return new Allocation(newlyAllocatedContainers, headroom, null, currentContPreemption, Collections.singletonList(rr), updatedNMTokens, newlyIncreasedContainers, newlyDecreasedContainers, - newlyPromotedContainers, newlyDemotedContainers); + newlyPromotedContainers, newlyDemotedContainers, + previousAttemptContainers); } finally { writeLock.unlock(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java index 661d0a091aa..625009d6978 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java @@ -862,7 +862,8 @@ public class FairScheduler extends preemptionContainerIds, null, null, application.pullUpdatedNMTokens(), null, null, application.pullNewlyPromotedContainers(), - application.pullNewlyDemotedContainers()); + application.pullNewlyDemotedContainers(), + application.pullPreviousAttemptContainers()); } @Override diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java index c43069bac08..3d523aa11d9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java @@ -24,6 +24,9 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import com.google.common.base.Throwables; +import com.google.common.collect.Lists; +import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; import org.apache.hadoop.yarn.api.records.ApplicationAccessType; @@ -42,6 +45,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.MockAM; import org.apache.hadoop.yarn.server.resourcemanager.MockNM; import org.apache.hadoop.yarn.server.resourcemanager.MockRM; import org.apache.hadoop.yarn.server.resourcemanager.MockMemoryRMStateStore; +import org.apache.hadoop.yarn.server.resourcemanager.TestRMRestart; import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; @@ -53,6 +57,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerStat import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.TestSchedulerUtils; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler; import org.apache.hadoop.yarn.util.ControlledClock; import org.apache.hadoop.yarn.util.Records; import org.junit.Assert; @@ -993,4 +998,148 @@ public class TestAMRestart { rm1.waitForState(app1.getApplicationId(), RMAppState.RUNNING); rm1.stop(); } + + // Test to verify that the containers of previous attempt are returned in + // the RM response to the heartbeat of AM if these containers were not + // recovered by the time AM registered. + // + // 1. App is started with 2 containers running on 2 different nodes- + // container 2 on the NM1 node and container 3 on the NM2 node. + // 2. Fail the AM of the application. + // 3. Simulate RM restart. + // 4. NM1 connects to the restarted RM immediately. It sends the RM the status + // of container 2. + // 5. 2nd attempt of the app is launched and the app master registers with RM. + // 6. Verify that app master receives container 2 in the RM response to + // register request. + // 7. NM2 connects to the RM after a delay. It sends the RM the status of + // container 3. + // 8. Verify that the app master receives container 3 in the RM response to + // its heartbeat. + @Test(timeout = 200000) + public void testContainersFromPreviousAttemptsWithRMRestart() + throws Exception { + YarnConfiguration conf = new YarnConfiguration(); + conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2); + conf.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true); + conf.setBoolean( + YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_ENABLED, true); + conf.setLong( + YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_SCHEDULING_WAIT_MS, 0); + conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName()); + + MockRM rm1 = new MockRM(conf); + MemoryRMStateStore memStore = (MemoryRMStateStore) rm1.getRMStateStore(); + rm1.start(); + YarnScheduler scheduler = rm1.getResourceScheduler(); + + MockNM nm1 = new MockNM("127.0.0.1:1234", 10240, + rm1.getResourceTrackerService()); + nm1.registerNode(); + + MockNM nm2 = new MockNM("127.0.0.1:2351", 4089, + rm1.getResourceTrackerService()); + nm2.registerNode(); + + RMApp app1 = rm1.submitApp(200, "name", "user", + new HashMap<>(), false, "default", -1, + null, "MAPREDUCE", false, true); + + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1); + allocateContainers(nm1, am1, 1); + allocateContainers(nm2, am1, 1); + + // container 2 launched and running on node 1 + nm1.nodeHeartbeat(am1.getApplicationAttemptId(), 2, + ContainerState.RUNNING); + ContainerId containerId2 = + ContainerId.newContainerId(am1.getApplicationAttemptId(), 2); + rm1.waitForState(nm1, containerId2, RMContainerState.RUNNING); + + // container 3 launched and running node 2 + nm2.nodeHeartbeat(am1.getApplicationAttemptId(), 3, + ContainerState.RUNNING); + ContainerId containerId3 = + ContainerId.newContainerId(am1.getApplicationAttemptId(), 3); + rm1.waitForState(nm2, containerId3, RMContainerState.RUNNING); + + // fail the AM normally + nm1.nodeHeartbeat(am1.getApplicationAttemptId(), 1, + ContainerState.COMPLETE); + rm1.waitForState(am1.getApplicationAttemptId(), RMAppAttemptState.FAILED); + TestSchedulerUtils.waitSchedulerApplicationAttemptStopped( + (AbstractYarnScheduler)scheduler, am1.getApplicationAttemptId()); + + // restart rm + MockRM rm2 = new MockRM(conf, memStore); + rm2.start(); + nm1.setResourceTrackerService(rm2.getResourceTrackerService()); + NMContainerStatus container2Status = + TestRMRestart.createNMContainerStatus(am1.getApplicationAttemptId(), 2, + ContainerState.RUNNING); + nm1.registerNode(Lists.newArrayList(container2Status), null); + + + // Wait for RM to settle down on recovering containers; + Thread.sleep(3000); + nm1.nodeHeartbeat(am1.getApplicationAttemptId(), 2, + ContainerState.RUNNING); + rm2.waitForState(nm1, containerId2, RMContainerState.RUNNING); + Assert.assertNotNull(rm2.getResourceScheduler() + .getRMContainer(containerId2)); + + // wait for app to start a new attempt. + rm2.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED); + // assert this is a new AM. + ApplicationAttemptId newAttemptId = + app1.getCurrentAppAttempt().getAppAttemptId(); + Assert.assertFalse(newAttemptId.equals(am1.getApplicationAttemptId())); + + // launch the new AM + MockAM am2 = MockRM.launchAMWhenAsyncSchedulingEnabled(app1, rm2); + RegisterApplicationMasterResponse registerResponse = + am2.registerAppAttempt(); + + // container2 is recovered from previous attempt + Assert.assertEquals(1, + registerResponse.getContainersFromPreviousAttempts().size()); + Assert.assertEquals("container 2", containerId2, + registerResponse.getContainersFromPreviousAttempts().get(0).getId()); + + rm2.waitForState(app1.getApplicationId(), RMAppState.RUNNING); + + //NM2 is back + nm2.setResourceTrackerService(rm2.getResourceTrackerService()); + NMContainerStatus container3Status = + TestRMRestart.createNMContainerStatus(am1.getApplicationAttemptId(), 3, + ContainerState.RUNNING); + nm2.registerNode(Lists.newArrayList(container3Status), null); + + nm2.nodeHeartbeat(am1.getApplicationAttemptId(), 3, + ContainerState.RUNNING); + rm2.waitForState(nm2, containerId3, RMContainerState.RUNNING); + Assert.assertNotNull(rm2.getResourceScheduler() + .getRMContainer(containerId3)); + + List containersFromPreviousAttempts = new ArrayList<>(); + GenericTestUtils.waitFor(() -> { + try { + AllocateResponse allocateResponse = am2.doHeartbeat(); + if (allocateResponse.getContainersFromPreviousAttempts().size() > 0){ + containersFromPreviousAttempts.addAll( + allocateResponse.getContainersFromPreviousAttempts()); + Assert.assertEquals("new containers should not be allocated", + 0, allocateResponse.getAllocatedContainers().size()); + return true; + } + } catch (Exception e) { + Throwables.propagate(e); + } + return false; + }, 2000, 200000); + Assert.assertEquals("container 3", containerId3, + containersFromPreviousAttempts.get(0).getId()); + rm2.stop(); + rm1.stop(); + } }