YARN-1885. Fixed a bug that RM may not send application-clean-up signal to NMs where the completed applications previously ran in case of RM restart. Contributed by Wangda Tan

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1603028 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Jian He 2014-06-16 23:56:12 +00:00
parent 50cd2a6bed
commit 95897ca14b
28 changed files with 496 additions and 141 deletions

View File

@ -248,6 +248,10 @@ Release 2.5.0 - UNRELEASED
YARN-2155. FairScheduler: Incorrect threshold check for preemption. YARN-2155. FairScheduler: Incorrect threshold check for preemption.
(Wei Yan via kasha) (Wei Yan via kasha)
YARN-1885. Fixed a bug that RM may not send application-clean-up signal
to NMs where the completed applications previously ran in case of RM restart.
(Wangda Tan via jianhe)
Release 2.4.1 - 2014-06-23 Release 2.4.1 - 2014-06-23
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -60,7 +60,7 @@ public class TestResourceTrackerOnHA extends ProtocolHATestBase{
// make sure registerNodeManager works when failover happens // make sure registerNodeManager works when failover happens
RegisterNodeManagerRequest request = RegisterNodeManagerRequest request =
RegisterNodeManagerRequest.newInstance(nodeId, 0, resource, RegisterNodeManagerRequest.newInstance(nodeId, 0, resource,
YarnVersionInfo.getVersion(), null); YarnVersionInfo.getVersion(), null, null);
resourceTracker.registerNodeManager(request); resourceTracker.registerNodeManager(request);
Assert.assertTrue(waitForNodeManagerToConnect(10000, nodeId)); Assert.assertTrue(waitForNodeManagerToConnect(10000, nodeId));

View File

@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.api.protocolrecords;
import java.util.List; import java.util.List;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.util.Records; import org.apache.hadoop.yarn.util.Records;
@ -28,7 +29,8 @@ public abstract class RegisterNodeManagerRequest {
public static RegisterNodeManagerRequest newInstance(NodeId nodeId, public static RegisterNodeManagerRequest newInstance(NodeId nodeId,
int httpPort, Resource resource, String nodeManagerVersionId, int httpPort, Resource resource, String nodeManagerVersionId,
List<NMContainerStatus> containerStatuses) { List<NMContainerStatus> containerStatuses,
List<ApplicationId> runningApplications) {
RegisterNodeManagerRequest request = RegisterNodeManagerRequest request =
Records.newRecord(RegisterNodeManagerRequest.class); Records.newRecord(RegisterNodeManagerRequest.class);
request.setHttpPort(httpPort); request.setHttpPort(httpPort);
@ -36,6 +38,7 @@ public abstract class RegisterNodeManagerRequest {
request.setNodeId(nodeId); request.setNodeId(nodeId);
request.setNMVersion(nodeManagerVersionId); request.setNMVersion(nodeManagerVersionId);
request.setContainerStatuses(containerStatuses); request.setContainerStatuses(containerStatuses);
request.setRunningApplications(runningApplications);
return request; return request;
} }
@ -45,10 +48,30 @@ public abstract class RegisterNodeManagerRequest {
public abstract String getNMVersion(); public abstract String getNMVersion();
public abstract List<NMContainerStatus> getNMContainerStatuses(); public abstract List<NMContainerStatus> getNMContainerStatuses();
/**
 * Returns the applications that the registering NodeManager reports as
 * running on it.
 * <p>
 * This list is carried in the registration request because the YARN RM
 * currently does not persist which nodes an application ran on. After an RM
 * restart the RM therefore cannot determine on its own whether a node still
 * needs application cleanup (such as log aggregation or a status update).
 * With the running-application list in the node manager register request,
 * the RM can recover that node information and act accordingly.
 *
 * @return running application list in this node
 */
public abstract List<ApplicationId> getRunningApplications();
public abstract void setNodeId(NodeId nodeId); public abstract void setNodeId(NodeId nodeId);
public abstract void setHttpPort(int port); public abstract void setHttpPort(int port);
public abstract void setResource(Resource resource); public abstract void setResource(Resource resource);
public abstract void setNMVersion(String version); public abstract void setNMVersion(String version);
public abstract void setContainerStatuses( public abstract void setContainerStatuses(
List<NMContainerStatus> containerStatuses); List<NMContainerStatus> containerStatuses);
/**
 * Setter counterpart of
 * {@link RegisterNodeManagerRequest#getRunningApplications()}.
 *
 * @param runningApplications applications running on this node
 */
public abstract void setRunningApplications(
List<ApplicationId> runningApplications);
} }

View File

@ -20,12 +20,23 @@ package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Iterator;
import java.util.List; import java.util.List;
import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.impl.pb.NodeIdPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.NodeIdPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ResourcePBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ResourcePBImpl;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ContainerStatusPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.NodeIdPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ResourcePBImpl;
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerStatusProto;
import org.apache.hadoop.yarn.proto.YarnProtos.NodeIdProto; import org.apache.hadoop.yarn.proto.YarnProtos.NodeIdProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProto; import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NMContainerStatusProto; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NMContainerStatusProto;
@ -44,6 +55,7 @@ public class RegisterNodeManagerRequestPBImpl extends RegisterNodeManagerRequest
private Resource resource = null; private Resource resource = null;
private NodeId nodeId = null; private NodeId nodeId = null;
private List<NMContainerStatus> containerStatuses = null; private List<NMContainerStatus> containerStatuses = null;
private List<ApplicationId> runningApplications = null;
public RegisterNodeManagerRequestPBImpl() { public RegisterNodeManagerRequestPBImpl() {
builder = RegisterNodeManagerRequestProto.newBuilder(); builder = RegisterNodeManagerRequestProto.newBuilder();
@ -65,6 +77,9 @@ public class RegisterNodeManagerRequestPBImpl extends RegisterNodeManagerRequest
if (this.containerStatuses != null) { if (this.containerStatuses != null) {
addNMContainerStatusesToProto(); addNMContainerStatusesToProto();
} }
if (this.runningApplications != null) {
addRunningApplicationsToProto();
}
if (this.resource != null) { if (this.resource != null) {
builder.setResource(convertToProtoFormat(this.resource)); builder.setResource(convertToProtoFormat(this.resource));
} }
@ -159,6 +174,66 @@ public class RegisterNodeManagerRequestPBImpl extends RegisterNodeManagerRequest
builder.setHttpPort(httpPort); builder.setHttpPort(httpPort);
} }
/**
 * Returns the running applications of this request, materializing them from
 * the underlying proto or builder on first access.
 *
 * @return the list held by this record (not a copy)
 */
@Override
public List<ApplicationId> getRunningApplications() {
  if (this.runningApplications == null) {
    initRunningApplications();
  }
  return this.runningApplications;
}
/**
 * Lazily fills {@code runningApplications} from {@code proto} when
 * {@code viaProto} is set, otherwise from {@code builder}. Does nothing when
 * the list has already been materialized.
 */
private void initRunningApplications() {
  if (this.runningApplications == null) {
    RegisterNodeManagerRequestProtoOrBuilder source =
        viaProto ? proto : builder;
    List<ApplicationIdProto> appProtos = source.getRunningApplicationsList();
    this.runningApplications = new ArrayList<ApplicationId>();
    for (ApplicationIdProto appProto : appProtos) {
      this.runningApplications.add(convertFromProtoFormat(appProto));
    }
  }
}
/**
 * Adds the given applications to this request's running-application list.
 * <p>
 * Note that the entries are appended to whatever the list already holds
 * (including entries loaded from the proto) rather than replacing it, and
 * that a {@code null} argument leaves the list untouched.
 *
 * @param apps applications to add; ignored when {@code null}
 */
@Override
public void setRunningApplications(List<ApplicationId> apps) {
  if (apps != null) {
    initRunningApplications();
    this.runningApplications.addAll(apps);
  }
}
/**
 * Rewrites the builder's running-application entries from the local
 * {@code runningApplications} list: existing entries are cleared first, then
 * every application is converted to its proto form and added. When the local
 * list is {@code null} the builder is only cleared.
 */
private void addRunningApplicationsToProto() {
  maybeInitBuilder();
  builder.clearRunningApplications();
  if (runningApplications != null) {
    // Convert up front instead of through a lazy Iterable wrapper; the
    // builder receives the same elements in the same order.
    List<ApplicationIdProto> appProtos =
        new ArrayList<ApplicationIdProto>(runningApplications.size());
    for (ApplicationId app : runningApplications) {
      appProtos.add(convertToProtoFormat(app));
    }
    builder.addAllRunningApplications(appProtos);
  }
}
@Override @Override
public List<NMContainerStatus> getNMContainerStatuses() { public List<NMContainerStatus> getNMContainerStatuses() {
initContainerRecoveryReports(); initContainerRecoveryReports();
@ -217,6 +292,14 @@ public class RegisterNodeManagerRequestPBImpl extends RegisterNodeManagerRequest
builder.setNmVersion(version); builder.setNmVersion(version);
} }
/** Wraps an {@link ApplicationIdProto} in its record implementation. */
private ApplicationIdPBImpl convertFromProtoFormat(ApplicationIdProto p) {
  ApplicationIdPBImpl appId = new ApplicationIdPBImpl(p);
  return appId;
}
/**
 * Extracts the proto from an {@link ApplicationId}; the argument is cast to
 * {@link ApplicationIdPBImpl}.
 */
private ApplicationIdProto convertToProtoFormat(ApplicationId t) {
  ApplicationIdPBImpl impl = (ApplicationIdPBImpl) t;
  return impl.getProto();
}
private NodeIdPBImpl convertFromProtoFormat(NodeIdProto p) { private NodeIdPBImpl convertFromProtoFormat(NodeIdProto p) {
return new NodeIdPBImpl(p); return new NodeIdPBImpl(p);
} }

View File

@ -31,6 +31,7 @@ message RegisterNodeManagerRequestProto {
optional ResourceProto resource = 4; optional ResourceProto resource = 4;
optional string nm_version = 5; optional string nm_version = 5;
repeated NMContainerStatusProto container_statuses = 6; repeated NMContainerStatusProto container_statuses = 6;
repeated ApplicationIdProto runningApplications = 7;
} }
message RegisterNodeManagerResponseProto { message RegisterNodeManagerResponseProto {

View File

@ -83,7 +83,8 @@ public class TestProtocolRecords {
RegisterNodeManagerRequest request = RegisterNodeManagerRequest request =
RegisterNodeManagerRequest.newInstance( RegisterNodeManagerRequest.newInstance(
NodeId.newInstance("1.1.1.1", 1000), 8080, NodeId.newInstance("1.1.1.1", 1000), 8080,
Resource.newInstance(1024, 1), "NM-version-id", reports); Resource.newInstance(1024, 1), "NM-version-id", reports,
Arrays.asList(appId));
RegisterNodeManagerRequest requestProto = RegisterNodeManagerRequest requestProto =
new RegisterNodeManagerRequestPBImpl( new RegisterNodeManagerRequestPBImpl(
((RegisterNodeManagerRequestPBImpl) request).getProto()); ((RegisterNodeManagerRequestPBImpl) request).getProto());
@ -95,5 +96,7 @@ public class TestProtocolRecords {
requestProto.getNodeId()); requestProto.getNodeId());
Assert.assertEquals(Resource.newInstance(1024, 1), Assert.assertEquals(Resource.newInstance(1024, 1),
requestProto.getResource()); requestProto.getResource());
Assert.assertEquals(1, requestProto.getRunningApplications().size());
Assert.assertEquals(appId, requestProto.getRunningApplications().get(0));
} }
} }

