YARN-2630. Prevented previous AM container status from being acquired by the current restarted AM. Contributed by Jian He.
(cherry picked from commit 52bbe0f11b)
This commit is contained in:
parent
1725d56132
commit
4b50e23271
|
@ -470,6 +470,9 @@ Release 2.6.0 - UNRELEASED
|
||||||
YARN-2602. Fixed possible NPE in ApplicationHistoryManagerOnTimelineStore.
|
YARN-2602. Fixed possible NPE in ApplicationHistoryManagerOnTimelineStore.
|
||||||
(Zhijie Shen via jianhe)
|
(Zhijie Shen via jianhe)
|
||||||
|
|
||||||
|
YARN-2630. Prevented previous AM container status from being acquired by the
|
||||||
|
current restarted AM. (Jian He via zjshen)
|
||||||
|
|
||||||
Release 2.5.1 - 2014-09-05
|
Release 2.5.1 - 2014-09-05
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
|
|
@ -668,6 +668,7 @@ public class ApplicationMaster {
|
||||||
+ ", completed=" + numCompletedContainers.get() + ", allocated="
|
+ ", completed=" + numCompletedContainers.get() + ", allocated="
|
||||||
+ numAllocatedContainers.get() + ", failed="
|
+ numAllocatedContainers.get() + ", failed="
|
||||||
+ numFailedContainers.get();
|
+ numFailedContainers.get();
|
||||||
|
LOG.info(appMessage);
|
||||||
success = false;
|
success = false;
|
||||||
}
|
}
|
||||||
try {
|
try {
|
||||||
|
|
|
@ -30,7 +30,7 @@ public interface NodeHeartbeatResponse {
|
||||||
NodeAction getNodeAction();
|
NodeAction getNodeAction();
|
||||||
|
|
||||||
List<ContainerId> getContainersToCleanup();
|
List<ContainerId> getContainersToCleanup();
|
||||||
List<ContainerId> getFinishedContainersPulledByAM();
|
List<ContainerId> getContainersToBeRemovedFromNM();
|
||||||
|
|
||||||
List<ApplicationId> getApplicationsToCleanup();
|
List<ApplicationId> getApplicationsToCleanup();
|
||||||
|
|
||||||
|
@ -45,9 +45,10 @@ public interface NodeHeartbeatResponse {
|
||||||
|
|
||||||
void addAllContainersToCleanup(List<ContainerId> containers);
|
void addAllContainersToCleanup(List<ContainerId> containers);
|
||||||
|
|
||||||
// This tells NM to remove finished containers only after the AM
|
// This tells NM to remove finished containers from its context. Currently, NM
|
||||||
// has actually received it in a previous allocate response
|
// will remove finished containers from its context only after AM has actually
|
||||||
void addFinishedContainersPulledByAM(List<ContainerId> containers);
|
// received the finished containers in a previous allocate response
|
||||||
|
void addContainersToBeRemovedFromNM(List<ContainerId> containers);
|
||||||
|
|
||||||
void addAllApplicationsToCleanup(List<ApplicationId> applications);
|
void addAllApplicationsToCleanup(List<ApplicationId> applications);
|
||||||
|
|
||||||
|
|
|
@ -40,13 +40,14 @@ import org.apache.hadoop.yarn.server.api.records.impl.pb.MasterKeyPBImpl;
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
public class NodeHeartbeatResponsePBImpl extends ProtoBase<NodeHeartbeatResponseProto> implements NodeHeartbeatResponse {
|
public class NodeHeartbeatResponsePBImpl extends
|
||||||
|
ProtoBase<NodeHeartbeatResponseProto> implements NodeHeartbeatResponse {
|
||||||
NodeHeartbeatResponseProto proto = NodeHeartbeatResponseProto.getDefaultInstance();
|
NodeHeartbeatResponseProto proto = NodeHeartbeatResponseProto.getDefaultInstance();
|
||||||
NodeHeartbeatResponseProto.Builder builder = null;
|
NodeHeartbeatResponseProto.Builder builder = null;
|
||||||
boolean viaProto = false;
|
boolean viaProto = false;
|
||||||
|
|
||||||
private List<ContainerId> containersToCleanup = null;
|
private List<ContainerId> containersToCleanup = null;
|
||||||
private List<ContainerId> finishedContainersPulledByAM = null;
|
private List<ContainerId> containersToBeRemovedFromNM = null;
|
||||||
private List<ApplicationId> applicationsToCleanup = null;
|
private List<ApplicationId> applicationsToCleanup = null;
|
||||||
private MasterKey containerTokenMasterKey = null;
|
private MasterKey containerTokenMasterKey = null;
|
||||||
private MasterKey nmTokenMasterKey = null;
|
private MasterKey nmTokenMasterKey = null;
|
||||||
|
@ -74,8 +75,8 @@ public class NodeHeartbeatResponsePBImpl extends ProtoBase<NodeHeartbeatResponse
|
||||||
if (this.applicationsToCleanup != null) {
|
if (this.applicationsToCleanup != null) {
|
||||||
addApplicationsToCleanupToProto();
|
addApplicationsToCleanupToProto();
|
||||||
}
|
}
|
||||||
if (this.finishedContainersPulledByAM != null) {
|
if (this.containersToBeRemovedFromNM != null) {
|
||||||
addFinishedContainersPulledByAMToProto();
|
addContainersToBeRemovedFromNMToProto();
|
||||||
}
|
}
|
||||||
if (this.containerTokenMasterKey != null) {
|
if (this.containerTokenMasterKey != null) {
|
||||||
builder.setContainerTokenMasterKey(
|
builder.setContainerTokenMasterKey(
|
||||||
|
@ -204,9 +205,9 @@ public class NodeHeartbeatResponsePBImpl extends ProtoBase<NodeHeartbeatResponse
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public List<ContainerId> getFinishedContainersPulledByAM() {
|
public List<ContainerId> getContainersToBeRemovedFromNM() {
|
||||||
initFinishedContainersPulledByAM();
|
initContainersToBeRemovedFromNM();
|
||||||
return this.finishedContainersPulledByAM;
|
return this.containersToBeRemovedFromNM;
|
||||||
}
|
}
|
||||||
|
|
||||||
private void initContainersToCleanup() {
|
private void initContainersToCleanup() {
|
||||||
|
@ -222,16 +223,16 @@ public class NodeHeartbeatResponsePBImpl extends ProtoBase<NodeHeartbeatResponse
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void initFinishedContainersPulledByAM() {
|
private void initContainersToBeRemovedFromNM() {
|
||||||
if (this.finishedContainersPulledByAM != null) {
|
if (this.containersToBeRemovedFromNM != null) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
NodeHeartbeatResponseProtoOrBuilder p = viaProto ? proto : builder;
|
NodeHeartbeatResponseProtoOrBuilder p = viaProto ? proto : builder;
|
||||||
List<ContainerIdProto> list = p.getFinishedContainersPulledByAmList();
|
List<ContainerIdProto> list = p.getContainersToBeRemovedFromNmList();
|
||||||
this.finishedContainersPulledByAM = new ArrayList<ContainerId>();
|
this.containersToBeRemovedFromNM = new ArrayList<ContainerId>();
|
||||||
|
|
||||||
for (ContainerIdProto c : list) {
|
for (ContainerIdProto c : list) {
|
||||||
this.finishedContainersPulledByAM.add(convertFromProtoFormat(c));
|
this.containersToBeRemovedFromNM.add(convertFromProtoFormat(c));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -245,12 +246,12 @@ public class NodeHeartbeatResponsePBImpl extends ProtoBase<NodeHeartbeatResponse
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void addFinishedContainersPulledByAM(
|
public void
|
||||||
final List<ContainerId> finishedContainersPulledByAM) {
|
addContainersToBeRemovedFromNM(final List<ContainerId> containers) {
|
||||||
if (finishedContainersPulledByAM == null)
|
if (containers == null)
|
||||||
return;
|
return;
|
||||||
initFinishedContainersPulledByAM();
|
initContainersToBeRemovedFromNM();
|
||||||
this.finishedContainersPulledByAM.addAll(finishedContainersPulledByAM);
|
this.containersToBeRemovedFromNM.addAll(containers);
|
||||||
}
|
}
|
||||||
|
|
||||||
private void addContainersToCleanupToProto() {
|
private void addContainersToCleanupToProto() {
|
||||||
|
@ -288,10 +289,10 @@ public class NodeHeartbeatResponsePBImpl extends ProtoBase<NodeHeartbeatResponse
|
||||||
builder.addAllContainersToCleanup(iterable);
|
builder.addAllContainersToCleanup(iterable);
|
||||||
}
|
}
|
||||||
|
|
||||||
private void addFinishedContainersPulledByAMToProto() {
|
private void addContainersToBeRemovedFromNMToProto() {
|
||||||
maybeInitBuilder();
|
maybeInitBuilder();
|
||||||
builder.clearFinishedContainersPulledByAm();
|
builder.clearContainersToBeRemovedFromNm();
|
||||||
if (finishedContainersPulledByAM == null)
|
if (containersToBeRemovedFromNM == null)
|
||||||
return;
|
return;
|
||||||
Iterable<ContainerIdProto> iterable = new Iterable<ContainerIdProto>() {
|
Iterable<ContainerIdProto> iterable = new Iterable<ContainerIdProto>() {
|
||||||
|
|
||||||
|
@ -299,7 +300,7 @@ public class NodeHeartbeatResponsePBImpl extends ProtoBase<NodeHeartbeatResponse
|
||||||
public Iterator<ContainerIdProto> iterator() {
|
public Iterator<ContainerIdProto> iterator() {
|
||||||
return new Iterator<ContainerIdProto>() {
|
return new Iterator<ContainerIdProto>() {
|
||||||
|
|
||||||
Iterator<ContainerId> iter = finishedContainersPulledByAM.iterator();
|
Iterator<ContainerId> iter = containersToBeRemovedFromNM.iterator();
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean hasNext() {
|
public boolean hasNext() {
|
||||||
|
@ -320,7 +321,7 @@ public class NodeHeartbeatResponsePBImpl extends ProtoBase<NodeHeartbeatResponse
|
||||||
|
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
builder.addAllFinishedContainersPulledByAm(iterable);
|
builder.addAllContainersToBeRemovedFromNm(iterable);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -58,7 +58,7 @@ message NodeHeartbeatResponseProto {
|
||||||
repeated ApplicationIdProto applications_to_cleanup = 6;
|
repeated ApplicationIdProto applications_to_cleanup = 6;
|
||||||
optional int64 nextHeartBeatInterval = 7;
|
optional int64 nextHeartBeatInterval = 7;
|
||||||
optional string diagnostics_message = 8;
|
optional string diagnostics_message = 8;
|
||||||
repeated ContainerIdProto finished_containers_pulled_by_am = 9;
|
repeated ContainerIdProto containers_to_be_removed_from_nm = 9;
|
||||||
}
|
}
|
||||||
|
|
||||||
message NMContainerStatusProto {
|
message NMContainerStatusProto {
|
||||||
|
|
|
@ -558,7 +558,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
|
||||||
// when NM re-registers with RM.
|
// when NM re-registers with RM.
|
||||||
// Only remove the cleanedup containers that are acked
|
// Only remove the cleanedup containers that are acked
|
||||||
removeCompletedContainersFromContext(response
|
removeCompletedContainersFromContext(response
|
||||||
.getFinishedContainersPulledByAM());
|
.getContainersToBeRemovedFromNM());
|
||||||
|
|
||||||
lastHeartBeatID = response.getResponseId();
|
lastHeartBeatID = response.getResponseId();
|
||||||
List<ContainerId> containersToCleanup = response
|
List<ContainerId> containersToCleanup = response
|
||||||
|
|
|
@ -692,7 +692,7 @@ public class TestNodeStatusUpdater {
|
||||||
NodeHeartbeatResponse nhResponse =
|
NodeHeartbeatResponse nhResponse =
|
||||||
YarnServerBuilderUtils.newNodeHeartbeatResponse(heartBeatID,
|
YarnServerBuilderUtils.newNodeHeartbeatResponse(heartBeatID,
|
||||||
heartBeatNodeAction, null, null, null, null, 1000L);
|
heartBeatNodeAction, null, null, null, null, 1000L);
|
||||||
nhResponse.addFinishedContainersPulledByAM(finishedContainersPulledByAM);
|
nhResponse.addContainersToBeRemovedFromNM(finishedContainersPulledByAM);
|
||||||
return nhResponse;
|
return nhResponse;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -687,20 +687,7 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
|
||||||
|
|
||||||
// A new allocate means the AM received the previously sent
|
// A new allocate means the AM received the previously sent
|
||||||
// finishedContainers. We can ack this to NM now
|
// finishedContainers. We can ack this to NM now
|
||||||
for (NodeId nodeId:finishedContainersSentToAM.keySet()) {
|
sendFinishedContainersToNM();
|
||||||
|
|
||||||
// Clear and get current values
|
|
||||||
List<ContainerStatus> currentSentContainers =
|
|
||||||
finishedContainersSentToAM
|
|
||||||
.put(nodeId, new ArrayList<ContainerStatus>());
|
|
||||||
List<ContainerId> containerIdList = new ArrayList<ContainerId>
|
|
||||||
(currentSentContainers.size());
|
|
||||||
for (ContainerStatus containerStatus:currentSentContainers) {
|
|
||||||
containerIdList.add(containerStatus.getContainerId());
|
|
||||||
}
|
|
||||||
eventHandler.handle(new RMNodeFinishedContainersPulledByAMEvent(
|
|
||||||
nodeId, containerIdList));
|
|
||||||
}
|
|
||||||
|
|
||||||
// Mark every containerStatus as being sent to AM though we may return
|
// Mark every containerStatus as being sent to AM though we may return
|
||||||
// only the ones that belong to the current attempt
|
// only the ones that belong to the current attempt
|
||||||
|
@ -1592,14 +1579,12 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
|
||||||
ContainerStatus containerStatus =
|
ContainerStatus containerStatus =
|
||||||
containerFinishedEvent.getContainerStatus();
|
containerFinishedEvent.getContainerStatus();
|
||||||
|
|
||||||
// Add all finished containers so that they can be acked to NM
|
|
||||||
addJustFinishedContainer(appAttempt, containerFinishedEvent);
|
|
||||||
|
|
||||||
// Is this container the AmContainer? If the finished container is same as
|
// Is this container the AmContainer? If the finished container is same as
|
||||||
// the AMContainer, AppAttempt fails
|
// the AMContainer, AppAttempt fails
|
||||||
if (appAttempt.masterContainer != null
|
if (appAttempt.masterContainer != null
|
||||||
&& appAttempt.masterContainer.getId().equals(
|
&& appAttempt.masterContainer.getId().equals(
|
||||||
containerStatus.getContainerId())) {
|
containerStatus.getContainerId())) {
|
||||||
|
appAttempt.sendAMContainerToNM(appAttempt, containerFinishedEvent);
|
||||||
|
|
||||||
// Remember the follow up transition and save the final attempt state.
|
// Remember the follow up transition and save the final attempt state.
|
||||||
appAttempt.rememberTargetTransitionsAndStoreState(event,
|
appAttempt.rememberTargetTransitionsAndStoreState(event,
|
||||||
|
@ -1607,10 +1592,46 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
|
||||||
return RMAppAttemptState.FINAL_SAVING;
|
return RMAppAttemptState.FINAL_SAVING;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Add all finished containers so that they can be acked to NM
|
||||||
|
addJustFinishedContainer(appAttempt, containerFinishedEvent);
|
||||||
return this.currentState;
|
return this.currentState;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
// Ack NM to remove finished containers from context.
|
||||||
|
private void sendFinishedContainersToNM() {
|
||||||
|
for (NodeId nodeId : finishedContainersSentToAM.keySet()) {
|
||||||
|
|
||||||
|
// Clear and get current values
|
||||||
|
List<ContainerStatus> currentSentContainers =
|
||||||
|
finishedContainersSentToAM.put(nodeId,
|
||||||
|
new ArrayList<ContainerStatus>());
|
||||||
|
List<ContainerId> containerIdList =
|
||||||
|
new ArrayList<ContainerId>(currentSentContainers.size());
|
||||||
|
for (ContainerStatus containerStatus : currentSentContainers) {
|
||||||
|
containerIdList.add(containerStatus.getContainerId());
|
||||||
|
}
|
||||||
|
eventHandler.handle(new RMNodeFinishedContainersPulledByAMEvent(nodeId,
|
||||||
|
containerIdList));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Add am container to the list so that am container instance will be
|
||||||
|
// removed from NMContext.
|
||||||
|
private void sendAMContainerToNM(RMAppAttemptImpl appAttempt,
|
||||||
|
RMAppAttemptContainerFinishedEvent containerFinishedEvent) {
|
||||||
|
NodeId nodeId = containerFinishedEvent.getNodeId();
|
||||||
|
finishedContainersSentToAM.putIfAbsent(nodeId,
|
||||||
|
new ArrayList<ContainerStatus>());
|
||||||
|
appAttempt.finishedContainersSentToAM.get(nodeId).add(
|
||||||
|
containerFinishedEvent.getContainerStatus());
|
||||||
|
if (!appAttempt.getSubmissionContext()
|
||||||
|
.getKeepContainersAcrossApplicationAttempts()) {
|
||||||
|
appAttempt.sendFinishedContainersToNM();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private static void addJustFinishedContainer(RMAppAttemptImpl appAttempt,
|
private static void addJustFinishedContainer(RMAppAttemptImpl appAttempt,
|
||||||
RMAppAttemptContainerFinishedEvent containerFinishedEvent) {
|
RMAppAttemptContainerFinishedEvent containerFinishedEvent) {
|
||||||
appAttempt.justFinishedContainers.putIfAbsent(containerFinishedEvent
|
appAttempt.justFinishedContainers.putIfAbsent(containerFinishedEvent
|
||||||
|
@ -1661,16 +1682,16 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
|
||||||
ContainerStatus containerStatus =
|
ContainerStatus containerStatus =
|
||||||
containerFinishedEvent.getContainerStatus();
|
containerFinishedEvent.getContainerStatus();
|
||||||
|
|
||||||
// Add all finished containers so that they can be acked to NM.
|
|
||||||
addJustFinishedContainer(appAttempt, containerFinishedEvent);
|
|
||||||
|
|
||||||
// Is this container the ApplicationMaster container?
|
// Is this container the ApplicationMaster container?
|
||||||
if (appAttempt.masterContainer.getId().equals(
|
if (appAttempt.masterContainer.getId().equals(
|
||||||
containerStatus.getContainerId())) {
|
containerStatus.getContainerId())) {
|
||||||
new FinalTransition(RMAppAttemptState.FINISHED).transition(
|
new FinalTransition(RMAppAttemptState.FINISHED).transition(
|
||||||
appAttempt, containerFinishedEvent);
|
appAttempt, containerFinishedEvent);
|
||||||
|
appAttempt.sendAMContainerToNM(appAttempt, containerFinishedEvent);
|
||||||
return RMAppAttemptState.FINISHED;
|
return RMAppAttemptState.FINISHED;
|
||||||
}
|
}
|
||||||
|
// Add all finished containers so that they can be acked to NM.
|
||||||
|
addJustFinishedContainer(appAttempt, containerFinishedEvent);
|
||||||
|
|
||||||
return RMAppAttemptState.FINISHING;
|
return RMAppAttemptState.FINISHING;
|
||||||
}
|
}
|
||||||
|
@ -1686,14 +1707,13 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
|
||||||
ContainerStatus containerStatus =
|
ContainerStatus containerStatus =
|
||||||
containerFinishedEvent.getContainerStatus();
|
containerFinishedEvent.getContainerStatus();
|
||||||
|
|
||||||
// Add all finished containers so that they can be acked to NM.
|
|
||||||
addJustFinishedContainer(appAttempt, containerFinishedEvent);
|
|
||||||
|
|
||||||
// If this is the AM container, it means the AM container is finished,
|
// If this is the AM container, it means the AM container is finished,
|
||||||
// but we are not yet acknowledged that the final state has been saved.
|
// but we are not yet acknowledged that the final state has been saved.
|
||||||
// Thus, we still return FINAL_SAVING state here.
|
// Thus, we still return FINAL_SAVING state here.
|
||||||
if (appAttempt.masterContainer.getId().equals(
|
if (appAttempt.masterContainer.getId().equals(
|
||||||
containerStatus.getContainerId())) {
|
containerStatus.getContainerId())) {
|
||||||
|
appAttempt.sendAMContainerToNM(appAttempt, containerFinishedEvent);
|
||||||
|
|
||||||
if (appAttempt.targetedFinalState.equals(RMAppAttemptState.FAILED)
|
if (appAttempt.targetedFinalState.equals(RMAppAttemptState.FAILED)
|
||||||
|| appAttempt.targetedFinalState.equals(RMAppAttemptState.KILLED)) {
|
|| appAttempt.targetedFinalState.equals(RMAppAttemptState.KILLED)) {
|
||||||
// ignore Container_Finished Event if we were supposed to reach
|
// ignore Container_Finished Event if we were supposed to reach
|
||||||
|
@ -1708,6 +1728,9 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
|
||||||
appAttempt.eventCausingFinalSaving), RMAppAttemptState.FINISHED);
|
appAttempt.eventCausingFinalSaving), RMAppAttemptState.FINISHED);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Add all finished containers so that they can be acked to NM.
|
||||||
|
addJustFinishedContainer(appAttempt, containerFinishedEvent);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -112,8 +112,11 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
|
||||||
private final Set<ContainerId> containersToClean = new TreeSet<ContainerId>(
|
private final Set<ContainerId> containersToClean = new TreeSet<ContainerId>(
|
||||||
new ContainerIdComparator());
|
new ContainerIdComparator());
|
||||||
|
|
||||||
/* set of containers that were notified to AM about their completion */
|
/*
|
||||||
private final Set<ContainerId> finishedContainersPulledByAM =
|
* set of containers to notify NM to remove them from its context. Currently,
|
||||||
|
* this includes containers that were notified to AM about their completion
|
||||||
|
*/
|
||||||
|
private final Set<ContainerId> containersToBeRemovedFromNM =
|
||||||
new HashSet<ContainerId>();
|
new HashSet<ContainerId>();
|
||||||
|
|
||||||
/* the list of applications that have finished and need to be purged */
|
/* the list of applications that have finished and need to be purged */
|
||||||
|
@ -157,7 +160,7 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
|
||||||
RMNodeEventType.CLEANUP_CONTAINER, new CleanUpContainerTransition())
|
RMNodeEventType.CLEANUP_CONTAINER, new CleanUpContainerTransition())
|
||||||
.addTransition(NodeState.RUNNING, NodeState.RUNNING,
|
.addTransition(NodeState.RUNNING, NodeState.RUNNING,
|
||||||
RMNodeEventType.FINISHED_CONTAINERS_PULLED_BY_AM,
|
RMNodeEventType.FINISHED_CONTAINERS_PULLED_BY_AM,
|
||||||
new FinishedContainersPulledByAMTransition())
|
new AddContainersToBeRemovedFromNMTransition())
|
||||||
.addTransition(NodeState.RUNNING, NodeState.RUNNING,
|
.addTransition(NodeState.RUNNING, NodeState.RUNNING,
|
||||||
RMNodeEventType.RECONNECTED, new ReconnectNodeTransition())
|
RMNodeEventType.RECONNECTED, new ReconnectNodeTransition())
|
||||||
.addTransition(NodeState.RUNNING, NodeState.RUNNING,
|
.addTransition(NodeState.RUNNING, NodeState.RUNNING,
|
||||||
|
@ -174,7 +177,7 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
|
||||||
new UpdateNodeResourceWhenUnusableTransition())
|
new UpdateNodeResourceWhenUnusableTransition())
|
||||||
.addTransition(NodeState.DECOMMISSIONED, NodeState.DECOMMISSIONED,
|
.addTransition(NodeState.DECOMMISSIONED, NodeState.DECOMMISSIONED,
|
||||||
RMNodeEventType.FINISHED_CONTAINERS_PULLED_BY_AM,
|
RMNodeEventType.FINISHED_CONTAINERS_PULLED_BY_AM,
|
||||||
new FinishedContainersPulledByAMTransition())
|
new AddContainersToBeRemovedFromNMTransition())
|
||||||
|
|
||||||
//Transitions from LOST state
|
//Transitions from LOST state
|
||||||
.addTransition(NodeState.LOST, NodeState.LOST,
|
.addTransition(NodeState.LOST, NodeState.LOST,
|
||||||
|
@ -182,7 +185,7 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
|
||||||
new UpdateNodeResourceWhenUnusableTransition())
|
new UpdateNodeResourceWhenUnusableTransition())
|
||||||
.addTransition(NodeState.LOST, NodeState.LOST,
|
.addTransition(NodeState.LOST, NodeState.LOST,
|
||||||
RMNodeEventType.FINISHED_CONTAINERS_PULLED_BY_AM,
|
RMNodeEventType.FINISHED_CONTAINERS_PULLED_BY_AM,
|
||||||
new FinishedContainersPulledByAMTransition())
|
new AddContainersToBeRemovedFromNMTransition())
|
||||||
|
|
||||||
//Transitions from UNHEALTHY state
|
//Transitions from UNHEALTHY state
|
||||||
.addTransition(NodeState.UNHEALTHY,
|
.addTransition(NodeState.UNHEALTHY,
|
||||||
|
@ -208,7 +211,7 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
|
||||||
RMNodeEventType.RESOURCE_UPDATE, new UpdateNodeResourceWhenUnusableTransition())
|
RMNodeEventType.RESOURCE_UPDATE, new UpdateNodeResourceWhenUnusableTransition())
|
||||||
.addTransition(NodeState.UNHEALTHY, NodeState.UNHEALTHY,
|
.addTransition(NodeState.UNHEALTHY, NodeState.UNHEALTHY,
|
||||||
RMNodeEventType.FINISHED_CONTAINERS_PULLED_BY_AM,
|
RMNodeEventType.FINISHED_CONTAINERS_PULLED_BY_AM,
|
||||||
new FinishedContainersPulledByAMTransition())
|
new AddContainersToBeRemovedFromNMTransition())
|
||||||
|
|
||||||
// create the topology tables
|
// create the topology tables
|
||||||
.installTopology();
|
.installTopology();
|
||||||
|
@ -382,11 +385,11 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
|
||||||
response.addAllContainersToCleanup(
|
response.addAllContainersToCleanup(
|
||||||
new ArrayList<ContainerId>(this.containersToClean));
|
new ArrayList<ContainerId>(this.containersToClean));
|
||||||
response.addAllApplicationsToCleanup(this.finishedApplications);
|
response.addAllApplicationsToCleanup(this.finishedApplications);
|
||||||
response.addFinishedContainersPulledByAM(
|
response.addContainersToBeRemovedFromNM(
|
||||||
new ArrayList<ContainerId>(this.finishedContainersPulledByAM));
|
new ArrayList<ContainerId>(this.containersToBeRemovedFromNM));
|
||||||
this.containersToClean.clear();
|
this.containersToClean.clear();
|
||||||
this.finishedApplications.clear();
|
this.finishedApplications.clear();
|
||||||
this.finishedContainersPulledByAM.clear();
|
this.containersToBeRemovedFromNM.clear();
|
||||||
} finally {
|
} finally {
|
||||||
this.writeLock.unlock();
|
this.writeLock.unlock();
|
||||||
}
|
}
|
||||||
|
@ -659,12 +662,12 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public static class FinishedContainersPulledByAMTransition implements
|
public static class AddContainersToBeRemovedFromNMTransition implements
|
||||||
SingleArcTransition<RMNodeImpl, RMNodeEvent> {
|
SingleArcTransition<RMNodeImpl, RMNodeEvent> {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void transition(RMNodeImpl rmNode, RMNodeEvent event) {
|
public void transition(RMNodeImpl rmNode, RMNodeEvent event) {
|
||||||
rmNode.finishedContainersPulledByAM.addAll(((
|
rmNode.containersToBeRemovedFromNM.addAll(((
|
||||||
RMNodeFinishedContainersPulledByAMEvent) event).getContainers());
|
RMNodeFinishedContainersPulledByAMEvent) event).getContainers());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -98,9 +98,6 @@ public class TestAMRestart {
|
||||||
Thread.sleep(200);
|
Thread.sleep(200);
|
||||||
}
|
}
|
||||||
|
|
||||||
ContainerId amContainerId = ContainerId.newInstance(am1
|
|
||||||
.getApplicationAttemptId(), 1);
|
|
||||||
|
|
||||||
// launch the 2nd container, for testing running container transferred.
|
// launch the 2nd container, for testing running container transferred.
|
||||||
nm1.nodeHeartbeat(am1.getApplicationAttemptId(), 2, ContainerState.RUNNING);
|
nm1.nodeHeartbeat(am1.getApplicationAttemptId(), 2, ContainerState.RUNNING);
|
||||||
ContainerId containerId2 =
|
ContainerId containerId2 =
|
||||||
|
@ -199,15 +196,11 @@ public class TestAMRestart {
|
||||||
// completed containerId4 is also transferred to the new attempt.
|
// completed containerId4 is also transferred to the new attempt.
|
||||||
RMAppAttempt newAttempt =
|
RMAppAttempt newAttempt =
|
||||||
app1.getRMAppAttempt(am2.getApplicationAttemptId());
|
app1.getRMAppAttempt(am2.getApplicationAttemptId());
|
||||||
// 4 containers finished, acquired/allocated/reserved/completed + AM
|
// 4 containers finished, acquired/allocated/reserved/completed.
|
||||||
// container.
|
waitForContainersToFinish(4, newAttempt);
|
||||||
waitForContainersToFinish(5, newAttempt);
|
|
||||||
boolean container3Exists = false, container4Exists = false, container5Exists =
|
boolean container3Exists = false, container4Exists = false, container5Exists =
|
||||||
false, container6Exists = false, amContainerExists = false;
|
false, container6Exists = false;
|
||||||
for(ContainerStatus status : newAttempt.getJustFinishedContainers()) {
|
for(ContainerStatus status : newAttempt.getJustFinishedContainers()) {
|
||||||
if(status.getContainerId().equals(amContainerId)) {
|
|
||||||
amContainerExists = true;
|
|
||||||
}
|
|
||||||
if(status.getContainerId().equals(containerId3)) {
|
if(status.getContainerId().equals(containerId3)) {
|
||||||
// containerId3 is the container run by the previous attempt but finished by the
|
// containerId3 is the container run by the previous attempt but finished by the
|
||||||
// new attempt.
|
// new attempt.
|
||||||
|
@ -227,11 +220,8 @@ public class TestAMRestart {
|
||||||
container6Exists = true;
|
container6Exists = true;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Assert.assertTrue(amContainerExists);
|
Assert.assertTrue(container3Exists && container4Exists && container5Exists
|
||||||
Assert.assertTrue(container3Exists);
|
&& container6Exists);
|
||||||
Assert.assertTrue(container4Exists);
|
|
||||||
Assert.assertTrue(container5Exists);
|
|
||||||
Assert.assertTrue(container6Exists);
|
|
||||||
|
|
||||||
// New SchedulerApplicationAttempt also has the containers info.
|
// New SchedulerApplicationAttempt also has the containers info.
|
||||||
rm1.waitForState(nm1, containerId2, RMContainerState.RUNNING);
|
rm1.waitForState(nm1, containerId2, RMContainerState.RUNNING);
|
||||||
|
@ -250,14 +240,14 @@ public class TestAMRestart {
|
||||||
// all 4 normal containers finished.
|
// all 4 normal containers finished.
|
||||||
System.out.println("New attempt's just finished containers: "
|
System.out.println("New attempt's just finished containers: "
|
||||||
+ newAttempt.getJustFinishedContainers());
|
+ newAttempt.getJustFinishedContainers());
|
||||||
waitForContainersToFinish(6, newAttempt);
|
waitForContainersToFinish(5, newAttempt);
|
||||||
rm1.stop();
|
rm1.stop();
|
||||||
}
|
}
|
||||||
|
|
||||||
private void waitForContainersToFinish(int expectedNum, RMAppAttempt attempt)
|
private void waitForContainersToFinish(int expectedNum, RMAppAttempt attempt)
|
||||||
throws InterruptedException {
|
throws InterruptedException {
|
||||||
int count = 0;
|
int count = 0;
|
||||||
while (attempt.getJustFinishedContainers().size() < expectedNum
|
while (attempt.getJustFinishedContainers().size() != expectedNum
|
||||||
&& count < 500) {
|
&& count < 500) {
|
||||||
Thread.sleep(100);
|
Thread.sleep(100);
|
||||||
count++;
|
count++;
|
||||||
|
|
|
@ -965,7 +965,7 @@ public class TestRMAppAttemptTransitions {
|
||||||
sendAttemptUpdateSavedEvent(applicationAttempt);
|
sendAttemptUpdateSavedEvent(applicationAttempt);
|
||||||
assertEquals(RMAppAttemptState.FAILED,
|
assertEquals(RMAppAttemptState.FAILED,
|
||||||
applicationAttempt.getAppAttemptState());
|
applicationAttempt.getAppAttemptState());
|
||||||
assertEquals(2, applicationAttempt.getJustFinishedContainers().size());
|
assertEquals(0, applicationAttempt.getJustFinishedContainers().size());
|
||||||
assertEquals(amContainer, applicationAttempt.getMasterContainer());
|
assertEquals(amContainer, applicationAttempt.getMasterContainer());
|
||||||
assertEquals(0, application.getRanNodes().size());
|
assertEquals(0, application.getRanNodes().size());
|
||||||
String rmAppPageUrl = pjoin(RM_WEBAPP_ADDR, "cluster", "app",
|
String rmAppPageUrl = pjoin(RM_WEBAPP_ADDR, "cluster", "app",
|
||||||
|
@ -1003,7 +1003,7 @@ public class TestRMAppAttemptTransitions {
|
||||||
sendAttemptUpdateSavedEvent(applicationAttempt);
|
sendAttemptUpdateSavedEvent(applicationAttempt);
|
||||||
assertEquals(RMAppAttemptState.KILLED,
|
assertEquals(RMAppAttemptState.KILLED,
|
||||||
applicationAttempt.getAppAttemptState());
|
applicationAttempt.getAppAttemptState());
|
||||||
assertEquals(1,applicationAttempt.getJustFinishedContainers().size());
|
assertEquals(0, applicationAttempt.getJustFinishedContainers().size());
|
||||||
assertEquals(amContainer, applicationAttempt.getMasterContainer());
|
assertEquals(amContainer, applicationAttempt.getMasterContainer());
|
||||||
assertEquals(0, application.getRanNodes().size());
|
assertEquals(0, application.getRanNodes().size());
|
||||||
String rmAppPageUrl = pjoin(RM_WEBAPP_ADDR, "cluster", "app",
|
String rmAppPageUrl = pjoin(RM_WEBAPP_ADDR, "cluster", "app",
|
||||||
|
@ -1192,7 +1192,7 @@ public class TestRMAppAttemptTransitions {
|
||||||
BuilderUtils.newContainerStatus(amContainer.getId(),
|
BuilderUtils.newContainerStatus(amContainer.getId(),
|
||||||
ContainerState.COMPLETE, "", 0), anyNodeId));
|
ContainerState.COMPLETE, "", 0), anyNodeId));
|
||||||
testAppAttemptFinishedState(amContainer, finalStatus, trackingUrl,
|
testAppAttemptFinishedState(amContainer, finalStatus, trackingUrl,
|
||||||
diagnostics, 1, false);
|
diagnostics, 0, false);
|
||||||
}
|
}
|
||||||
|
|
||||||
// While attempt is at FINAL_SAVING, Container_Finished event may come before
|
// While attempt is at FINAL_SAVING, Container_Finished event may come before
|
||||||
|
@ -1225,7 +1225,7 @@ public class TestRMAppAttemptTransitions {
|
||||||
// send attempt_saved
|
// send attempt_saved
|
||||||
sendAttemptUpdateSavedEvent(applicationAttempt);
|
sendAttemptUpdateSavedEvent(applicationAttempt);
|
||||||
testAppAttemptFinishedState(amContainer, finalStatus, trackingUrl,
|
testAppAttemptFinishedState(amContainer, finalStatus, trackingUrl,
|
||||||
diagnostics, 1, false);
|
diagnostics, 0, false);
|
||||||
}
|
}
|
||||||
|
|
||||||
// While attempt is at FINAL_SAVING, Expire event may come before
|
// While attempt is at FINAL_SAVING, Expire event may come before
|
||||||
|
@ -1381,13 +1381,13 @@ public class TestRMAppAttemptTransitions {
|
||||||
verifyApplicationAttemptFinished(RMAppAttemptState.FAILED);
|
verifyApplicationAttemptFinished(RMAppAttemptState.FAILED);
|
||||||
|
|
||||||
// failed attempt captured the container finished event.
|
// failed attempt captured the container finished event.
|
||||||
assertEquals(1, applicationAttempt.getJustFinishedContainers().size());
|
assertEquals(0, applicationAttempt.getJustFinishedContainers().size());
|
||||||
ContainerStatus cs2 =
|
ContainerStatus cs2 =
|
||||||
ContainerStatus.newInstance(ContainerId.newInstance(appAttemptId, 2),
|
ContainerStatus.newInstance(ContainerId.newInstance(appAttemptId, 2),
|
||||||
ContainerState.COMPLETE, "", 0);
|
ContainerState.COMPLETE, "", 0);
|
||||||
applicationAttempt.handle(new RMAppAttemptContainerFinishedEvent(
|
applicationAttempt.handle(new RMAppAttemptContainerFinishedEvent(
|
||||||
appAttemptId, cs2, anyNodeId));
|
appAttemptId, cs2, anyNodeId));
|
||||||
assertEquals(2, applicationAttempt.getJustFinishedContainers().size());
|
assertEquals(1, applicationAttempt.getJustFinishedContainers().size());
|
||||||
boolean found = false;
|
boolean found = false;
|
||||||
for (ContainerStatus containerStatus:applicationAttempt
|
for (ContainerStatus containerStatus:applicationAttempt
|
||||||
.getJustFinishedContainers()) {
|
.getJustFinishedContainers()) {
|
||||||
|
|
Loading…
Reference in New Issue