YARN-6168. Restarted RM may not inform AM about all existing containers. Contributed by Chandni Singh

This commit is contained in:
Jian He 2017-11-27 09:55:08 -08:00
parent 3cd75845da
commit fedabcad42
10 changed files with 310 additions and 7 deletions

View File

@ -372,6 +372,44 @@ public abstract class AllocateResponse {
public void setUpdateErrors(List<UpdateContainerError> updateErrors) {
}
/**
* Get the list of running containers as viewed by
* <code>ResourceManager</code> from previous application attempts which
* have not been reported to the Application Master yet.
* <br/>
* These containers were recovered by the RM after the application master
* had already registered. This may happen after RM restart when some NMs get
* delayed in connecting to the RM and reporting the active containers.
* Since they were not reported in the registration
* response, they are reported in the response to the AM heartbeat.
*
* @return the list of running containers as viewed by
* <code>ResourceManager</code> from previous application attempts.
*/
@Public
@Unstable
public abstract List<Container> getContainersFromPreviousAttempts();
/**
* Set the list of running containers as viewed by
* <code>ResourceManager</code> from previous application attempts which have
* not been reported to the Application Master yet.
* <br/>
* These containers were recovered by the RM after the application master
* had already registered. This may happen after RM restart when some NMs get
* delayed in connecting to the RM and reporting the active containers.
* Since they were not reported in the registration
* response, they are reported in the response to the AM heartbeat.
*
* @param containersFromPreviousAttempt
* the list of running containers as viewed by
* <code>ResourceManager</code> from previous application attempts.
*/
@Private
@Unstable
public abstract void setContainersFromPreviousAttempts(
List<Container> containersFromPreviousAttempt);
@Private
@Unstable
public static AllocateResponseBuilder newBuilder() {
@ -589,6 +627,22 @@ public abstract class AllocateResponse {
return this;
}
/**
* Set the <code>containersFromPreviousAttempt</code> of the response.
* @see AllocateResponse#setContainersFromPreviousAttempts(List)
* @param containersFromPreviousAttempt
* <code>containersFromPreviousAttempt</code> of the response
* @return {@link AllocateResponseBuilder}
*/
@Private
@Unstable
public AllocateResponseBuilder containersFromPreviousAttempt(
List<Container> containersFromPreviousAttempt) {
allocateResponse.setContainersFromPreviousAttempts(
containersFromPreviousAttempt);
return this;
}
/**
* Return generated {@link AllocateResponse} object.
* @return {@link AllocateResponse}

View File

@ -117,6 +117,7 @@ message AllocateResponseProto {
optional CollectorInfoProto collector_info = 14;
repeated UpdateContainerErrorProto update_errors = 15;
repeated UpdatedContainerProto updated_containers = 16;
repeated ContainerProto containers_from_previous_attempts = 17;
}
enum SchedulerResourceTypes {

View File

@ -74,6 +74,7 @@ public class AllocateResponsePBImpl extends AllocateResponse {
Resource limit;
private List<Container> allocatedContainers = null;
private List<Container> containersFromPreviousAttempts = null;
private List<NMToken> nmTokens = null;
private List<ContainerStatus> completedContainersStatuses = null;
private List<UpdatedContainer> updatedContainers = null;
@ -172,6 +173,12 @@ public class AllocateResponsePBImpl extends AllocateResponse {
if (this.appPriority != null) {
builder.setApplicationPriority(convertToProtoFormat(this.appPriority));
}
if (this.containersFromPreviousAttempts != null) {
builder.clearContainersFromPreviousAttempts();
Iterable<ContainerProto> iterable =
getContainerProtoIterable(this.containersFromPreviousAttempts);
builder.addAllContainersFromPreviousAttempts(iterable);
}
}
private synchronized void mergeLocalToProto() {
@ -447,6 +454,23 @@ public class AllocateResponsePBImpl extends AllocateResponse {
this.appPriority = priority;
}
@Override
public synchronized List<Container> getContainersFromPreviousAttempts() {
initContainersFromPreviousAttemptsList();
return this.containersFromPreviousAttempts;
}
@Override
public synchronized void setContainersFromPreviousAttempts(
final List<Container> containers) {
if (containers == null) {
return;
}
initContainersFromPreviousAttemptsList();
containersFromPreviousAttempts.clear();
containersFromPreviousAttempts.addAll(containers);
}
private synchronized void initLocalUpdatedContainerList() {
if (this.updatedContainers != null) {
return;
@ -491,6 +515,19 @@ public class AllocateResponsePBImpl extends AllocateResponse {
}
}
private synchronized void initContainersFromPreviousAttemptsList() {
if (this.containersFromPreviousAttempts != null) {
return;
}
AllocateResponseProtoOrBuilder p = viaProto ? proto : builder;
List<ContainerProto> list = p.getContainersFromPreviousAttemptsList();
containersFromPreviousAttempts = new ArrayList<>();
for (ContainerProto c : list) {
containersFromPreviousAttempts.add(convertFromProtoFormat(c));
}
}
private synchronized void initLocalNewNMTokenList() {
if (nmTokens != null) {
return;

View File

@ -326,6 +326,9 @@ final class DefaultAMSProcessor implements ApplicationMasterServiceProcessor {
// Set application priority
response.setApplicationPriority(app
.getApplicationPriority());
response.setContainersFromPreviousAttempts(
allocation.getPreviousAttemptContainers());
}
private void handleNodeUpdates(RMApp app, AllocateResponse allocateResponse) {

View File

@ -245,8 +245,8 @@ public abstract class AbstractYarnScheduler
if (app == null) {
return containerList;
}
Collection<RMContainer> liveContainers =
app.getCurrentAppAttempt().getLiveContainers();
Collection<RMContainer> liveContainers = app.getCurrentAppAttempt()
.pullContainersToTransfer();
ContainerId amContainerId = null;
// For UAM, amContainer would be null
if (rmContext.getRMApps().get(appId).getCurrentAppAttempt()

View File

@ -38,6 +38,7 @@ public class Allocation {
final List<Container> decreasedContainers;
final List<Container> promotedContainers;
final List<Container> demotedContainers;
private final List<Container> previousAttemptContainers;
private Resource resourceLimit;
@ -52,7 +53,7 @@ public class Allocation {
Set<ContainerId> strictContainers, Set<ContainerId> fungibleContainers,
List<ResourceRequest> fungibleResources, List<NMToken> nmTokens) {
this(containers, resourceLimit,strictContainers, fungibleContainers,
fungibleResources, nmTokens, null, null, null, null);
fungibleResources, nmTokens, null, null, null, null, null);
}
public Allocation(List<Container> containers, Resource resourceLimit,
@ -61,14 +62,15 @@ public class Allocation {
List<Container> increasedContainers, List<Container> decreasedContainer) {
this(containers, resourceLimit,strictContainers, fungibleContainers,
fungibleResources, nmTokens, increasedContainers, decreasedContainer,
null, null);
null, null, null);
}
public Allocation(List<Container> containers, Resource resourceLimit,
Set<ContainerId> strictContainers, Set<ContainerId> fungibleContainers,
List<ResourceRequest> fungibleResources, List<NMToken> nmTokens,
List<Container> increasedContainers, List<Container> decreasedContainer,
List<Container> promotedContainers, List<Container> demotedContainer) {
List<Container> promotedContainers, List<Container> demotedContainer,
List<Container> previousAttemptContainers) {
this.containers = containers;
this.resourceLimit = resourceLimit;
this.strictContainers = strictContainers;
@ -79,6 +81,7 @@ public class Allocation {
this.decreasedContainers = decreasedContainer;
this.promotedContainers = promotedContainers;
this.demotedContainers = demotedContainer;
this.previousAttemptContainers = previousAttemptContainers;
}
public List<Container> getContainers() {
@ -121,6 +124,10 @@ public class Allocation {
return demotedContainers;
}
public List<Container> getPreviousAttemptContainers() {
return previousAttemptContainers;
}
@VisibleForTesting
public void setResourceLimit(Resource resource) {
this.resourceLimit = resource;

View File

@ -145,6 +145,11 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
protected List<UpdateContainerError> updateContainerErrors = new ArrayList<>();
//Keeps track of recovered containers from previous attempt which haven't
//been reported to the AM.
private List<Container> recoveredPreviousAttemptContainers =
new ArrayList<>();
// This pendingRelease is used in work-preserving recovery scenario to keep
// track of the AM's outstanding release requests. RM on recovery could
// receive the release request form AM before it receives the container status
@ -361,6 +366,13 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
ContainerId id, RMContainer rmContainer) {
try {
writeLock.lock();
if (!getApplicationAttemptId().equals(
rmContainer.getApplicationAttemptId()) &&
!liveContainers.containsKey(id)) {
LOG.info("recovered container " + id +
" from previous attempt " + rmContainer.getApplicationAttemptId());
recoveredPreviousAttemptContainers.add(rmContainer.getContainer());
}
liveContainers.put(id, rmContainer);
if (rmContainer.getExecutionType() == ExecutionType.OPPORTUNISTIC) {
this.attemptOpportunisticResourceUsage.incUsed(
@ -714,6 +726,42 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
}
}
/**
* Called when AM registers. These containers are reported to the AM in the
* <code>
* RegisterApplicationMasterResponse#containersFromPreviousAttempts
* </code>.
*/
List<RMContainer> pullContainersToTransfer() {
try {
writeLock.lock();
recoveredPreviousAttemptContainers.clear();
return new ArrayList<>(liveContainers.values());
} finally {
writeLock.unlock();
}
}
/**
* Called when AM heartbeats. These containers were recovered by the RM after
* the AM had registered. They are reported to the AM in the
* <code>AllocateResponse#containersFromPreviousAttempts</code>.
*/
public List<Container> pullPreviousAttemptContainers() {
try {
writeLock.lock();
if (recoveredPreviousAttemptContainers.isEmpty()) {
return null;
}
List<Container> returnContainerList = new ArrayList<>
(recoveredPreviousAttemptContainers);
recoveredPreviousAttemptContainers.clear();
return returnContainerList;
} finally {
writeLock.unlock();
}
}
// Create container token and update NMToken altogether, if either of them fails for
// some reason like DNS unavailable, do not return this container and keep it
// in the newlyAllocatedContainers waiting to be refetched.

View File

@ -702,6 +702,8 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
ResourceRequest rr = ResourceRequest.newBuilder()
.priority(Priority.UNDEFINED).resourceName(ResourceRequest.ANY)
.capability(minimumAllocation).numContainers(numCont).build();
List<Container> previousAttemptContainers =
pullPreviousAttemptContainers();
List<Container> newlyAllocatedContainers = pullNewlyAllocatedContainers();
List<Container> newlyIncreasedContainers = pullNewlyIncreasedContainers();
List<Container> newlyDecreasedContainers = pullNewlyDecreasedContainers();
@ -713,7 +715,8 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
return new Allocation(newlyAllocatedContainers, headroom, null,
currentContPreemption, Collections.singletonList(rr), updatedNMTokens,
newlyIncreasedContainers, newlyDecreasedContainers,
newlyPromotedContainers, newlyDemotedContainers);
newlyPromotedContainers, newlyDemotedContainers,
previousAttemptContainers);
} finally {
writeLock.unlock();
}