View File

@ -0,0 +1,81 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.api.protocolrecords;
import java.util.Arrays;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RegisterNodeManagerRequestPBImpl;
import org.junit.Assert;
import org.junit.Test;
/**
 * Round-trip tests for {@link RegisterNodeManagerRequest}: a request is
 * built, converted to its proto, re-read through
 * {@link RegisterNodeManagerRequestPBImpl}, and the copy is compared with the
 * original.
 */
public class TestRegisterNodeManagerRequest {

  /**
   * Serializes the given request to its proto and builds a new request from
   * that proto.
   */
  private static RegisterNodeManagerRequest roundTrip(
      RegisterNodeManagerRequest request) {
    return new RegisterNodeManagerRequestPBImpl(
        ((RegisterNodeManagerRequestPBImpl) request).getProto());
  }

  @Test
  public void testRegisterNodeManagerRequest() {
    RegisterNodeManagerRequest request =
        RegisterNodeManagerRequest.newInstance(
          NodeId.newInstance("host", 1234), 1234, Resource.newInstance(0, 0),
          "version", Arrays.asList(NMContainerStatus.newInstance(
            ContainerId.newInstance(
              ApplicationAttemptId.newInstance(
                ApplicationId.newInstance(1234L, 1), 1), 1),
            ContainerState.RUNNING, Resource.newInstance(1024, 1), "good",
            -1)), Arrays.asList(ApplicationId.newInstance(1234L, 1),
            ApplicationId.newInstance(1234L, 2)));

    // serialize to proto, and get request from proto
    RegisterNodeManagerRequest request1 = roundTrip(request);

    // check values: the original request is the expected side, the
    // round-tripped copy is the actual side
    Assert.assertEquals(request.getNMContainerStatuses().size(),
        request1.getNMContainerStatuses().size());
    Assert.assertEquals(
        request.getNMContainerStatuses().get(0).getContainerId(),
        request1.getNMContainerStatuses().get(0).getContainerId());
    Assert.assertEquals(request.getRunningApplications().size(),
        request1.getRunningApplications().size());
    Assert.assertEquals(request.getRunningApplications().get(0),
        request1.getRunningApplications().get(0));
    Assert.assertEquals(request.getRunningApplications().get(1),
        request1.getRunningApplications().get(1));
  }

  @Test
  public void testRegisterNodeManagerRequestWithNullArrays() {
    RegisterNodeManagerRequest request =
        RegisterNodeManagerRequest.newInstance(NodeId.newInstance("host", 1234),
          1234, Resource.newInstance(0, 0), "version", null, null);

    // serialize to proto, and get request from proto
    RegisterNodeManagerRequest request1 = roundTrip(request);

    // check values: null lists must come back as empty lists
    Assert.assertEquals(0, request1.getNMContainerStatuses().size());
    Assert.assertEquals(0, request1.getRunningApplications().size());
  }
}

View File

@ -250,7 +250,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
List<NMContainerStatus> containerReports = getNMContainerStatuses(); List<NMContainerStatus> containerReports = getNMContainerStatuses();
RegisterNodeManagerRequest request = RegisterNodeManagerRequest request =
RegisterNodeManagerRequest.newInstance(nodeId, httpPort, totalResource, RegisterNodeManagerRequest.newInstance(nodeId, httpPort, totalResource,
nodeManagerVersionId, containerReports); nodeManagerVersionId, containerReports, getRunningApplications());
if (containerReports != null) { if (containerReports != null) {
LOG.info("Registering with RM using containers :" + containerReports); LOG.info("Registering with RM using containers :" + containerReports);
} }
@ -375,6 +375,12 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
return containerStatuses; return containerStatuses;
} }
/**
 * Snapshots the ids of the applications currently held in the NM context.
 *
 * @return a new list containing the keys of {@code context.getApplications()}
 */
