YARN-2630. Prevented previous AM container status from being acquired by the current restarted AM. Contributed by Jian He.

(cherry picked from commit 52bbe0f11b)
This commit is contained in:
Zhijie Shen 2014-10-01 15:38:11 -07:00
parent 5d2f325483
commit 8531f93d2d
11 changed files with 108 additions and 86 deletions

View File

@ -452,6 +452,9 @@ Release 2.6.0 - UNRELEASED
YARN-2602. Fixed possible NPE in ApplicationHistoryManagerOnTimelineStore.
(Zhijie Shen via jianhe)
YARN-2630. Prevented previous AM container status from being acquired by the
current restarted AM. (Jian He via zjshen)
Release 2.5.1 - 2014-09-05
INCOMPATIBLE CHANGES

View File

@ -668,6 +668,7 @@ public class ApplicationMaster {
+ ", completed=" + numCompletedContainers.get() + ", allocated="
+ numAllocatedContainers.get() + ", failed="
+ numFailedContainers.get();
LOG.info(appMessage);
success = false;
}
try {

View File

@ -30,7 +30,7 @@ public interface NodeHeartbeatResponse {
NodeAction getNodeAction();
List<ContainerId> getContainersToCleanup();
List<ContainerId> getFinishedContainersPulledByAM();
List<ContainerId> getContainersToBeRemovedFromNM();
List<ApplicationId> getApplicationsToCleanup();
@ -45,9 +45,10 @@ public interface NodeHeartbeatResponse {
void addAllContainersToCleanup(List<ContainerId> containers);
// This tells NM to remove finished containers only after the AM
// has actually received it in a previous allocate response
void addFinishedContainersPulledByAM(List<ContainerId> containers);
// This tells NM to remove finished containers from its context. Currently, NM
// will remove finished containers from its context only after AM has actually
// received the finished containers in a previous allocate response
void addContainersToBeRemovedFromNM(List<ContainerId> containers);
void addAllApplicationsToCleanup(List<ApplicationId> applications);

View File

@ -40,13 +40,14 @@ import org.apache.hadoop.yarn.server.api.records.impl.pb.MasterKeyPBImpl;
public class NodeHeartbeatResponsePBImpl extends ProtoBase<NodeHeartbeatResponseProto> implements NodeHeartbeatResponse {
public class NodeHeartbeatResponsePBImpl extends
ProtoBase<NodeHeartbeatResponseProto> implements NodeHeartbeatResponse {
NodeHeartbeatResponseProto proto = NodeHeartbeatResponseProto.getDefaultInstance();
NodeHeartbeatResponseProto.Builder builder = null;
boolean viaProto = false;
private List<ContainerId> containersToCleanup = null;
private List<ContainerId> finishedContainersPulledByAM = null;
private List<ContainerId> containersToBeRemovedFromNM = null;
private List<ApplicationId> applicationsToCleanup = null;
private MasterKey containerTokenMasterKey = null;
private MasterKey nmTokenMasterKey = null;
@ -74,8 +75,8 @@ public class NodeHeartbeatResponsePBImpl extends ProtoBase<NodeHeartbeatResponse
if (this.applicationsToCleanup != null) {
addApplicationsToCleanupToProto();
}
if (this.finishedContainersPulledByAM != null) {
addFinishedContainersPulledByAMToProto();
if (this.containersToBeRemovedFromNM != null) {
addContainersToBeRemovedFromNMToProto();
}
if (this.containerTokenMasterKey != null) {
builder.setContainerTokenMasterKey(
@ -204,9 +205,9 @@ public class NodeHeartbeatResponsePBImpl extends ProtoBase<NodeHeartbeatResponse
}
@Override
public List<ContainerId> getFinishedContainersPulledByAM() {
initFinishedContainersPulledByAM();
return this.finishedContainersPulledByAM;
public List<ContainerId> getContainersToBeRemovedFromNM() {
initContainersToBeRemovedFromNM();
return this.containersToBeRemovedFromNM;
}
private void initContainersToCleanup() {
@ -222,16 +223,16 @@ public class NodeHeartbeatResponsePBImpl extends ProtoBase<NodeHeartbeatResponse
}
}
private void initFinishedContainersPulledByAM() {
if (this.finishedContainersPulledByAM != null) {
private void initContainersToBeRemovedFromNM() {
if (this.containersToBeRemovedFromNM != null) {
return;
}
NodeHeartbeatResponseProtoOrBuilder p = viaProto ? proto : builder;
List<ContainerIdProto> list = p.getFinishedContainersPulledByAmList();
this.finishedContainersPulledByAM = new ArrayList<ContainerId>();
List<ContainerIdProto> list = p.getContainersToBeRemovedFromNmList();
this.containersToBeRemovedFromNM = new ArrayList<ContainerId>();
for (ContainerIdProto c : list) {
this.finishedContainersPulledByAM.add(convertFromProtoFormat(c));
this.containersToBeRemovedFromNM.add(convertFromProtoFormat(c));
}
}
@ -245,12 +246,12 @@ public class NodeHeartbeatResponsePBImpl extends ProtoBase<NodeHeartbeatResponse
}
@Override
public void addFinishedContainersPulledByAM(
final List<ContainerId> finishedContainersPulledByAM) {
if (finishedContainersPulledByAM == null)
public void
addContainersToBeRemovedFromNM(final List<ContainerId> containers) {
if (containers == null)
return;
initFinishedContainersPulledByAM();
this.finishedContainersPulledByAM.addAll(finishedContainersPulledByAM);
initContainersToBeRemovedFromNM();
this.containersToBeRemovedFromNM.addAll(containers);
}
private void addContainersToCleanupToProto() {
@ -288,10 +289,10 @@ public class NodeHeartbeatResponsePBImpl extends ProtoBase<NodeHeartbeatResponse
builder.addAllContainersToCleanup(iterable);
}
private void addFinishedContainersPulledByAMToProto() {
private void addContainersToBeRemovedFromNMToProto() {
maybeInitBuilder();
builder.clearFinishedContainersPulledByAm();
if (finishedContainersPulledByAM == null)
builder.clearContainersToBeRemovedFromNm();
if (containersToBeRemovedFromNM == null)
return;
Iterable<ContainerIdProto> iterable = new Iterable<ContainerIdProto>() {
@ -299,7 +300,7 @@ public class NodeHeartbeatResponsePBImpl extends ProtoBase<NodeHeartbeatResponse
public Iterator<ContainerIdProto> iterator() {
return new Iterator<ContainerIdProto>() {
Iterator<ContainerId> iter = finishedContainersPulledByAM.iterator();
Iterator<ContainerId> iter = containersToBeRemovedFromNM.iterator();
@Override
public boolean hasNext() {
@ -320,7 +321,7 @@ public class NodeHeartbeatResponsePBImpl extends ProtoBase<NodeHeartbeatResponse
}
};
builder.addAllFinishedContainersPulledByAm(iterable);
builder.addAllContainersToBeRemovedFromNm(iterable);
}
@Override

View File

@ -58,7 +58,7 @@ message NodeHeartbeatResponseProto {
repeated ApplicationIdProto applications_to_cleanup = 6;
optional int64 nextHeartBeatInterval = 7;
optional string diagnostics_message = 8;
repeated ContainerIdProto finished_containers_pulled_by_am = 9;
repeated ContainerIdProto containers_to_be_removed_from_nm = 9;
}
message NMContainerStatusProto {

View File

@ -558,7 +558,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
// when NM re-registers with RM.
// Only remove the cleaned-up containers that are acked
removeCompletedContainersFromContext(response
.getFinishedContainersPulledByAM());
.getContainersToBeRemovedFromNM());
lastHeartBeatID = response.getResponseId();
List<ContainerId> containersToCleanup = response

View File

@ -692,7 +692,7 @@ public class TestNodeStatusUpdater {
NodeHeartbeatResponse nhResponse =
YarnServerBuilderUtils.newNodeHeartbeatResponse(heartBeatID,
heartBeatNodeAction, null, null, null, null, 1000L);
nhResponse.addFinishedContainersPulledByAM(finishedContainersPulledByAM);
nhResponse.addContainersToBeRemovedFromNM(finishedContainersPulledByAM);
return nhResponse;
}
}

View File

@ -687,20 +687,7 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
// A new allocate means the AM received the previously sent
// finishedContainers. We can ack this to NM now
for (NodeId nodeId:finishedContainersSentToAM.keySet()) {
// Clear and get current values
List<ContainerStatus> currentSentContainers =
finishedContainersSentToAM
.put(nodeId, new ArrayList<ContainerStatus>());
List<ContainerId> containerIdList = new ArrayList<ContainerId>
(currentSentContainers.size());
for (ContainerStatus containerStatus:currentSentContainers) {
containerIdList.add(containerStatus.getContainerId());
}
eventHandler.handle(new RMNodeFinishedContainersPulledByAMEvent(
nodeId, containerIdList));
}
sendFinishedContainersToNM();
// Mark every containerStatus as being sent to AM though we may return
// only the ones that belong to the current attempt
@ -1592,14 +1579,12 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
ContainerStatus containerStatus =
containerFinishedEvent.getContainerStatus();
// Add all finished containers so that they can be acked to NM
addJustFinishedContainer(appAttempt, containerFinishedEvent);
// Is this container the AmContainer? If the finished container is same as
// the AMContainer, AppAttempt fails
if (appAttempt.masterContainer != null
&& appAttempt.masterContainer.getId().equals(
containerStatus.getContainerId())) {
appAttempt.sendAMContainerToNM(appAttempt, containerFinishedEvent);
// Remember the follow up transition and save the final attempt state.
appAttempt.rememberTargetTransitionsAndStoreState(event,
@ -1607,10 +1592,46 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
return RMAppAttemptState.FINAL_SAVING;
}
// Add all finished containers so that they can be acked to NM
addJustFinishedContainer(appAttempt, containerFinishedEvent);
return this.currentState;
}
}
/**
 * Acks the NM so that it can remove finished containers from its context.
 *
 * For every node currently tracked in finishedContainersSentToAM, the
 * recorded container statuses are swapped out for a fresh empty list and
 * their container ids are forwarded to the node in a single
 * RMNodeFinishedContainersPulledByAMEvent.
 */
private void sendFinishedContainersToNM() {
  for (NodeId node : finishedContainersSentToAM.keySet()) {
    // put() installs the empty replacement and hands back what was recorded
    // for this node up to now.
    List<ContainerStatus> drainedStatuses =
        finishedContainersSentToAM.put(node, new ArrayList<ContainerStatus>());

    List<ContainerId> ackedIds =
        new ArrayList<ContainerId>(drainedStatuses.size());
    for (ContainerStatus status : drainedStatuses) {
      ackedIds.add(status.getContainerId());
    }

    RMNodeFinishedContainersPulledByAMEvent ackEvent =
        new RMNodeFinishedContainersPulledByAMEvent(node, ackedIds);
    eventHandler.handle(ackEvent);
  }
}
/**
 * Adds the finished AM container to the per-node list of containers to be
 * acked to the NM, so that the AM container instance will be removed from
 * NMContext.
 *
 * If the application does not keep containers across application attempts,
 * the pending list is flushed to the NM immediately via
 * sendFinishedContainersToNM(); otherwise the status stays queued in
 * finishedContainersSentToAM until that method is next invoked.
 *
 * @param appAttempt the attempt whose AM container finished; all state is
 *          read from and written to this instance
 * @param containerFinishedEvent the event carrying the node id and the
 *          status of the finished AM container
 */
private void sendAMContainerToNM(RMAppAttemptImpl appAttempt,
    RMAppAttemptContainerFinishedEvent containerFinishedEvent) {
  NodeId nodeId = containerFinishedEvent.getNodeId();
  // Create and look up the list on the same attempt instance. Previously the
  // list was created on the implicit receiver but fetched from appAttempt,
  // which only works while the two are the same object.
  appAttempt.finishedContainersSentToAM.putIfAbsent(nodeId,
      new ArrayList<ContainerStatus>());
  appAttempt.finishedContainersSentToAM.get(nodeId).add(
      containerFinishedEvent.getContainerStatus());
  if (!appAttempt.getSubmissionContext()
      .getKeepContainersAcrossApplicationAttempts()) {
    appAttempt.sendFinishedContainersToNM();
  }
}
private static void addJustFinishedContainer(RMAppAttemptImpl appAttempt,
RMAppAttemptContainerFinishedEvent containerFinishedEvent) {
appAttempt.justFinishedContainers.putIfAbsent(containerFinishedEvent
@ -1661,16 +1682,16 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
ContainerStatus containerStatus =
containerFinishedEvent.getContainerStatus();
// Add all finished containers so that they can be acked to NM.
addJustFinishedContainer(appAttempt, containerFinishedEvent);
// Is this container the ApplicationMaster container?
if (appAttempt.masterContainer.getId().equals(
containerStatus.getContainerId())) {
new FinalTransition(RMAppAttemptState.FINISHED).transition(
appAttempt, containerFinishedEvent);
appAttempt.sendAMContainerToNM(appAttempt, containerFinishedEvent);
return RMAppAttemptState.FINISHED;
}
// Add all finished containers so that they can be acked to NM.
addJustFinishedContainer(appAttempt, containerFinishedEvent);
return RMAppAttemptState.FINISHING;
}
@ -1686,14 +1707,13 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
ContainerStatus containerStatus =
containerFinishedEvent.getContainerStatus();
// Add all finished containers so that they can be acked to NM.
addJustFinishedContainer(appAttempt, containerFinishedEvent);
// If this is the AM container, it means the AM container is finished,
// but we are not yet acknowledged that the final state has been saved.
// Thus, we still return FINAL_SAVING state here.
if (appAttempt.masterContainer.getId().equals(
containerStatus.getContainerId())) {
appAttempt.sendAMContainerToNM(appAttempt, containerFinishedEvent);
if (appAttempt.targetedFinalState.equals(RMAppAttemptState.FAILED)
|| appAttempt.targetedFinalState.equals(RMAppAttemptState.KILLED)) {
// ignore Container_Finished Event if we were supposed to reach
@ -1708,6 +1728,9 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
appAttempt.eventCausingFinalSaving), RMAppAttemptState.FINISHED);
return;
}
// Add all finished containers so that they can be acked to NM.
addJustFinishedContainer(appAttempt, containerFinishedEvent);
}
}

View File

@ -112,8 +112,11 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
private final Set<ContainerId> containersToClean = new TreeSet<ContainerId>(
new ContainerIdComparator());
/* set of containers that were notified to AM about their completion */
private final Set<ContainerId> finishedContainersPulledByAM =
/*
* set of containers to notify NM to remove them from its context. Currently,
* this includes containers that were notified to AM about their completion
*/
private final Set<ContainerId> containersToBeRemovedFromNM =
new HashSet<ContainerId>();
/* the list of applications that have finished and need to be purged */
@ -157,7 +160,7 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
RMNodeEventType.CLEANUP_CONTAINER, new CleanUpContainerTransition())
.addTransition(NodeState.RUNNING, NodeState.RUNNING,
RMNodeEventType.FINISHED_CONTAINERS_PULLED_BY_AM,
new FinishedContainersPulledByAMTransition())
new AddContainersToBeRemovedFromNMTransition())
.addTransition(NodeState.RUNNING, NodeState.RUNNING,
RMNodeEventType.RECONNECTED, new ReconnectNodeTransition())
.addTransition(NodeState.RUNNING, NodeState.RUNNING,
@ -174,7 +177,7 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
new UpdateNodeResourceWhenUnusableTransition())
.addTransition(NodeState.DECOMMISSIONED, NodeState.DECOMMISSIONED,
RMNodeEventType.FINISHED_CONTAINERS_PULLED_BY_AM,
new FinishedContainersPulledByAMTransition())
new AddContainersToBeRemovedFromNMTransition())
//Transitions from LOST state
.addTransition(NodeState.LOST, NodeState.LOST,
@ -182,7 +185,7 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
new UpdateNodeResourceWhenUnusableTransition())
.addTransition(NodeState.LOST, NodeState.LOST,
RMNodeEventType.FINISHED_CONTAINERS_PULLED_BY_AM,
new FinishedContainersPulledByAMTransition())
new AddContainersToBeRemovedFromNMTransition())
//Transitions from UNHEALTHY state
.addTransition(NodeState.UNHEALTHY,
@ -208,7 +211,7 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
RMNodeEventType.RESOURCE_UPDATE, new UpdateNodeResourceWhenUnusableTransition())
.addTransition(NodeState.UNHEALTHY, NodeState.UNHEALTHY,
RMNodeEventType.FINISHED_CONTAINERS_PULLED_BY_AM,
new FinishedContainersPulledByAMTransition())
new AddContainersToBeRemovedFromNMTransition())
// create the topology tables
.installTopology();
@ -382,11 +385,11 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
response.addAllContainersToCleanup(
new ArrayList<ContainerId>(this.containersToClean));
response.addAllApplicationsToCleanup(this.finishedApplications);
response.addFinishedContainersPulledByAM(
new ArrayList<ContainerId>(this.finishedContainersPulledByAM));
response.addContainersToBeRemovedFromNM(
new ArrayList<ContainerId>(this.containersToBeRemovedFromNM));
this.containersToClean.clear();
this.finishedApplications.clear();
this.finishedContainersPulledByAM.clear();
this.containersToBeRemovedFromNM.clear();
} finally {
this.writeLock.unlock();
}
@ -659,12 +662,12 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
}
}
public static class FinishedContainersPulledByAMTransition implements
public static class AddContainersToBeRemovedFromNMTransition implements
SingleArcTransition<RMNodeImpl, RMNodeEvent> {
@Override
public void transition(RMNodeImpl rmNode, RMNodeEvent event) {
rmNode.finishedContainersPulledByAM.addAll(((
rmNode.containersToBeRemovedFromNM.addAll(((
RMNodeFinishedContainersPulledByAMEvent) event).getContainers());
}
}

View File

@ -98,9 +98,6 @@ public class TestAMRestart {
Thread.sleep(200);
}
ContainerId amContainerId = ContainerId.newInstance(am1
.getApplicationAttemptId(), 1);
// launch the 2nd container, for testing running container transferred.
nm1.nodeHeartbeat(am1.getApplicationAttemptId(), 2, ContainerState.RUNNING);
ContainerId containerId2 =
@ -199,15 +196,11 @@ public class TestAMRestart {
// completed containerId4 is also transferred to the new attempt.
RMAppAttempt newAttempt =
app1.getRMAppAttempt(am2.getApplicationAttemptId());
// 4 containers finished, acquired/allocated/reserved/completed + AM
// container.
waitForContainersToFinish(5, newAttempt);
// 4 containers finished, acquired/allocated/reserved/completed.
waitForContainersToFinish(4, newAttempt);
boolean container3Exists = false, container4Exists = false, container5Exists =
false, container6Exists = false, amContainerExists = false;
false, container6Exists = false;
for(ContainerStatus status : newAttempt.getJustFinishedContainers()) {
if(status.getContainerId().equals(amContainerId)) {
amContainerExists = true;
}
if(status.getContainerId().equals(containerId3)) {
// containerId3 is the container run by the previous attempt but finished by the
// new attempt.
@ -227,11 +220,8 @@ public class TestAMRestart {
container6Exists = true;
}
}
Assert.assertTrue(amContainerExists);
Assert.assertTrue(container3Exists);
Assert.assertTrue(container4Exists);
Assert.assertTrue(container5Exists);
Assert.assertTrue(container6Exists);
Assert.assertTrue(container3Exists && container4Exists && container5Exists
&& container6Exists);
// New SchedulerApplicationAttempt also has the containers info.
rm1.waitForState(nm1, containerId2, RMContainerState.RUNNING);
@ -250,14 +240,14 @@ public class TestAMRestart {
// all 4 normal containers finished.
System.out.println("New attempt's just finished containers: "
+ newAttempt.getJustFinishedContainers());
waitForContainersToFinish(6, newAttempt);
waitForContainersToFinish(5, newAttempt);
rm1.stop();
}
private void waitForContainersToFinish(int expectedNum, RMAppAttempt attempt)
throws InterruptedException {
int count = 0;
while (attempt.getJustFinishedContainers().size() < expectedNum
while (attempt.getJustFinishedContainers().size() != expectedNum
&& count < 500) {
Thread.sleep(100);
count++;

View File

@ -965,7 +965,7 @@ public class TestRMAppAttemptTransitions {
sendAttemptUpdateSavedEvent(applicationAttempt);
assertEquals(RMAppAttemptState.FAILED,
applicationAttempt.getAppAttemptState());
assertEquals(2, applicationAttempt.getJustFinishedContainers().size());
assertEquals(0, applicationAttempt.getJustFinishedContainers().size());
assertEquals(amContainer, applicationAttempt.getMasterContainer());
assertEquals(0, application.getRanNodes().size());
String rmAppPageUrl = pjoin(RM_WEBAPP_ADDR, "cluster", "app",
@ -1003,7 +1003,7 @@ public class TestRMAppAttemptTransitions {
sendAttemptUpdateSavedEvent(applicationAttempt);
assertEquals(RMAppAttemptState.KILLED,
applicationAttempt.getAppAttemptState());
assertEquals(1,applicationAttempt.getJustFinishedContainers().size());
assertEquals(0, applicationAttempt.getJustFinishedContainers().size());
assertEquals(amContainer, applicationAttempt.getMasterContainer());
assertEquals(0, application.getRanNodes().size());
String rmAppPageUrl = pjoin(RM_WEBAPP_ADDR, "cluster", "app",
@ -1192,7 +1192,7 @@ public class TestRMAppAttemptTransitions {
BuilderUtils.newContainerStatus(amContainer.getId(),
ContainerState.COMPLETE, "", 0), anyNodeId));
testAppAttemptFinishedState(amContainer, finalStatus, trackingUrl,
diagnostics, 1, false);
diagnostics, 0, false);
}
// While attempt is at FINAL_SAVING, Container_Finished event may come before
@ -1225,7 +1225,7 @@ public class TestRMAppAttemptTransitions {
// send attempt_saved
sendAttemptUpdateSavedEvent(applicationAttempt);
testAppAttemptFinishedState(amContainer, finalStatus, trackingUrl,
diagnostics, 1, false);
diagnostics, 0, false);
}
// While attempt is at FINAL_SAVING, Expire event may come before
@ -1381,13 +1381,13 @@ public class TestRMAppAttemptTransitions {
verifyApplicationAttemptFinished(RMAppAttemptState.FAILED);
// failed attempt captured the container finished event.
assertEquals(1, applicationAttempt.getJustFinishedContainers().size());
assertEquals(0, applicationAttempt.getJustFinishedContainers().size());
ContainerStatus cs2 =
ContainerStatus.newInstance(ContainerId.newInstance(appAttemptId, 2),
ContainerState.COMPLETE, "", 0);
applicationAttempt.handle(new RMAppAttemptContainerFinishedEvent(
appAttemptId, cs2, anyNodeId));
assertEquals(2, applicationAttempt.getJustFinishedContainers().size());
assertEquals(1, applicationAttempt.getJustFinishedContainers().size());
boolean found = false;
for (ContainerStatus containerStatus:applicationAttempt
.getJustFinishedContainers()) {