View File

@ -862,7 +862,8 @@ public class FairScheduler extends
preemptionContainerIds, null, null,
application.pullUpdatedNMTokens(), null, null,
application.pullNewlyPromotedContainers(),
application.pullNewlyDemotedContainers());
application.pullNewlyDemotedContainers(),
application.pullPreviousAttemptContainers());
}
@Override

View File

@ -24,6 +24,9 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
@ -42,6 +45,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.MockMemoryRMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.TestRMRestart;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
@ -53,6 +57,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerStat
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.TestSchedulerUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
import org.apache.hadoop.yarn.util.ControlledClock;
import org.apache.hadoop.yarn.util.Records;
import org.junit.Assert;
@ -993,4 +998,148 @@ public class TestAMRestart {
rm1.waitForState(app1.getApplicationId(), RMAppState.RUNNING);
rm1.stop();
}
// Test to verify that the containers of previous attempt are returned in
// the RM response to the heartbeat of AM if these containers were not
// recovered by the time AM registered.
//
// 1. App is started with 2 containers running on 2 different nodes-
// container 2 on the NM1 node and container 3 on the NM2 node.
// 2. Fail the AM of the application.
// 3. Simulate RM restart.
// 4. NM1 connects to the restarted RM immediately. It sends the RM the status
// of container 2.
// 5. 2nd attempt of the app is launched and the app master registers with RM.
// 6. Verify that app master receives container 2 in the RM response to
// register request.
// 7. NM2 connects to the RM after a delay. It sends the RM the status of
// container 3.
// 8. Verify that the app master receives container 3 in the RM response to
// its heartbeat.
@Test(timeout = 200000)
public void testContainersFromPreviousAttemptsWithRMRestart()
throws Exception {
YarnConfiguration conf = new YarnConfiguration();
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2);
conf.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true);
conf.setBoolean(
YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_ENABLED, true);
conf.setLong(
YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_SCHEDULING_WAIT_MS, 0);
conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName());
MockRM rm1 = new MockRM(conf);
MemoryRMStateStore memStore = (MemoryRMStateStore) rm1.getRMStateStore();
rm1.start();
YarnScheduler scheduler = rm1.getResourceScheduler();
MockNM nm1 = new MockNM("127.0.0.1:1234", 10240,
rm1.getResourceTrackerService());
nm1.registerNode();
MockNM nm2 = new MockNM("127.0.0.1:2351", 4089,
rm1.getResourceTrackerService());
nm2.registerNode();
RMApp app1 = rm1.submitApp(200, "name", "user",
new HashMap<>(), false, "default", -1,
null, "MAPREDUCE", false, true);
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
allocateContainers(nm1, am1, 1);
allocateContainers(nm2, am1, 1);
// container 2 launched and running on node 1
nm1.nodeHeartbeat(am1.getApplicationAttemptId(), 2,
ContainerState.RUNNING);
ContainerId containerId2 =
ContainerId.newContainerId(am1.getApplicationAttemptId(), 2);
rm1.waitForState(nm1, containerId2, RMContainerState.RUNNING);
// container 3 launched and running node 2
nm2.nodeHeartbeat(am1.getApplicationAttemptId(), 3,
ContainerState.RUNNING);
ContainerId containerId3 =
ContainerId.newContainerId(am1.getApplicationAttemptId(), 3);
rm1.waitForState(nm2, containerId3, RMContainerState.RUNNING);
// fail the AM normally
nm1.nodeHeartbeat(am1.getApplicationAttemptId(), 1,
ContainerState.COMPLETE);
rm1.waitForState(am1.getApplicationAttemptId(), RMAppAttemptState.FAILED);
TestSchedulerUtils.waitSchedulerApplicationAttemptStopped(
(AbstractYarnScheduler)scheduler, am1.getApplicationAttemptId());
// restart rm
MockRM rm2 = new MockRM(conf, memStore);
rm2.start();
nm1.setResourceTrackerService(rm2.getResourceTrackerService());
NMContainerStatus container2Status =
TestRMRestart.createNMContainerStatus(am1.getApplicationAttemptId(), 2,
ContainerState.RUNNING);
nm1.registerNode(Lists.newArrayList(container2Status), null);
// Wait for RM to settle down on recovering containers;
Thread.sleep(3000);
nm1.nodeHeartbeat(am1.getApplicationAttemptId(), 2,
ContainerState.RUNNING);
rm2.waitForState(nm1, containerId2, RMContainerState.RUNNING);
Assert.assertNotNull(rm2.getResourceScheduler()
.getRMContainer(containerId2));
// wait for app to start a new attempt.
rm2.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED);
// assert this is a new AM.
ApplicationAttemptId newAttemptId =
app1.getCurrentAppAttempt().getAppAttemptId();
Assert.assertFalse(newAttemptId.equals(am1.getApplicationAttemptId()));
// launch the new AM
MockAM am2 = MockRM.launchAMWhenAsyncSchedulingEnabled(app1, rm2);
RegisterApplicationMasterResponse registerResponse =
am2.registerAppAttempt();
// container2 is recovered from previous attempt
Assert.assertEquals(1,
registerResponse.getContainersFromPreviousAttempts().size());
Assert.assertEquals("container 2", containerId2,
registerResponse.getContainersFromPreviousAttempts().get(0).getId());
rm2.waitForState(app1.getApplicationId(), RMAppState.RUNNING);
//NM2 is back
nm2.setResourceTrackerService(rm2.getResourceTrackerService());
NMContainerStatus container3Status =
TestRMRestart.createNMContainerStatus(am1.getApplicationAttemptId(), 3,
ContainerState.RUNNING);
nm2.registerNode(Lists.newArrayList(container3Status), null);
nm2.nodeHeartbeat(am1.getApplicationAttemptId(), 3,
ContainerState.RUNNING);
rm2.waitForState(nm2, containerId3, RMContainerState.RUNNING);
Assert.assertNotNull(rm2.getResourceScheduler()
.getRMContainer(containerId3));
List<Container> containersFromPreviousAttempts = new ArrayList<>();
GenericTestUtils.waitFor(() -> {
try {
AllocateResponse allocateResponse = am2.doHeartbeat();
if (allocateResponse.getContainersFromPreviousAttempts().size() > 0){
containersFromPreviousAttempts.addAll(
allocateResponse.getContainersFromPreviousAttempts());
Assert.assertEquals("new containers should not be allocated",
0, allocateResponse.getAllocatedContainers().size());
return true;
}
} catch (Exception e) {
Throwables.propagate(e);
}
return false;
}, 2000, 200000);
Assert.assertEquals("container 3", containerId3,
containersFromPreviousAttempts.get(0).getId());
rm2.stop();
rm1.stop();
}
}