private List<ApplicationId> getRunningApplications() {
  return new ArrayList<ApplicationId>(
      this.context.getApplications().keySet());
}
// These NMContainerStatus are sent on NM registration and used by YARN only. // These NMContainerStatus are sent on NM registration and used by YARN only.
private List<NMContainerStatus> getNMContainerStatuses() { private List<NMContainerStatus> getNMContainerStatuses() {
List<NMContainerStatus> containerStatuses = List<NMContainerStatus> containerStatuses =

View File

@ -244,15 +244,6 @@ public class ResourceTrackerService extends AbstractService implements
Resource capability = request.getResource(); Resource capability = request.getResource();
String nodeManagerVersion = request.getNMVersion(); String nodeManagerVersion = request.getNMVersion();
if (!rmContext.isWorkPreservingRecoveryEnabled()) {
if (!request.getNMContainerStatuses().isEmpty()) {
LOG.info("received container statuses on node manager register :"
+ request.getNMContainerStatuses());
for (NMContainerStatus status : request.getNMContainerStatuses()) {
handleNMContainerStatus(status);
}
}
}
RegisterNodeManagerResponse response = recordFactory RegisterNodeManagerResponse response = recordFactory
.newRecordInstance(RegisterNodeManagerResponse.class); .newRecordInstance(RegisterNodeManagerResponse.class);
@ -311,7 +302,8 @@ public class ResourceTrackerService extends AbstractService implements
RMNode oldNode = this.rmContext.getRMNodes().putIfAbsent(nodeId, rmNode); RMNode oldNode = this.rmContext.getRMNodes().putIfAbsent(nodeId, rmNode);
if (oldNode == null) { if (oldNode == null) {
this.rmContext.getDispatcher().getEventHandler().handle( this.rmContext.getDispatcher().getEventHandler().handle(
new RMNodeStartedEvent(nodeId, request.getNMContainerStatuses())); new RMNodeStartedEvent(nodeId, request.getNMContainerStatuses(),
request.getRunningApplications()));
} else { } else {
LOG.info("Reconnect from the node at: " + host); LOG.info("Reconnect from the node at: " + host);
this.nmLivelinessMonitor.unregister(nodeId); this.nmLivelinessMonitor.unregister(nodeId);
@ -323,6 +315,18 @@ public class ResourceTrackerService extends AbstractService implements
this.nmTokenSecretManager.removeNodeKey(nodeId); this.nmTokenSecretManager.removeNodeKey(nodeId);
this.nmLivelinessMonitor.register(nodeId); this.nmLivelinessMonitor.register(nodeId);
// Handle received container status, this should be processed after new
// RMNode inserted
if (!rmContext.isWorkPreservingRecoveryEnabled()) {
if (!request.getNMContainerStatuses().isEmpty()) {
LOG.info("received container statuses on node manager register :"
+ request.getNMContainerStatuses());
for (NMContainerStatus status : request.getNMContainerStatuses()) {
handleNMContainerStatus(status);
}
}
}
String message = String message =
"NodeManager from node " + host + "(cmPort: " + cmPort + " httpPort: " "NodeManager from node " + host + "(cmPort: " + cmPort + " httpPort: "
+ httpPort + ") " + "registered with capability: " + capability + httpPort + ") " + "registered with capability: " + capability

View File

@ -19,16 +19,16 @@
package org.apache.hadoop.yarn.server.resourcemanager.rmapp; package org.apache.hadoop.yarn.server.resourcemanager.rmapp;
import java.util.Collection; import java.util.Collection;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest; import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport; import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
@ -209,6 +209,14 @@ public interface RMApp extends EventHandler<RMAppEvent> {
*/ */
boolean isAppFinalStateStored(); boolean isAppFinalStateStored();
/**
 * Nodes on which the containers for this {@link RMApp} ran.
 * @return the set of nodes that ran any containers from this {@link RMApp}
 */
Set<NodeId> getRanNodes();
/** /**
* Create the external user-facing state of ApplicationMaster from the * Create the external user-facing state of ApplicationMaster from the
* current state of the {@link RMApp}. * current state of the {@link RMApp}.

View File

@ -39,6 +39,9 @@ public enum RMAppEventType {
ATTEMPT_KILLED, ATTEMPT_KILLED,
NODE_UPDATE, NODE_UPDATE,
// Source: Container and ResourceTracker
APP_RUNNING_ON_NODE,
// Source: RMStateStore // Source: RMStateStore
APP_NEW_SAVED, APP_NEW_SAVED,
APP_UPDATE_SAVED, APP_UPDATE_SAVED,

View File

@ -25,6 +25,7 @@ import java.util.HashSet;
import java.util.LinkedHashMap; import java.util.LinkedHashMap;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock; import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
@ -71,7 +72,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanAppEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.RMServerUtils;
import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.state.InvalidStateTransitonException; import org.apache.hadoop.yarn.state.InvalidStateTransitonException;
import org.apache.hadoop.yarn.state.MultipleArcTransition; import org.apache.hadoop.yarn.state.MultipleArcTransition;
@ -116,6 +116,7 @@ public class RMAppImpl implements RMApp, Recoverable {
private EventHandler handler; private EventHandler handler;
private static final AppFinishedTransition FINISHED_TRANSITION = private static final AppFinishedTransition FINISHED_TRANSITION =
new AppFinishedTransition(); new AppFinishedTransition();
private Set<NodeId> ranNodes = new ConcurrentSkipListSet<NodeId>();
// These states stored are only valid when app is at killing or final_saving. // These states stored are only valid when app is at killing or final_saving.
private RMAppState stateBeforeKilling; private RMAppState stateBeforeKilling;
@ -180,7 +181,6 @@ public class RMAppImpl implements RMApp, Recoverable {
new FinalSavingTransition( new FinalSavingTransition(
new AppKilledTransition(), RMAppState.KILLED)) new AppKilledTransition(), RMAppState.KILLED))
// Transitions from ACCEPTED state // Transitions from ACCEPTED state
.addTransition(RMAppState.ACCEPTED, RMAppState.ACCEPTED, .addTransition(RMAppState.ACCEPTED, RMAppState.ACCEPTED,
RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition()) RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition())
@ -200,6 +200,9 @@ public class RMAppImpl implements RMApp, Recoverable {
new FinalSavingTransition(FINISHED_TRANSITION, RMAppState.FINISHED)) new FinalSavingTransition(FINISHED_TRANSITION, RMAppState.FINISHED))
.addTransition(RMAppState.ACCEPTED, RMAppState.KILLING, .addTransition(RMAppState.ACCEPTED, RMAppState.KILLING,
RMAppEventType.KILL, new KillAttemptTransition()) RMAppEventType.KILL, new KillAttemptTransition())
.addTransition(RMAppState.ACCEPTED, RMAppState.ACCEPTED,
RMAppEventType.APP_RUNNING_ON_NODE,
new AppRunningOnNodeTransition())
// ACCEPTED state can once again receive APP_ACCEPTED event, because on // ACCEPTED state can once again receive APP_ACCEPTED event, because on
// recovery the app returns ACCEPTED state and the app once again go // recovery the app returns ACCEPTED state and the app once again go
// through the scheduler and triggers one more APP_ACCEPTED event at // through the scheduler and triggers one more APP_ACCEPTED event at
@ -220,6 +223,9 @@ public class RMAppImpl implements RMApp, Recoverable {
.addTransition(RMAppState.RUNNING, RMAppState.FINISHED, .addTransition(RMAppState.RUNNING, RMAppState.FINISHED,
// UnManagedAM directly jumps to finished // UnManagedAM directly jumps to finished
RMAppEventType.ATTEMPT_FINISHED, FINISHED_TRANSITION) RMAppEventType.ATTEMPT_FINISHED, FINISHED_TRANSITION)
.addTransition(RMAppState.RUNNING, RMAppState.RUNNING,
RMAppEventType.APP_RUNNING_ON_NODE,
new AppRunningOnNodeTransition())
.addTransition(RMAppState.RUNNING, .addTransition(RMAppState.RUNNING,
EnumSet.of(RMAppState.ACCEPTED, RMAppState.FINAL_SAVING), EnumSet.of(RMAppState.ACCEPTED, RMAppState.FINAL_SAVING),
RMAppEventType.ATTEMPT_FAILED, RMAppEventType.ATTEMPT_FAILED,
@ -235,6 +241,9 @@ public class RMAppImpl implements RMApp, Recoverable {
.addTransition(RMAppState.FINAL_SAVING, RMAppState.FINAL_SAVING, .addTransition(RMAppState.FINAL_SAVING, RMAppState.FINAL_SAVING,
RMAppEventType.ATTEMPT_FINISHED, RMAppEventType.ATTEMPT_FINISHED,
new AttemptFinishedAtFinalSavingTransition()) new AttemptFinishedAtFinalSavingTransition())
.addTransition(RMAppState.FINAL_SAVING, RMAppState.FINAL_SAVING,
RMAppEventType.APP_RUNNING_ON_NODE,
new AppRunningOnNodeTransition())
// ignorable transitions // ignorable transitions
.addTransition(RMAppState.FINAL_SAVING, RMAppState.FINAL_SAVING, .addTransition(RMAppState.FINAL_SAVING, RMAppState.FINAL_SAVING,
EnumSet.of(RMAppEventType.NODE_UPDATE, RMAppEventType.KILL, EnumSet.of(RMAppEventType.NODE_UPDATE, RMAppEventType.KILL,
@ -243,6 +252,9 @@ public class RMAppImpl implements RMApp, Recoverable {
// Transitions from FINISHING state // Transitions from FINISHING state
.addTransition(RMAppState.FINISHING, RMAppState.FINISHED, .addTransition(RMAppState.FINISHING, RMAppState.FINISHED,
RMAppEventType.ATTEMPT_FINISHED, FINISHED_TRANSITION) RMAppEventType.ATTEMPT_FINISHED, FINISHED_TRANSITION)
.addTransition(RMAppState.FINISHING, RMAppState.FINISHING,
RMAppEventType.APP_RUNNING_ON_NODE,
new AppRunningOnNodeTransition())
// ignorable transitions // ignorable transitions
.addTransition(RMAppState.FINISHING, RMAppState.FINISHING, .addTransition(RMAppState.FINISHING, RMAppState.FINISHING,
EnumSet.of(RMAppEventType.NODE_UPDATE, EnumSet.of(RMAppEventType.NODE_UPDATE,
@ -251,6 +263,9 @@ public class RMAppImpl implements RMApp, Recoverable {
RMAppEventType.KILL)) RMAppEventType.KILL))
// Transitions from KILLING state // Transitions from KILLING state
.addTransition(RMAppState.KILLING, RMAppState.KILLING,
RMAppEventType.APP_RUNNING_ON_NODE,
new AppRunningOnNodeTransition())
.addTransition(RMAppState.KILLING, RMAppState.FINAL_SAVING, .addTransition(RMAppState.KILLING, RMAppState.FINAL_SAVING,
RMAppEventType.ATTEMPT_KILLED, RMAppEventType.ATTEMPT_KILLED,
new FinalSavingTransition( new FinalSavingTransition(
@ -267,6 +282,9 @@ public class RMAppImpl implements RMApp, Recoverable {
// Transitions from FINISHED state // Transitions from FINISHED state
// ignorable transitions // ignorable transitions
.addTransition(RMAppState.FINISHED, RMAppState.FINISHED,
RMAppEventType.APP_RUNNING_ON_NODE,
new AppRunningOnNodeTransition())
.addTransition(RMAppState.FINISHED, RMAppState.FINISHED, .addTransition(RMAppState.FINISHED, RMAppState.FINISHED,
EnumSet.of( EnumSet.of(
RMAppEventType.NODE_UPDATE, RMAppEventType.NODE_UPDATE,
@ -276,11 +294,17 @@ public class RMAppImpl implements RMApp, Recoverable {
// Transitions from FAILED state // Transitions from FAILED state
// ignorable transitions // ignorable transitions
.addTransition(RMAppState.FAILED, RMAppState.FAILED,
RMAppEventType.APP_RUNNING_ON_NODE,
new AppRunningOnNodeTransition())
.addTransition(RMAppState.FAILED, RMAppState.FAILED, .addTransition(RMAppState.FAILED, RMAppState.FAILED,
EnumSet.of(RMAppEventType.KILL, RMAppEventType.NODE_UPDATE)) EnumSet.of(RMAppEventType.KILL, RMAppEventType.NODE_UPDATE))
// Transitions from KILLED state // Transitions from KILLED state
// ignorable transitions // ignorable transitions
.addTransition(RMAppState.KILLED, RMAppState.KILLED,
RMAppEventType.APP_RUNNING_ON_NODE,
new AppRunningOnNodeTransition())
.addTransition( .addTransition(
RMAppState.KILLED, RMAppState.KILLED,
RMAppState.KILLED, RMAppState.KILLED,
@ -696,6 +720,23 @@ public class RMAppImpl implements RMApp, Recoverable {
}; };
} }
/**
 * Handles {@code APP_RUNNING_ON_NODE}: records the reporting node for a live
 * application, or asks the node to clean the application up straight away
 * when the application is already in a final state.
 */
private static final class AppRunningOnNodeTransition extends RMAppTransition {
  public void transition(RMAppImpl app, RMAppEvent event) {
    RMAppRunningOnNodeEvent runningOnNode = (RMAppRunningOnNodeEvent) event;
    NodeId nodeId = runningOnNode.getNodeId();

    if (!isAppInFinalState(app)) {
      // app not final yet: add the node to ranNodes for further processing
      app.ranNodes.add(nodeId);
    } else {
      // final state already stored: notify the RMNode
      app.handler.handle(
          new RMNodeCleanAppEvent(nodeId, runningOnNode.getApplicationId()));
    }
  }
}
/** /**
* Move an app to a new queue. * Move an app to a new queue.
* This transition must set the result on the Future in the RMAppMoveEvent, * This transition must set the result on the Future in the RMAppMoveEvent,
@ -1037,17 +1078,8 @@ public class RMAppImpl implements RMApp, Recoverable {
this.finalState = finalState; this.finalState = finalState;
} }
private Set<NodeId> getNodesOnWhichAttemptRan(RMAppImpl app) {
Set<NodeId> nodes = new HashSet<NodeId>();
for (RMAppAttempt attempt : app.attempts.values()) {
nodes.addAll(attempt.getRanNodes());
}
return nodes;
}
public void transition(RMAppImpl app, RMAppEvent event) { public void transition(RMAppImpl app, RMAppEvent event) {
Set<NodeId> nodes = getNodesOnWhichAttemptRan(app); for (NodeId nodeId : app.getRanNodes()) {
for (NodeId nodeId : nodes) {
app.handler.handle( app.handler.handle(
new RMNodeCleanAppEvent(nodeId, app.applicationId)); new RMNodeCleanAppEvent(nodeId, app.applicationId));
} }
@ -1148,4 +1180,9 @@ public class RMAppImpl implements RMApp, Recoverable {
private RMAppState getRecoveredFinalState() { private RMAppState getRecoveredFinalState() {
return this.recoveredFinalState; return this.recoveredFinalState;
} }
/**
 * {@inheritDoc}
 * <p>
 * Returns the backing {@code ranNodes} set itself rather than a copy.
 */
@Override
public Set<NodeId> getRanNodes() {
  return this.ranNodes;
}
} }

View File

@ -16,25 +16,20 @@
* limitations under the License. * limitations under the License.
*/ */
package org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event; package org.apache.hadoop.yarn.server.resourcemanager.rmapp;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
public class RMAppAttemptContainerAcquiredEvent extends RMAppAttemptEvent { public class RMAppRunningOnNodeEvent extends RMAppEvent {
private final NodeId node;
private final Container container; public RMAppRunningOnNodeEvent(ApplicationId appId, NodeId node) {
super(appId, RMAppEventType.APP_RUNNING_ON_NODE);
public RMAppAttemptContainerAcquiredEvent(ApplicationAttemptId appAttemptId, this.node = node;
Container container) {
super(appAttemptId, RMAppAttemptEventType.CONTAINER_ACQUIRED);
this.container = container;
} }
public Container getContainer() { public NodeId getNodeId() {
return this.container; return node;
} }
} }

View File

@ -19,7 +19,6 @@
package org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt; package org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt;
import java.util.List; import java.util.List;
import java.util.Set;
import javax.crypto.SecretKey; import javax.crypto.SecretKey;
@ -32,7 +31,6 @@ import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState; import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.event.EventHandler;
@ -114,12 +112,6 @@ public interface RMAppAttempt extends EventHandler<RMAppAttemptEvent> {
*/ */
FinalApplicationStatus getFinalApplicationStatus(); FinalApplicationStatus getFinalApplicationStatus();
/**
* Nodes on which the containers for this {@link RMAppAttempt} ran.
* @return the set of nodes that ran any containers from this {@link RMAppAttempt}
*/
Set<NodeId> getRanNodes();
/** /**
* Return a list of the last set of finished containers, resetting the * Return a list of the last set of finished containers, resetting the
* finished containers to empty. * finished containers to empty.

View File

@ -36,7 +36,6 @@ public enum RMAppAttemptEventType {
UNREGISTERED, UNREGISTERED,
// Source: Containers // Source: Containers
CONTAINER_ACQUIRED,
CONTAINER_ALLOCATED, CONTAINER_ALLOCATED,
CONTAINER_FINISHED, CONTAINER_FINISHED,

View File

@ -26,16 +26,13 @@ import java.net.URISyntaxException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.EnumSet; import java.util.EnumSet;
import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Set;
import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock; import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
import javax.crypto.SecretKey; import javax.crypto.SecretKey;
import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.lang.StringUtils; import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
@ -54,7 +51,6 @@ import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.ResourceRequest;
@ -80,7 +76,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppFailedAttemptEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppFailedAttemptEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppFinishedAttemptEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppFinishedAttemptEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerAcquiredEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerAllocatedEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerAllocatedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerFinishedEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerFinishedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptLaunchFailedEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptLaunchFailedEvent;
@ -103,6 +98,8 @@ import org.apache.hadoop.yarn.state.StateMachine;
import org.apache.hadoop.yarn.state.StateMachineFactory; import org.apache.hadoop.yarn.state.StateMachineFactory;
import org.apache.hadoop.yarn.webapp.util.WebAppUtils; import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
import com.google.common.annotations.VisibleForTesting;
@SuppressWarnings({"unchecked", "rawtypes"}) @SuppressWarnings({"unchecked", "rawtypes"})
public class RMAppAttemptImpl implements RMAppAttempt, Recoverable { public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
@ -134,9 +131,6 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
private Token<AMRMTokenIdentifier> amrmToken = null; private Token<AMRMTokenIdentifier> amrmToken = null;
private SecretKey clientTokenMasterKey = null; private SecretKey clientTokenMasterKey = null;
//nodes on while this attempt's containers ran
private Set<NodeId> ranNodes =
new HashSet<NodeId>();
private List<ContainerStatus> justFinishedContainers = private List<ContainerStatus> justFinishedContainers =
new ArrayList<ContainerStatus>(); new ArrayList<ContainerStatus>();
private Container masterContainer; private Container masterContainer;
@ -219,10 +213,7 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
.addTransition(RMAppAttemptState.ALLOCATED_SAVING, .addTransition(RMAppAttemptState.ALLOCATED_SAVING,
RMAppAttemptState.ALLOCATED, RMAppAttemptState.ALLOCATED,
RMAppAttemptEventType.ATTEMPT_NEW_SAVED, new AttemptStoredTransition()) RMAppAttemptEventType.ATTEMPT_NEW_SAVED, new AttemptStoredTransition())
.addTransition(RMAppAttemptState.ALLOCATED_SAVING,
RMAppAttemptState.ALLOCATED_SAVING,
RMAppAttemptEventType.CONTAINER_ACQUIRED,
new ContainerAcquiredTransition())
// App could be killed by the client. So need to handle this. // App could be killed by the client. So need to handle this.
.addTransition(RMAppAttemptState.ALLOCATED_SAVING, .addTransition(RMAppAttemptState.ALLOCATED_SAVING,
RMAppAttemptState.FINAL_SAVING, RMAppAttemptState.FINAL_SAVING,
@ -249,10 +240,6 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
RMAppAttemptState.KILLED), RMAppAttemptState.KILLED)) RMAppAttemptState.KILLED), RMAppAttemptState.KILLED))
// Transitions from ALLOCATED State // Transitions from ALLOCATED State
.addTransition(RMAppAttemptState.ALLOCATED,
RMAppAttemptState.ALLOCATED,
RMAppAttemptEventType.CONTAINER_ACQUIRED,
new ContainerAcquiredTransition())
.addTransition(RMAppAttemptState.ALLOCATED, RMAppAttemptState.LAUNCHED, .addTransition(RMAppAttemptState.ALLOCATED, RMAppAttemptState.LAUNCHED,
RMAppAttemptEventType.LAUNCHED, new AMLaunchedTransition()) RMAppAttemptEventType.LAUNCHED, new AMLaunchedTransition())
.addTransition(RMAppAttemptState.ALLOCATED, RMAppAttemptState.FINAL_SAVING, .addTransition(RMAppAttemptState.ALLOCATED, RMAppAttemptState.FINAL_SAVING,
@ -296,10 +283,6 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
RMAppAttemptEventType.STATUS_UPDATE, new StatusUpdateTransition()) RMAppAttemptEventType.STATUS_UPDATE, new StatusUpdateTransition())
.addTransition(RMAppAttemptState.RUNNING, RMAppAttemptState.RUNNING, .addTransition(RMAppAttemptState.RUNNING, RMAppAttemptState.RUNNING,
RMAppAttemptEventType.CONTAINER_ALLOCATED) RMAppAttemptEventType.CONTAINER_ALLOCATED)
.addTransition(
RMAppAttemptState.RUNNING, RMAppAttemptState.RUNNING,
RMAppAttemptEventType.CONTAINER_ACQUIRED,
new ContainerAcquiredTransition())
.addTransition( .addTransition(
RMAppAttemptState.RUNNING, RMAppAttemptState.RUNNING,
EnumSet.of(RMAppAttemptState.RUNNING, RMAppAttemptState.FINAL_SAVING), EnumSet.of(RMAppAttemptState.RUNNING, RMAppAttemptState.FINAL_SAVING),
@ -337,7 +320,6 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
// should be fixed to reject container allocate request at Final // should be fixed to reject container allocate request at Final
// Saving in scheduler // Saving in scheduler
RMAppAttemptEventType.CONTAINER_ALLOCATED, RMAppAttemptEventType.CONTAINER_ALLOCATED,
RMAppAttemptEventType.CONTAINER_ACQUIRED,
RMAppAttemptEventType.ATTEMPT_NEW_SAVED, RMAppAttemptEventType.ATTEMPT_NEW_SAVED,
RMAppAttemptEventType.KILL)) RMAppAttemptEventType.KILL))
@ -619,11 +601,6 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
} }
} }
@Override
public Set<NodeId> getRanNodes() {
return ranNodes;
}
@Override @Override
public Container getMasterContainer() { public Container getMasterContainer() {
this.readLock.lock(); this.readLock.lock();
@ -705,7 +682,6 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
public void transferStateFromPreviousAttempt(RMAppAttempt attempt) { public void transferStateFromPreviousAttempt(RMAppAttempt attempt) {
this.justFinishedContainers = attempt.getJustFinishedContainers(); this.justFinishedContainers = attempt.getJustFinishedContainers();
this.ranNodes = attempt.getRanNodes();
} }
private void recoverAppAttemptCredentials(Credentials appAttemptTokens) private void recoverAppAttemptCredentials(Credentials appAttemptTokens)
@ -1402,17 +1378,6 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
finalStatus = unregisterEvent.getFinalApplicationStatus(); finalStatus = unregisterEvent.getFinalApplicationStatus();
} }
private static final class ContainerAcquiredTransition extends
BaseTransition {
@Override
public void transition(RMAppAttemptImpl appAttempt,
RMAppAttemptEvent event) {
RMAppAttemptContainerAcquiredEvent acquiredEvent
= (RMAppAttemptContainerAcquiredEvent) event;
appAttempt.ranNodes.add(acquiredEvent.getContainer().getNodeId());
}
}
private static final class ContainerFinishedTransition private static final class ContainerFinishedTransition
implements implements
MultipleArcTransition<RMAppAttemptImpl, RMAppAttemptEvent, RMAppAttemptState> { MultipleArcTransition<RMAppAttemptImpl, RMAppAttemptEvent, RMAppAttemptState> {

View File

@ -37,7 +37,7 @@ import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus; import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerAcquiredEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRunningOnNodeEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerAllocatedEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerAllocatedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerFinishedEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerFinishedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent;
@ -365,9 +365,9 @@ public class RMContainerImpl implements RMContainer {
RMContainerEventType.FINISHED)); RMContainerEventType.FINISHED));
return RMContainerState.COMPLETED; return RMContainerState.COMPLETED;
} else if (report.getContainerState().equals(ContainerState.RUNNING)) { } else if (report.getContainerState().equals(ContainerState.RUNNING)) {
// Tell the appAttempt // Tell the app
container.eventHandler.handle(new RMAppAttemptContainerAcquiredEvent( container.eventHandler.handle(new RMAppRunningOnNodeEvent(container
container.getApplicationAttemptId(), container.getContainer())); .getApplicationAttemptId().getApplicationId(), container.nodeId));
return RMContainerState.RUNNING; return RMContainerState.RUNNING;
} else { } else {
// This can never happen. // This can never happen.
@ -408,9 +408,9 @@ public class RMContainerImpl implements RMContainer {
// Register with containerAllocationExpirer. // Register with containerAllocationExpirer.
container.containerAllocationExpirer.register(container.getContainerId()); container.containerAllocationExpirer.register(container.getContainerId());
// Tell the appAttempt // Tell the app
container.eventHandler.handle(new RMAppAttemptContainerAcquiredEvent( container.eventHandler.handle(new RMAppRunningOnNodeEvent(container
container.getApplicationAttemptId(), container.getContainer())); .getApplicationAttemptId().getApplicationId(), container.nodeId));
} }
} }

View File

@ -55,6 +55,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.ClusterMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.NodesListManagerEvent; import org.apache.hadoop.yarn.server.resourcemanager.NodesListManagerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.NodesListManagerEventType; import org.apache.hadoop.yarn.server.resourcemanager.NodesListManagerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRunningOnNodeEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
@ -473,7 +475,13 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
} else { } else {
// Increment activeNodes explicitly because this is a new node. // Increment activeNodes explicitly because this is a new node.
ClusterMetrics.getMetrics().incrNumActiveNodes(); ClusterMetrics.getMetrics().incrNumActiveNodes();
containers = startEvent.getContainerRecoveryReports(); containers = startEvent.getNMContainerStatuses();
}
if (null != startEvent.getRunningApplications()) {
for (ApplicationId appId : startEvent.getRunningApplications()) {
handleRunningAppOnNode(rmNode, rmNode.context, appId, rmNode.nodeId);
}
} }
rmNode.context.getDispatcher().getEventHandler() rmNode.context.getDispatcher().getEventHandler()
@ -482,6 +490,24 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
new NodesListManagerEvent( new NodesListManagerEvent(
NodesListManagerEventType.NODE_USABLE, rmNode)); NodesListManagerEventType.NODE_USABLE, rmNode));
} }
/**
 * Handles one application that a node reported as running.
 *
 * If the application is present in {@code context.getRMApps()}, an
 * {@link RMAppRunningOnNodeEvent} for the (appId, nodeId) pair is sent
 * through the dispatcher's event handler. Otherwise the application id is
 * added to the node's {@code finishedApplications} so that it can be
 * cleaned up on the NM.
 *
 * @param rmNode  node whose {@code finishedApplications} is updated when
 *                the application is unknown
 * @param context RM context used to look up the app and dispatch the event
 * @param appId   application reported by the node
 * @param nodeId  id of the reporting node
 */
void handleRunningAppOnNode(RMNodeImpl rmNode, RMContext context,
    ApplicationId appId, NodeId nodeId) {
  RMApp knownApp = context.getRMApps().get(appId);

  if (knownApp != null) {
    // The app is known to the RM: let it record that it ran on this node.
    context.getDispatcher().getEventHandler()
        .handle(new RMAppRunningOnNodeEvent(appId, nodeId));
  } else {
    // Lookup failed, so something may have gone wrong. Queue the app as
    // finished on this node so that it still gets cleaned up on the NM.
    LOG.warn("Cannot get RMApp by appId=" + appId
        + ", just added it to finishedApplications list for cleanup");
    rmNode.finishedApplications.add(appId);
  }
}
} }
public static class ReconnectNodeTransition implements public static class ReconnectNodeTransition implements
@ -517,7 +543,7 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
} }
rmNode.context.getRMNodes().put(newNode.getNodeID(), newNode); rmNode.context.getRMNodes().put(newNode.getNodeID(), newNode);
rmNode.context.getDispatcher().getEventHandler().handle( rmNode.context.getDispatcher().getEventHandler().handle(
new RMNodeStartedEvent(newNode.getNodeID(), null)); new RMNodeStartedEvent(newNode.getNodeID(), null, null));
} }
rmNode.context.getDispatcher().getEventHandler().handle( rmNode.context.getDispatcher().getEventHandler().handle(
new NodesListManagerEvent( new NodesListManagerEvent(

View File

@ -20,19 +20,28 @@ package org.apache.hadoop.yarn.server.resourcemanager.rmnode;
import java.util.List; import java.util.List;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus; import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
public class RMNodeStartedEvent extends RMNodeEvent { public class RMNodeStartedEvent extends RMNodeEvent {
private List<NMContainerStatus> containerReports; private List<NMContainerStatus> containerStatuses;
private List<ApplicationId> runningApplications;
public RMNodeStartedEvent(NodeId nodeId, List<NMContainerStatus> containerReports) { public RMNodeStartedEvent(NodeId nodeId,
List<NMContainerStatus> containerReports,
List<ApplicationId> runningApplications) {
super(nodeId, RMNodeEventType.STARTED); super(nodeId, RMNodeEventType.STARTED);
this.containerReports = containerReports; this.containerStatuses = containerReports;
this.runningApplications = runningApplications;
} }
public List<NMContainerStatus> getContainerRecoveryReports() { public List<NMContainerStatus> getNMContainerStatuses() {
return this.containerReports; return this.containerStatuses;
}
public List<ApplicationId> getRunningApplications() {
return runningApplications;
} }
} }

View File

@ -100,11 +100,17 @@ public class MockNM {
} }
public RegisterNodeManagerResponse registerNode() throws Exception { public RegisterNodeManagerResponse registerNode() throws Exception {
return registerNode(null); return registerNode(null, null);
} }
public RegisterNodeManagerResponse registerNode( public RegisterNodeManagerResponse registerNode(
List<NMContainerStatus> containerReports) throws Exception{ List<ApplicationId> runningApplications) throws Exception {
return registerNode(null, runningApplications);
}
public RegisterNodeManagerResponse registerNode(
List<NMContainerStatus> containerReports,
List<ApplicationId> runningApplications) throws Exception {
RegisterNodeManagerRequest req = Records.newRecord( RegisterNodeManagerRequest req = Records.newRecord(
RegisterNodeManagerRequest.class); RegisterNodeManagerRequest.class);
req.setNodeId(nodeId); req.setNodeId(nodeId);
@ -113,6 +119,7 @@ public class MockNM {
req.setResource(resource); req.setResource(resource);
req.setContainerStatuses(containerReports); req.setContainerStatuses(containerReports);
req.setNMVersion(version); req.setNMVersion(version);
req.setRunningApplications(runningApplications);
RegisterNodeManagerResponse registrationResponse = RegisterNodeManagerResponse registrationResponse =
resourceTracker.registerNodeManager(req); resourceTracker.registerNodeManager(req);
this.currentContainerTokenMasterKey = this.currentContainerTokenMasterKey =

View File

@ -78,6 +78,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSec
import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM; import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
import org.apache.hadoop.yarn.util.Records; import org.apache.hadoop.yarn.util.Records;
import org.apache.hadoop.yarn.util.YarnVersionInfo;
import org.apache.log4j.Level; import org.apache.log4j.Level;
import org.apache.log4j.LogManager; import org.apache.log4j.LogManager;
import org.apache.log4j.Logger; import org.apache.log4j.Logger;
@ -351,10 +352,19 @@ public class MockRM extends ResourceManager {
return nm; return nm;
} }
/**
 * Creates a {@link MockNM} bound to this RM's resource tracker service and
 * registers it, passing along the given list of running applications.
 *
 * @param nodeIdStr           node id string handed to the MockNM constructor
 * @param memory              memory value handed to the MockNM constructor
 * @param vCores              vcore value handed to the MockNM constructor
 * @param runningApplications applications passed to the registration call
 * @return the MockNM after its registration call has returned
 * @throws Exception if constructing or registering the node fails
 */
public MockNM registerNode(String nodeIdStr, int memory, int vCores,
    List<ApplicationId> runningApplications) throws Exception {
  MockNM nodeManager = new MockNM(nodeIdStr, memory, vCores,
      getResourceTrackerService(), YarnVersionInfo.getVersion());
  nodeManager.registerNode(runningApplications);
  return nodeManager;
}
public void sendNodeStarted(MockNM nm) throws Exception { public void sendNodeStarted(MockNM nm) throws Exception {
RMNodeImpl node = (RMNodeImpl) getRMContext().getRMNodes().get( RMNodeImpl node = (RMNodeImpl) getRMContext().getRMNodes().get(
nm.getNodeId()); nm.getNodeId());
node.handle(new RMNodeStartedEvent(nm.getNodeId(), null)); node.handle(new RMNodeStartedEvent(nm.getNodeId(), null, null));
} }
public void sendNodeLost(MockNM nm) throws Exception { public void sendNodeLost(MockNM nm) throws Exception {

View File

@ -18,26 +18,30 @@
package org.apache.hadoop.yarn.server.resourcemanager; package org.apache.hadoop.yarn.server.resourcemanager;
import java.net.UnknownHostException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import org.junit.Assert;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.DrainDispatcher; import org.apache.hadoop.yarn.event.DrainDispatcher;
import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
@ -45,6 +49,8 @@ import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.log4j.Level; import org.apache.log4j.Level;
import org.apache.log4j.LogManager; import org.apache.log4j.LogManager;
import org.apache.log4j.Logger; import org.apache.log4j.Logger;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test; import org.junit.Test;
public class TestApplicationCleanup { public class TestApplicationCleanup {
@ -52,6 +58,20 @@ public class TestApplicationCleanup {
private static final Log LOG = LogFactory private static final Log LOG = LogFactory
.getLog(TestApplicationCleanup.class); .getLog(TestApplicationCleanup.class);
private YarnConfiguration conf;
/**
 * Per-test fixture: raises the root logger to DEBUG and builds a fresh
 * {@link YarnConfiguration} with recovery enabled and
 * {@link MemoryRMStateStore} configured as the RM store.
 */
@Before
public void setup() throws UnknownHostException {
  LogManager.getRootLogger().setLevel(Level.DEBUG);

  conf = new YarnConfiguration();
  // Hand the configuration to UGI before the recovery keys are set below.
  UserGroupInformation.setConfiguration(conf);
  conf.set(YarnConfiguration.RECOVERY_ENABLED, "true");
  conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName());

  // Sanity check on the default: more than one AM attempt is allowed.
  Assert.assertTrue(YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS > 1);
}
@SuppressWarnings("resource")
@Test @Test
public void testAppCleanup() throws Exception { public void testAppCleanup() throws Exception {
Logger rootLogger = LogManager.getRootLogger(); Logger rootLogger = LogManager.getRootLogger();
@ -130,6 +150,7 @@ public class TestApplicationCleanup {
rm.stop(); rm.stop();
} }
@SuppressWarnings("resource")
@Test @Test
public void testContainerCleanup() throws Exception { public void testContainerCleanup() throws Exception {
@ -253,6 +274,69 @@ public class TestApplicationCleanup {
rm.stop(); rm.stop();
} }
/**
 * Heartbeats from the given node until a heartbeat response's cleanup list
 * contains exactly one application and that application equals
 * {@code appId}. Sleeps one second between heartbeats. There is no internal
 * bound on the number of iterations.
 *
 * @param nm    node manager used to send heartbeats
 * @param appId application expected in the cleanup list
 * @throws Exception if a heartbeat fails or the sleep is interrupted
 */
private void waitForAppCleanupMessageRecved(MockNM nm, ApplicationId appId)
    throws Exception {
  for (;;) {
    List<ApplicationId> toCleanup =
        nm.nodeHeartbeat(true).getApplicationsToCleanup();
    boolean onlyExpectedApp = toCleanup != null
        && toCleanup.size() == 1
        && appId.equals(toCleanup.get(0));
    if (onlyExpectedApp) {
      return;
    }

    LOG.info("Haven't got application=" + appId.toString()
        + " in cleanup list from node heartbeat response, "
        + "sleep for a while before next heartbeat");
    Thread.sleep(1000);
  }
}
/**
 * Launches and registers the AM for the app's current attempt, then waits
 * for the application to reach {@link RMAppState#RUNNING}.
 *
 * @param app application whose current attempt is launched
 * @param rm  resource manager used to send the AM-launched signal and to
 *            wait for the state change
 * @param nm  node manager that sends one heartbeat before the launch
 * @return the MockAM after it has registered its attempt
 * @throws Exception if any step fails
 */
private MockAM launchAM(RMApp app, MockRM rm, MockNM nm)
    throws Exception {
  RMAppAttempt currentAttempt = app.getCurrentAppAttempt();
  nm.nodeHeartbeat(true);

  MockAM appMaster = rm.sendAMLaunched(currentAttempt.getAppAttemptId());
  appMaster.registerAppAttempt();

  rm.waitForState(app.getApplicationId(), RMAppState.RUNNING);
  return appMaster;
}
/**
 * Verifies that an application which already reached FAILED on the first RM
 * still shows up in a node heartbeat response's cleanup list after a second
 * RM is started on the same state store and the node re-registers reporting
 * that application as running.
 */
@SuppressWarnings("resource")
@Test (timeout = 60000)
public void testAppCleanupWhenRestartedAfterAppFinished() throws Exception {
// allow a single AM attempt only
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 1);
MemoryRMStateStore memStore = new MemoryRMStateStore();
memStore.init(conf);
// start RM
MockRM rm1 = new MockRM(conf, memStore);
rm1.start();
MockNM nm1 =
new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService());
nm1.registerNode();
// create app and launch the AM
RMApp app0 = rm1.submitApp(200);
MockAM am0 = launchAM(app0, rm1, nm1);
// report container 1 of the attempt as COMPLETE and wait for the app to
// reach FAILED on rm1
nm1.nodeHeartbeat(am0.getApplicationAttemptId(), 1, ContainerState.COMPLETE);
rm1.waitForState(app0.getApplicationId(), RMAppState.FAILED);
// start new RM
// (rm2 is constructed with the same memStore instance as rm1)
MockRM rm2 = new MockRM(conf, memStore);
rm2.start();
// nm1 register to rm2, and do a heartbeat
// (registration passes app0 in the running-applications list)
nm1.setResourceTrackerService(rm2.getResourceTrackerService());
nm1.registerNode(Arrays.asList(app0.getApplicationId()));
rm2.waitForState(app0.getApplicationId(), RMAppState.FAILED);
// wait for application cleanup message received
waitForAppCleanupMessageRecved(nm1, app0.getApplicationId());
rm1.stop();
rm2.stop();
}
public static void main(String[] args) throws Exception { public static void main(String[] args) throws Exception {
TestApplicationCleanup t = new TestApplicationCleanup(); TestApplicationCleanup t = new TestApplicationCleanup();
t.testAppCleanup(); t.testAppCleanup();

View File

@ -161,7 +161,7 @@ public class TestRMNodeTransitions {
@Test (timeout = 5000) @Test (timeout = 5000)
public void testExpiredContainer() { public void testExpiredContainer() {
// Start the node // Start the node
node.handle(new RMNodeStartedEvent(null, null)); node.handle(new RMNodeStartedEvent(null, null, null));
verify(scheduler).handle(any(NodeAddedSchedulerEvent.class)); verify(scheduler).handle(any(NodeAddedSchedulerEvent.class));
// Expire a container // Expire a container
@ -189,11 +189,11 @@ public class TestRMNodeTransitions {
@Test (timeout = 5000) @Test (timeout = 5000)
public void testContainerUpdate() throws InterruptedException{ public void testContainerUpdate() throws InterruptedException{
//Start the node //Start the node
node.handle(new RMNodeStartedEvent(null, null)); node.handle(new RMNodeStartedEvent(null, null, null));
NodeId nodeId = BuilderUtils.newNodeId("localhost:1", 1); NodeId nodeId = BuilderUtils.newNodeId("localhost:1", 1);
RMNodeImpl node2 = new RMNodeImpl(nodeId, rmContext, null, 0, 0, null, null, null); RMNodeImpl node2 = new RMNodeImpl(nodeId, rmContext, null, 0, 0, null, null, null);
node2.handle(new RMNodeStartedEvent(null, null)); node2.handle(new RMNodeStartedEvent(null, null, null));
ContainerId completedContainerIdFromNode1 = BuilderUtils.newContainerId( ContainerId completedContainerIdFromNode1 = BuilderUtils.newContainerId(
BuilderUtils.newApplicationAttemptId( BuilderUtils.newApplicationAttemptId(
@ -249,7 +249,7 @@ public class TestRMNodeTransitions {
@Test (timeout = 5000) @Test (timeout = 5000)
public void testStatusChange(){ public void testStatusChange(){
//Start the node //Start the node
node.handle(new RMNodeStartedEvent(null, null)); node.handle(new RMNodeStartedEvent(null, null, null));
//Add info to the queue first //Add info to the queue first
node.setNextHeartBeat(false); node.setNextHeartBeat(false);
@ -465,7 +465,7 @@ public class TestRMNodeTransitions {
RMNodeImpl node = new RMNodeImpl(nodeId, rmContext,null, 0, 0, RMNodeImpl node = new RMNodeImpl(nodeId, rmContext,null, 0, 0,
null, ResourceOption.newInstance(capability, null, ResourceOption.newInstance(capability,
RMNode.OVER_COMMIT_TIMEOUT_MILLIS_DEFAULT), nmVersion); RMNode.OVER_COMMIT_TIMEOUT_MILLIS_DEFAULT), nmVersion);
node.handle(new RMNodeStartedEvent(node.getNodeID(), null)); node.handle(new RMNodeStartedEvent(node.getNodeID(), null, null));
Assert.assertEquals(NodeState.RUNNING, node.getState()); Assert.assertEquals(NodeState.RUNNING, node.getState());
return node; return node;
} }
@ -496,7 +496,7 @@ public class TestRMNodeTransitions {
int initialUnhealthy = cm.getUnhealthyNMs(); int initialUnhealthy = cm.getUnhealthyNMs();
int initialDecommissioned = cm.getNumDecommisionedNMs(); int initialDecommissioned = cm.getNumDecommisionedNMs();
int initialRebooted = cm.getNumRebootedNMs(); int initialRebooted = cm.getNumRebootedNMs();
node.handle(new RMNodeStartedEvent(node.getNodeID(), null)); node.handle(new RMNodeStartedEvent(node.getNodeID(), null, null));
Assert.assertEquals("Active Nodes", initialActive + 1, cm.getNumActiveNMs()); Assert.assertEquals("Active Nodes", initialActive + 1, cm.getNumActiveNMs());
Assert.assertEquals("Lost Nodes", initialLost, cm.getNumLostNMs()); Assert.assertEquals("Lost Nodes", initialLost, cm.getNumLostNMs());
Assert.assertEquals("Unhealthy Nodes", Assert.assertEquals("Unhealthy Nodes",

View File

@ -102,7 +102,6 @@ import org.junit.Before;
import org.junit.Test; import org.junit.Test;
public class TestRMRestart { public class TestRMRestart {
private final static File TEMP_DIR = new File(System.getProperty( private final static File TEMP_DIR = new File(System.getProperty(
"test.build.data", "/tmp"), "decommision"); "test.build.data", "/tmp"), "decommision");
private File hostFile = new File(TEMP_DIR + File.separator + "hostFile.txt"); private File hostFile = new File(TEMP_DIR + File.separator + "hostFile.txt");
@ -309,7 +308,7 @@ public class TestRMRestart {
TestRMRestart TestRMRestart
.createNMContainerStatus(loadedApp1.getCurrentAppAttempt() .createNMContainerStatus(loadedApp1.getCurrentAppAttempt()
.getAppAttemptId(), 1, ContainerState.COMPLETE); .getAppAttemptId(), 1, ContainerState.COMPLETE);
nm1.registerNode(Arrays.asList(status)); nm1.registerNode(Arrays.asList(status), null);
nm2.registerNode(); nm2.registerNode();
rm2.waitForState(loadedApp1.getApplicationId(), RMAppState.ACCEPTED); rm2.waitForState(loadedApp1.getApplicationId(), RMAppState.ACCEPTED);
@ -392,7 +391,7 @@ public class TestRMRestart {
// completed apps are not removed immediately after app finish // completed apps are not removed immediately after app finish
// And finished app is also loaded back. // And finished app is also loaded back.
Assert.assertEquals(4, rmAppState.size()); Assert.assertEquals(4, rmAppState.size());
} }
@Test (timeout = 60000) @Test (timeout = 60000)
public void testRMRestartAppRunningAMFailed() throws Exception { public void testRMRestartAppRunningAMFailed() throws Exception {
@ -514,7 +513,7 @@ public class TestRMRestart {
NMContainerStatus status = NMContainerStatus status =
TestRMRestart.createNMContainerStatus( TestRMRestart.createNMContainerStatus(
am2.getApplicationAttemptId(), 1, ContainerState.COMPLETE); am2.getApplicationAttemptId(), 1, ContainerState.COMPLETE);
nm1.registerNode(Arrays.asList(status)); nm1.registerNode(Arrays.asList(status), null);
rm2.waitForState(am2.getApplicationAttemptId(), RMAppAttemptState.FAILED); rm2.waitForState(am2.getApplicationAttemptId(), RMAppAttemptState.FAILED);
launchAM(rmApp, rm2, nm1); launchAM(rmApp, rm2, nm1);
Assert.assertEquals(3, rmApp.getAppAttempts().size()); Assert.assertEquals(3, rmApp.getAppAttempts().size());
@ -1680,7 +1679,8 @@ public class TestRMRestart {
TestRMRestart TestRMRestart
.createNMContainerStatus(loadedApp1.getCurrentAppAttempt() .createNMContainerStatus(loadedApp1.getCurrentAppAttempt()
.getAppAttemptId(), 1, ContainerState.COMPLETE); .getAppAttemptId(), 1, ContainerState.COMPLETE);
nm1.registerNode(Arrays.asList(status)); nm1.registerNode(Arrays.asList(status), null);
while (loadedApp1.getAppAttempts().size() != 2) { while (loadedApp1.getAppAttempts().size() != 2) {
Thread.sleep(200); Thread.sleep(200);
} }
@ -1807,7 +1807,7 @@ public class TestRMRestart {
NMContainerStatus status = NMContainerStatus status =
TestRMRestart.createNMContainerStatus( TestRMRestart.createNMContainerStatus(
am0.getApplicationAttemptId(), 1, ContainerState.COMPLETE); am0.getApplicationAttemptId(), 1, ContainerState.COMPLETE);
nm1.registerNode(Arrays.asList(status)); nm1.registerNode(Arrays.asList(status), null);
} }
}; };
} }

View File

@ -159,7 +159,7 @@ public class TestWorkPreservingRMRestart {
ContainerState.COMPLETE); ContainerState.COMPLETE);
nm1.registerNode(Arrays.asList(amContainer, runningContainer, nm1.registerNode(Arrays.asList(amContainer, runningContainer,
completedContainer)); completedContainer), null);
// Wait for RM to settle down on recovering containers; // Wait for RM to settle down on recovering containers;
waitForNumContainersToRecover(2, rm2, am1.getApplicationAttemptId()); waitForNumContainersToRecover(2, rm2, am1.getApplicationAttemptId());
@ -383,11 +383,11 @@ public class TestWorkPreservingRMRestart {
List<NMContainerStatus> am1_2Containers = List<NMContainerStatus> am1_2Containers =
createNMContainerStatusForApp(am1_2); createNMContainerStatusForApp(am1_2);
am1_1Containers.addAll(am1_2Containers); am1_1Containers.addAll(am1_2Containers);
nm1.registerNode(am1_1Containers); nm1.registerNode(am1_1Containers, null);
List<NMContainerStatus> am2Containers = List<NMContainerStatus> am2Containers =
createNMContainerStatusForApp(am2); createNMContainerStatusForApp(am2);
nm2.registerNode(am2Containers); nm2.registerNode(am2Containers, null);
// Wait for RM to settle down on recovering containers; // Wait for RM to settle down on recovering containers;
waitForNumContainersToRecover(2, rm2, am1_1.getApplicationAttemptId()); waitForNumContainersToRecover(2, rm2, am1_1.getApplicationAttemptId());
@ -482,7 +482,7 @@ public class TestWorkPreservingRMRestart {
TestRMRestart.createNMContainerStatus(am1.getApplicationAttemptId(), 3, TestRMRestart.createNMContainerStatus(am1.getApplicationAttemptId(), 3,
ContainerState.COMPLETE); ContainerState.COMPLETE);
nm1.registerNode(Arrays.asList(amContainer, runningContainer, nm1.registerNode(Arrays.asList(amContainer, runningContainer,
completedContainer)); completedContainer), null);
rm2.waitForState(am1.getApplicationAttemptId(), RMAppAttemptState.FAILED); rm2.waitForState(am1.getApplicationAttemptId(), RMAppAttemptState.FAILED);
// Wait for RM to settle down on recovering containers; // Wait for RM to settle down on recovering containers;
Thread.sleep(3000); Thread.sleep(3000);
@ -519,7 +519,7 @@ public class TestWorkPreservingRMRestart {
NMContainerStatus completedContainer = NMContainerStatus completedContainer =
TestRMRestart.createNMContainerStatus(am1.getApplicationAttemptId(), 3, TestRMRestart.createNMContainerStatus(am1.getApplicationAttemptId(), 3,
ContainerState.COMPLETE); ContainerState.COMPLETE);
nm1.registerNode(Arrays.asList(runningContainer, completedContainer)); nm1.registerNode(Arrays.asList(runningContainer, completedContainer), null);
RMApp recoveredApp1 = RMApp recoveredApp1 =
rm2.getRMContext().getRMApps().get(app1.getApplicationId()); rm2.getRMContext().getRMApps().get(app1.getApplicationId());
assertEquals(RMAppState.FINISHED, recoveredApp1.getState()); assertEquals(RMAppState.FINISHED, recoveredApp1.getState());

View File

@ -31,6 +31,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
@ -159,6 +160,11 @@ public abstract class MockAsm extends MockApps {
public YarnApplicationState createApplicationState() { public YarnApplicationState createApplicationState() {
throw new UnsupportedOperationException("Not supported yet."); throw new UnsupportedOperationException("Not supported yet.");
} }
/**
 * Unimplemented stub: this mock does not track the nodes an application
 * ran on.
 *
 * @return never returns normally
 * @throws UnsupportedOperationException always, with the message
 *         "Not supported yet."
 */
@Override
public Set<NodeId> getRanNodes() {
throw new UnsupportedOperationException("Not supported yet.");
}
} }
public static RMApp newApplication(int i) { public static RMApp newApplication(int i) {

View File

@ -29,6 +29,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport; import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
@ -232,4 +233,9 @@ public class MockRMApp implements RMApp {
public YarnApplicationState createApplicationState() { public YarnApplicationState createApplicationState() {
return null; return null;
} }
/**
 * Placeholder override: this mock keeps no record of ran nodes.
 *
 * @return always {@code null} (not an empty set), so callers must
 *         null-check the result before dereferencing it
 */
@Override
public Set<NodeId> getRanNodes() {
return null;
}
} }

View File

@ -56,6 +56,7 @@ import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState; import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState;
import org.apache.hadoop.yarn.event.AsyncDispatcher; import org.apache.hadoop.yarn.event.AsyncDispatcher;
@ -75,8 +76,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppFailedAttemptEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppFailedAttemptEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRunningOnNodeEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRejectedEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRejectedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerAcquiredEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerAllocatedEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerAllocatedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerFinishedEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerFinishedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptLaunchFailedEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptLaunchFailedEvent;
@ -315,7 +316,7 @@ public class TestRMAppAttemptTransitions {
assertEquals(0,applicationAttempt.getJustFinishedContainers().size()); assertEquals(0,applicationAttempt.getJustFinishedContainers().size());
assertNull(applicationAttempt.getMasterContainer()); assertNull(applicationAttempt.getMasterContainer());
assertEquals(0.0, (double)applicationAttempt.getProgress(), 0.0001); assertEquals(0.0, (double)applicationAttempt.getProgress(), 0.0001);
assertEquals(0, applicationAttempt.getRanNodes().size()); assertEquals(0, application.getRanNodes().size());
assertNull(applicationAttempt.getFinalApplicationStatus()); assertNull(applicationAttempt.getFinalApplicationStatus());
assertNotNull(applicationAttempt.getTrackingUrl()); assertNotNull(applicationAttempt.getTrackingUrl());
assertFalse("N/A".equals(applicationAttempt.getTrackingUrl())); assertFalse("N/A".equals(applicationAttempt.getTrackingUrl()));
@ -331,7 +332,7 @@ public class TestRMAppAttemptTransitions {
assertEquals(0,applicationAttempt.getJustFinishedContainers().size()); assertEquals(0,applicationAttempt.getJustFinishedContainers().size());
assertNull(applicationAttempt.getMasterContainer()); assertNull(applicationAttempt.getMasterContainer());
assertEquals(0.0, (double)applicationAttempt.getProgress(), 0.0001); assertEquals(0.0, (double)applicationAttempt.getProgress(), 0.0001);
assertEquals(0, applicationAttempt.getRanNodes().size()); assertEquals(0, application.getRanNodes().size());
assertNull(applicationAttempt.getFinalApplicationStatus()); assertNull(applicationAttempt.getFinalApplicationStatus());
if (UserGroupInformation.isSecurityEnabled()) { if (UserGroupInformation.isSecurityEnabled()) {
verify(clientToAMTokenManager).createMasterKey( verify(clientToAMTokenManager).createMasterKey(
@ -359,7 +360,7 @@ public class TestRMAppAttemptTransitions {
assertEquals(0,applicationAttempt.getJustFinishedContainers().size()); assertEquals(0,applicationAttempt.getJustFinishedContainers().size());
assertNull(applicationAttempt.getMasterContainer()); assertNull(applicationAttempt.getMasterContainer());
assertEquals(0.0, (double)applicationAttempt.getProgress(), 0.0001); assertEquals(0.0, (double)applicationAttempt.getProgress(), 0.0001);
assertEquals(0, applicationAttempt.getRanNodes().size()); assertEquals(0, application.getRanNodes().size());
assertNull(applicationAttempt.getFinalApplicationStatus()); assertNull(applicationAttempt.getFinalApplicationStatus());
// Check events // Check events
@ -385,7 +386,7 @@ public class TestRMAppAttemptTransitions {
assertEquals(0,applicationAttempt.getJustFinishedContainers().size()); assertEquals(0,applicationAttempt.getJustFinishedContainers().size());
assertEquals(amContainer, applicationAttempt.getMasterContainer()); assertEquals(amContainer, applicationAttempt.getMasterContainer());
assertEquals(0.0, (double)applicationAttempt.getProgress(), 0.0001); assertEquals(0.0, (double)applicationAttempt.getProgress(), 0.0001);
assertEquals(0, applicationAttempt.getRanNodes().size()); assertEquals(0, application.getRanNodes().size());
assertNull(applicationAttempt.getFinalApplicationStatus()); assertNull(applicationAttempt.getFinalApplicationStatus());
verifyTokenCount(applicationAttempt.getAppAttemptId(), 1); verifyTokenCount(applicationAttempt.getAppAttemptId(), 1);
verifyAttemptFinalStateSaved(); verifyAttemptFinalStateSaved();
@ -425,7 +426,7 @@ public class TestRMAppAttemptTransitions {
assertEquals(0,applicationAttempt.getJustFinishedContainers().size()); assertEquals(0,applicationAttempt.getJustFinishedContainers().size());
assertNull(applicationAttempt.getMasterContainer()); assertNull(applicationAttempt.getMasterContainer());
assertEquals(0.0, (double)applicationAttempt.getProgress(), 0.0001); assertEquals(0.0, (double)applicationAttempt.getProgress(), 0.0001);
assertEquals(0, applicationAttempt.getRanNodes().size()); assertEquals(0, application.getRanNodes().size());
assertNull(applicationAttempt.getFinalApplicationStatus()); assertNull(applicationAttempt.getFinalApplicationStatus());
} }
@ -461,7 +462,7 @@ public class TestRMAppAttemptTransitions {
assertEquals(0,applicationAttempt.getJustFinishedContainers().size()); assertEquals(0,applicationAttempt.getJustFinishedContainers().size());
assertEquals(container, applicationAttempt.getMasterContainer()); assertEquals(container, applicationAttempt.getMasterContainer());
assertEquals(0.0, (double)applicationAttempt.getProgress(), 0.0001); assertEquals(0.0, (double)applicationAttempt.getProgress(), 0.0001);
assertEquals(0, applicationAttempt.getRanNodes().size()); assertEquals(0, application.getRanNodes().size());
// Check events // Check events
verify(application, times(1)).handle(any(RMAppFailedAttemptEvent.class)); verify(application, times(1)).handle(any(RMAppFailedAttemptEvent.class));
@ -666,8 +667,10 @@ public class TestRMAppAttemptTransitions {
runApplicationAttempt(null, "host", 8042, url, true); runApplicationAttempt(null, "host", 8042, url, true);
// complete a container // complete a container
applicationAttempt.handle(new RMAppAttemptContainerAcquiredEvent( Container container = mock(Container.class);
applicationAttempt.getAppAttemptId(), mock(Container.class))); when(container.getNodeId()).thenReturn(NodeId.newInstance("host", 1234));
application.handle(new RMAppRunningOnNodeEvent(application.getApplicationId(),
container.getNodeId()));
applicationAttempt.handle(new RMAppAttemptContainerFinishedEvent( applicationAttempt.handle(new RMAppAttemptContainerFinishedEvent(
applicationAttempt.getAppAttemptId(), mock(ContainerStatus.class))); applicationAttempt.getAppAttemptId(), mock(ContainerStatus.class)));
// complete AM // complete AM
@ -845,7 +848,7 @@ public class TestRMAppAttemptTransitions {
applicationAttempt.getAppAttemptState()); applicationAttempt.getAppAttemptState());
assertEquals(0,applicationAttempt.getJustFinishedContainers().size()); assertEquals(0,applicationAttempt.getJustFinishedContainers().size());
assertEquals(amContainer, applicationAttempt.getMasterContainer()); assertEquals(amContainer, applicationAttempt.getMasterContainer());
assertEquals(0, applicationAttempt.getRanNodes().size()); assertEquals(0, application.getRanNodes().size());
String rmAppPageUrl = pjoin(RM_WEBAPP_ADDR, "cluster", "app", String rmAppPageUrl = pjoin(RM_WEBAPP_ADDR, "cluster", "app",
applicationAttempt.getAppAttemptId().getApplicationId()); applicationAttempt.getAppAttemptId().getApplicationId());
assertEquals(rmAppPageUrl, applicationAttempt.getOriginalTrackingUrl()); assertEquals(rmAppPageUrl, applicationAttempt.getOriginalTrackingUrl());
@ -882,7 +885,7 @@ public class TestRMAppAttemptTransitions {
applicationAttempt.getAppAttemptState()); applicationAttempt.getAppAttemptState());
assertEquals(0,applicationAttempt.getJustFinishedContainers().size()); assertEquals(0,applicationAttempt.getJustFinishedContainers().size());
assertEquals(amContainer, applicationAttempt.getMasterContainer()); assertEquals(amContainer, applicationAttempt.getMasterContainer());
assertEquals(0, applicationAttempt.getRanNodes().size()); assertEquals(0, application.getRanNodes().size());
String rmAppPageUrl = pjoin(RM_WEBAPP_ADDR, "cluster", "app", String rmAppPageUrl = pjoin(RM_WEBAPP_ADDR, "cluster", "app",
applicationAttempt.getAppAttemptId().getApplicationId()); applicationAttempt.getAppAttemptId().getApplicationId());
assertEquals(rmAppPageUrl, applicationAttempt.getOriginalTrackingUrl()); assertEquals(rmAppPageUrl, applicationAttempt.getOriginalTrackingUrl());