YARN-1210. Changed RM to start new app-attempts on RM restart only after ensuring that previous AM exited or after expiry time. Contributed by Omkar Vinit Joshi.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1543310 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Vinod Kumar Vavilapalli 2013-11-19 05:17:20 +00:00
parent c3c626d32b
commit cfa783141f
19 changed files with 569 additions and 148 deletions

View File

@ -103,6 +103,10 @@ Release 2.3.0 - UNRELEASED
YARN-709. Added tests to verify validity of delegation tokens and logging of
appsummary after RM restart. (Jian He via vinodkv)
YARN-1210. Changed RM to start new app-attempts on RM restart only after
ensuring that previous AM exited or after expiry time. (Omkar Vinit Joshi via
vinodkv)
OPTIMIZATIONS
BUG FIXES

View File

@ -20,15 +20,29 @@ package org.apache.hadoop.yarn.server.api.protocolrecords;
import org.apache.hadoop.yarn.server.api.records.MasterKey;
import org.apache.hadoop.yarn.server.api.records.NodeStatus;
import org.apache.hadoop.yarn.util.Records;
public interface NodeHeartbeatRequest {
public abstract class NodeHeartbeatRequest {
NodeStatus getNodeStatus();
void setNodeStatus(NodeStatus status);
public static NodeHeartbeatRequest newInstance(NodeStatus nodeStatus,
MasterKey lastKnownContainerTokenMasterKey,
MasterKey lastKnownNMTokenMasterKey) {
NodeHeartbeatRequest nodeHeartbeatRequest =
Records.newRecord(NodeHeartbeatRequest.class);
nodeHeartbeatRequest.setNodeStatus(nodeStatus);
nodeHeartbeatRequest
.setLastKnownContainerTokenMasterKey(lastKnownContainerTokenMasterKey);
nodeHeartbeatRequest
.setLastKnownNMTokenMasterKey(lastKnownNMTokenMasterKey);
return nodeHeartbeatRequest;
}
MasterKey getLastKnownContainerTokenMasterKey();
void setLastKnownContainerTokenMasterKey(MasterKey secretKey);
public abstract NodeStatus getNodeStatus();
public abstract void setNodeStatus(NodeStatus status);
MasterKey getLastKnownNMTokenMasterKey();
void setLastKnownNMTokenMasterKey(MasterKey secretKey);
public abstract MasterKey getLastKnownContainerTokenMasterKey();
public abstract void setLastKnownContainerTokenMasterKey(MasterKey secretKey);
public abstract MasterKey getLastKnownNMTokenMasterKey();
public abstract void setLastKnownNMTokenMasterKey(MasterKey secretKey);
}

View File

@ -18,17 +18,37 @@
package org.apache.hadoop.yarn.server.api.protocolrecords;
import java.util.List;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.util.Records;
public interface RegisterNodeManagerRequest {
NodeId getNodeId();
int getHttpPort();
Resource getResource();
String getNMVersion();
public abstract class RegisterNodeManagerRequest {
void setNodeId(NodeId nodeId);
void setHttpPort(int port);
void setResource(Resource resource);
void setNMVersion(String version);
public static RegisterNodeManagerRequest newInstance(NodeId nodeId,
int httpPort, Resource resource, String nodeManagerVersionId,
List<ContainerStatus> containerStatuses) {
RegisterNodeManagerRequest request =
Records.newRecord(RegisterNodeManagerRequest.class);
request.setHttpPort(httpPort);
request.setResource(resource);
request.setNodeId(nodeId);
request.setNMVersion(nodeManagerVersionId);
request.setContainerStatuses(containerStatuses);
return request;
}
public abstract NodeId getNodeId();
public abstract int getHttpPort();
public abstract Resource getResource();
public abstract String getNMVersion();
public abstract List<ContainerStatus> getContainerStatuses();
public abstract void setNodeId(NodeId nodeId);
public abstract void setHttpPort(int port);
public abstract void setResource(Resource resource);
public abstract void setNMVersion(String version);
public abstract void setContainerStatuses(List<ContainerStatus> containerStatuses);
}

View File

@ -18,7 +18,6 @@
package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb;
import org.apache.hadoop.yarn.api.records.impl.pb.ProtoBase;
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.MasterKeyProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeStatusProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatRequestProto;
@ -29,8 +28,7 @@ import org.apache.hadoop.yarn.server.api.records.NodeStatus;
import org.apache.hadoop.yarn.server.api.records.impl.pb.MasterKeyPBImpl;
import org.apache.hadoop.yarn.server.api.records.impl.pb.NodeStatusPBImpl;
public class NodeHeartbeatRequestPBImpl extends
ProtoBase<NodeHeartbeatRequestProto> implements NodeHeartbeatRequest {
public class NodeHeartbeatRequestPBImpl extends NodeHeartbeatRequest {
NodeHeartbeatRequestProto proto = NodeHeartbeatRequestProto.getDefaultInstance();
NodeHeartbeatRequestProto.Builder builder = null;
boolean viaProto = false;
@ -55,6 +53,21 @@ public class NodeHeartbeatRequestPBImpl extends
return proto;
}
@Override
public int hashCode() {
return getProto().hashCode();
}
@Override
public boolean equals(Object other) {
if (other == null)
return false;
if (other.getClass().isAssignableFrom(this.getClass())) {
return this.getProto().equals(this.getClass().cast(other).getProto());
}
return false;
}
private void mergeLocalToBuilder() {
if (this.nodeStatus != null) {
builder.setNodeStatus(convertToProtoFormat(this.nodeStatus));

View File

@ -19,11 +19,21 @@
package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.impl.pb.ContainerIdPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ContainerStatusPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.NodeIdPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ProtoBase;
import org.apache.hadoop.yarn.api.records.impl.pb.ResourcePBImpl;
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerIdProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerStatusProto;
import org.apache.hadoop.yarn.proto.YarnProtos.NodeIdProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.RegisterNodeManagerRequestProto;
@ -32,13 +42,14 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequ
public class RegisterNodeManagerRequestPBImpl extends ProtoBase<RegisterNodeManagerRequestProto> implements RegisterNodeManagerRequest {
public class RegisterNodeManagerRequestPBImpl extends RegisterNodeManagerRequest {
RegisterNodeManagerRequestProto proto = RegisterNodeManagerRequestProto.getDefaultInstance();
RegisterNodeManagerRequestProto.Builder builder = null;
boolean viaProto = false;
private Resource resource = null;
private NodeId nodeId = null;
private List<ContainerStatus> containerStatuses = null;
public RegisterNodeManagerRequestPBImpl() {
builder = RegisterNodeManagerRequestProto.newBuilder();
@ -57,6 +68,9 @@ public class RegisterNodeManagerRequestPBImpl extends ProtoBase<RegisterNodeMana
}
private void mergeLocalToBuilder() {
if (this.containerStatuses != null) {
addContainerStatusesToProto();
}
if (this.resource != null) {
builder.setResource(convertToProtoFormat(this.resource));
}
@ -139,6 +153,81 @@ public class RegisterNodeManagerRequestPBImpl extends ProtoBase<RegisterNodeMana
builder.setHttpPort(httpPort);
}
@Override
public List<ContainerStatus> getContainerStatuses() {
initContainerStatuses();
return containerStatuses;
}
private void initContainerStatuses() {
if (this.containerStatuses != null) {
return;
}
RegisterNodeManagerRequestProtoOrBuilder p = viaProto ? proto : builder;
List<ContainerStatusProto> list = p.getContainerStatusesList();
this.containerStatuses = new ArrayList<ContainerStatus>();
for (ContainerStatusProto c : list) {
this.containerStatuses.add(convertFromProtoFormat(c));
}
}
@Override
public void setContainerStatuses(List<ContainerStatus> containers) {
if (containers == null) {
return;
}
initContainerStatuses();
this.containerStatuses.addAll(containers);
}
private void addContainerStatusesToProto() {
maybeInitBuilder();
builder.clearContainerStatuses();
if (containerStatuses == null) {
return;
}
Iterable<ContainerStatusProto> it = new Iterable<ContainerStatusProto>() {
@Override
public Iterator<ContainerStatusProto> iterator() {
return new Iterator<ContainerStatusProto>() {
Iterator<ContainerStatus> iter = containerStatuses.iterator();
@Override
public boolean hasNext() {
return iter.hasNext();
}
@Override
public ContainerStatusProto next() {
return convertToProtoFormat(iter.next());
}
@Override
public void remove() {
throw new UnsupportedOperationException();
}
};
}
};
builder.addAllContainerStatuses(it);
}
@Override
public int hashCode() {
return getProto().hashCode();
}
@Override
public boolean equals(Object other) {
if (other == null)
return false;
if (other.getClass().isAssignableFrom(this.getClass())) {
return this.getProto().equals(this.getClass().cast(other).getProto());
}
return false;
}
@Override
public String getNMVersion() {
RegisterNodeManagerRequestProtoOrBuilder p = viaProto ? proto : builder;
@ -170,6 +259,11 @@ public class RegisterNodeManagerRequestPBImpl extends ProtoBase<RegisterNodeMana
return ((ResourcePBImpl)t).getProto();
}
private ContainerStatusPBImpl convertFromProtoFormat(ContainerStatusProto c) {
return new ContainerStatusPBImpl(c);
}
private ContainerStatusProto convertToProtoFormat(ContainerStatus c) {
return ((ContainerStatusPBImpl)c).getProto();
}
}

View File

@ -22,9 +22,23 @@ import java.util.List;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.util.Records;
public interface NodeStatus {
public abstract class NodeStatus {
public static NodeStatus newInstance(NodeId nodeId, int responseId,
List<ContainerStatus> containerStatuses,
List<ApplicationId> keepAliveApplications,
NodeHealthStatus nodeHealthStatus) {
NodeStatus nodeStatus = Records.newRecord(NodeStatus.class);
nodeStatus.setResponseId(responseId);
nodeStatus.setNodeId(nodeId);
nodeStatus.setContainersStatuses(containerStatuses);
nodeStatus.setKeepAliveApplications(keepAliveApplications);
nodeStatus.setNodeHealthStatus(nodeHealthStatus);
return nodeStatus;
}
public abstract NodeId getNodeId();
public abstract int getResponseId();
@ -36,8 +50,8 @@ public interface NodeStatus {
public abstract List<ApplicationId> getKeepAliveApplications();
public abstract void setKeepAliveApplications(List<ApplicationId> appIds);
NodeHealthStatus getNodeHealthStatus();
void setNodeHealthStatus(NodeHealthStatus healthStatus);
public abstract NodeHealthStatus getNodeHealthStatus();
public abstract void setNodeHealthStatus(NodeHealthStatus healthStatus);
public abstract void setNodeId(NodeId nodeId);
public abstract void setResponseId(int responseId);

View File

@ -29,7 +29,6 @@ import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ContainerStatusPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.NodeIdPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ProtoBase;
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerStatusProto;
import org.apache.hadoop.yarn.proto.YarnProtos.NodeIdProto;
@ -40,8 +39,7 @@ import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
import org.apache.hadoop.yarn.server.api.records.NodeStatus;
public class NodeStatusPBImpl extends ProtoBase<NodeStatusProto> implements
NodeStatus {
public class NodeStatusPBImpl extends NodeStatus {
NodeStatusProto proto = NodeStatusProto.getDefaultInstance();
NodeStatusProto.Builder builder = null;
boolean viaProto = false;
@ -166,6 +164,21 @@ public class NodeStatusPBImpl extends ProtoBase<NodeStatusProto> implements
builder.addAllKeepAliveApplications(iterable);
}
@Override
public int hashCode() {
return getProto().hashCode();
}
@Override
public boolean equals(Object other) {
if (other == null)
return false;
if (other.getClass().isAssignableFrom(this.getClass())) {
return this.getProto().equals(this.getClass().cast(other).getProto());
}
return false;
}
@Override
public synchronized int getResponseId() {
NodeStatusProtoOrBuilder p = viaProto ? proto : builder;

View File

@ -30,6 +30,7 @@ message RegisterNodeManagerRequestProto {
optional int32 http_port = 3;
optional ResourceProto resource = 4;
optional string nm_version = 5;
repeated ContainerStatusProto containerStatuses = 6;
}
message RegisterNodeManagerResponseProto {

View File

@ -26,7 +26,7 @@ public interface NodeStatusUpdater extends Service {
void sendOutofBandHeartBeat();
NodeStatus getNodeStatusAndUpdateContainersInContext();
NodeStatus getNodeStatusAndUpdateContainersInContext(int responseId);
long getRMIdentifier();

View File

@ -48,8 +48,6 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.api.ResourceManagerConstants;
import org.apache.hadoop.yarn.server.api.ResourceTracker;
import org.apache.hadoop.yarn.server.api.ServerRMProxy;
@ -89,7 +87,6 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
private String nodeManagerVersionId;
private String minimumResourceManagerVersion;
private volatile boolean isStopped;
private RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
private boolean tokenKeepAliveEnabled;
private long tokenRemovalDelayMs;
/** Keeps track of when the next keep alive request should be sent for an app*/
@ -134,9 +131,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
conf.getInt(
YarnConfiguration.NM_VCORES, YarnConfiguration.DEFAULT_NM_VCORES);
this.totalResource = recordFactory.newRecordInstance(Resource.class);
this.totalResource.setMemory(memoryMb);
this.totalResource.setVirtualCores(virtualCores);
this.totalResource = Resource.newInstance(memoryMb, virtualCores);
metrics.addResource(totalResource);
this.tokenKeepAliveEnabled = isTokenKeepAliveEnabled(conf);
this.tokenRemovalDelayMs =
@ -238,13 +233,17 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
}
@VisibleForTesting
protected void registerWithRM() throws YarnException, IOException {
protected void registerWithRM()
throws YarnException, IOException {
List<ContainerStatus> containerStatuses =
this.updateAndGetContainerStatuses();
RegisterNodeManagerRequest request =
recordFactory.newRecordInstance(RegisterNodeManagerRequest.class);
request.setHttpPort(this.httpPort);
request.setResource(this.totalResource);
request.setNodeId(this.nodeId);
request.setNMVersion(this.nodeManagerVersionId);
RegisterNodeManagerRequest.newInstance(nodeId, httpPort, totalResource,
nodeManagerVersionId, containerStatuses);
if (containerStatuses != null) {
LOG.info("Registering with RM using finished containers :"
+ containerStatuses);
}
RegisterNodeManagerResponse regNMResponse =
resourceTracker.registerNodeManager(request);
this.rmIdentifier = regNMResponse.getRMIdentifier();
@ -323,13 +322,33 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
}
@Override
public NodeStatus getNodeStatusAndUpdateContainersInContext() {
public NodeStatus getNodeStatusAndUpdateContainersInContext(
int responseId) {
NodeStatus nodeStatus = recordFactory.newRecordInstance(NodeStatus.class);
nodeStatus.setNodeId(this.nodeId);
NodeHealthStatus nodeHealthStatus = this.context.getNodeHealthStatus();
nodeHealthStatus.setHealthReport(healthChecker.getHealthReport());
nodeHealthStatus.setIsNodeHealthy(healthChecker.isHealthy());
nodeHealthStatus.setLastHealthReportTime(
healthChecker.getLastHealthReportTime());
if (LOG.isDebugEnabled()) {
LOG.debug("Node's health-status : " + nodeHealthStatus.getIsNodeHealthy()
+ ", " + nodeHealthStatus.getHealthReport());
}
List<ContainerStatus> containersStatuses = updateAndGetContainerStatuses();
LOG.debug(this.nodeId + " sending out status for "
+ containersStatuses.size() + " containers");
NodeStatus nodeStatus = NodeStatus.newInstance(nodeId, responseId,
containersStatuses, createKeepAliveApplicationList(), nodeHealthStatus);
int numActiveContainers = 0;
List<ContainerStatus> containersStatuses = new ArrayList<ContainerStatus>();
return nodeStatus;
}
/*
* It will return current container statuses. If any container has
* COMPLETED then it will be removed from context.
*/
private List<ContainerStatus> updateAndGetContainerStatuses() {
List<ContainerStatus> containerStatuses = new ArrayList<ContainerStatus>();
for (Iterator<Entry<ContainerId, Container>> i =
this.context.getContainers().entrySet().iterator(); i.hasNext();) {
Entry<ContainerId, Container> e = i.next();
@ -339,8 +358,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
// Clone the container to send it to the RM
org.apache.hadoop.yarn.api.records.ContainerStatus containerStatus =
container.cloneAndGetContainerStatus();
containersStatuses.add(containerStatus);
++numActiveContainers;
containerStatuses.add(containerStatus);
if (LOG.isDebugEnabled()) {
LOG.debug("Sending out status for container: " + containerStatus);
}
@ -356,26 +374,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
LOG.info("Removed completed container " + containerId);
}
}
nodeStatus.setContainersStatuses(containersStatuses);
LOG.debug(this.nodeId + " sending out status for "
+ numActiveContainers + " containers");
NodeHealthStatus nodeHealthStatus = this.context.getNodeHealthStatus();
nodeHealthStatus.setHealthReport(healthChecker.getHealthReport());
nodeHealthStatus.setIsNodeHealthy(healthChecker.isHealthy());
nodeHealthStatus.setLastHealthReportTime(
healthChecker.getLastHealthReportTime());
if (LOG.isDebugEnabled()) {
LOG.debug("Node's health-status : " + nodeHealthStatus.getIsNodeHealthy()
+ ", " + nodeHealthStatus.getHealthReport());
}
nodeStatus.setNodeHealthStatus(nodeHealthStatus);
List<ApplicationId> keepAliveAppIds = createKeepAliveApplicationList();
nodeStatus.setKeepAliveApplications(keepAliveAppIds);
return nodeStatus;
return containerStatuses;
}
private void trackAppsForKeepAlive(List<ApplicationId> appIds) {
@ -458,18 +457,15 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
// Send heartbeat
try {
NodeHeartbeatResponse response = null;
NodeStatus nodeStatus = getNodeStatusAndUpdateContainersInContext();
nodeStatus.setResponseId(lastHeartBeatID);
NodeStatus nodeStatus =
getNodeStatusAndUpdateContainersInContext(lastHeartBeatID);
NodeHeartbeatRequest request = recordFactory
.newRecordInstance(NodeHeartbeatRequest.class);
request.setNodeStatus(nodeStatus);
request
.setLastKnownContainerTokenMasterKey(NodeStatusUpdaterImpl.this.context
.getContainerTokenSecretManager().getCurrentKey());
request
.setLastKnownNMTokenMasterKey(NodeStatusUpdaterImpl.this.context
.getNMTokenSecretManager().getCurrentKey());
NodeHeartbeatRequest request =
NodeHeartbeatRequest.newInstance(nodeStatus,
NodeStatusUpdaterImpl.this.context
.getContainerTokenSecretManager().getCurrentKey(),
NodeStatusUpdaterImpl.this.context.getNMTokenSecretManager()
.getCurrentKey());
response = resourceTracker.nodeHeartbeat(request);
//get next heartbeat interval from response
nextHeartBeatInterval = response.getNextHeartBeatInterval();

View File

@ -28,6 +28,7 @@ import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@ -64,6 +65,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.StopContainersResponse;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.SerializedException;
@ -371,17 +373,31 @@ public class ContainerManagerImpl extends CompositeService implements
this.handle(new CMgrCompletedContainersEvent(containerIds,
CMgrCompletedContainersEvent.Reason.ON_NODEMANAGER_RESYNC));
while (!containers.isEmpty()) {
/*
* We will wait till all the containers change their state to COMPLETE. We
* will not remove the container statuses from nm context because these
* are used while re-registering node manager with resource manager.
*/
boolean allContainersCompleted = false;
while (!containers.isEmpty() && !allContainersCompleted) {
allContainersCompleted = true;
for (Entry<ContainerId, Container> container : containers.entrySet()) {
if (((ContainerImpl) container.getValue()).getCurrentState()
!= ContainerState.COMPLETE) {
allContainersCompleted = false;
try {
Thread.sleep(1000);
nodeStatusUpdater.getNodeStatusAndUpdateContainersInContext();
} catch (InterruptedException ex) {
LOG.warn("Interrupted while sleeping on container kill on resync", ex);
LOG.warn("Interrupted while sleeping on container kill on resync",
ex);
}
break;
}
}
}
// All containers killed
if (containers.isEmpty()) {
if (allContainersCompleted) {
LOG.info("All containers in DONE state");
} else {
LOG.info("Done waiting for containers to be killed. Still alive: " +

View File

@ -302,7 +302,7 @@ public class ContainerImpl implements Container {
private final StateMachine<ContainerState, ContainerEventType, ContainerEvent>
stateMachine;
private org.apache.hadoop.yarn.api.records.ContainerState getCurrentState() {
public org.apache.hadoop.yarn.api.records.ContainerState getCurrentState() {
switch (stateMachine.getCurrentState()) {
case NEW:
case LOCALIZING:

View File

@ -29,6 +29,10 @@ import org.apache.hadoop.net.Node;
import org.apache.hadoop.security.authorize.PolicyProvider;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.util.VersionUtil;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceOption;
@ -46,14 +50,17 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResp
import org.apache.hadoop.yarn.server.api.records.MasterKey;
import org.apache.hadoop.yarn.server.api.records.NodeAction;
import org.apache.hadoop.yarn.server.api.records.NodeStatus;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerFinishedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeReconnectEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeStatusEvent;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
import org.apache.hadoop.yarn.server.resourcemanager.security.authorize.RMPolicyProvider;
import org.apache.hadoop.yarn.server.utils.YarnServerBuilderUtils;
import org.apache.hadoop.yarn.util.RackResolver;
@ -183,6 +190,33 @@ public class ResourceTrackerService extends AbstractService implements
Resource capability = request.getResource();
String nodeManagerVersion = request.getNMVersion();
if (!request.getContainerStatuses().isEmpty()) {
LOG.info("received container statuses on node manager register :"
+ request.getContainerStatuses());
for (ContainerStatus containerStatus : request.getContainerStatuses()) {
ApplicationAttemptId appAttemptId =
containerStatus.getContainerId().getApplicationAttemptId();
RMApp rmApp =
rmContext.getRMApps().get(appAttemptId.getApplicationId());
if (rmApp != null) {
RMAppAttempt rmAppAttempt = rmApp.getRMAppAttempt(appAttemptId);
if (rmAppAttempt.getMasterContainer().getId()
.equals(containerStatus.getContainerId())
&& containerStatus.getState() == ContainerState.COMPLETE) {
// sending master container finished event.
RMAppAttemptContainerFinishedEvent evt =
new RMAppAttemptContainerFinishedEvent(appAttemptId,
containerStatus);
rmContext.getDispatcher().getEventHandler().handle(evt);
}
} else {
LOG.error("Received finished container :"
+ containerStatus.getContainerId()
+ " for non existing application :"
+ appAttemptId.getApplicationId());
}
}
}
RegisterNodeManagerResponse response = recordFactory
.newRecordInstance(RegisterNodeManagerResponse.class);

View File

@ -132,8 +132,8 @@ public class RMAppImpl implements RMApp, Recoverable {
.addTransition(RMAppState.NEW, RMAppState.NEW_SAVING,
RMAppEventType.START, new RMAppSavingTransition())
.addTransition(RMAppState.NEW, EnumSet.of(RMAppState.SUBMITTED,
RMAppState.FINISHED, RMAppState.FAILED, RMAppState.KILLED,
RMAppState.FINAL_SAVING),
RMAppState.RUNNING, RMAppState.FINISHED, RMAppState.FAILED,
RMAppState.KILLED, RMAppState.FINAL_SAVING),
RMAppEventType.RECOVER, new RMAppRecoveredTransition())
.addTransition(RMAppState.NEW, RMAppState.FINAL_SAVING, RMAppEventType.KILL,
new FinalSavingTransition(
@ -611,11 +611,11 @@ public class RMAppImpl implements RMApp, Recoverable {
this.diagnostics.append(appState.getDiagnostics());
this.storedFinishTime = appState.getFinishTime();
this.startTime = appState.getStartTime();
for(int i=0; i<appState.getAttemptCount(); ++i) {
// create attempt
createNewAttempt(false);
// recover attempt
((RMAppAttemptImpl) currentAttempt).recover(state);
((RMAppAttemptImpl)this.currentAttempt).recover(state);
}
}
@ -656,30 +656,35 @@ public class RMAppImpl implements RMApp, Recoverable {
};
}
@SuppressWarnings("unchecked")
private static final class RMAppRecoveredTransition implements
MultipleArcTransition<RMAppImpl, RMAppEvent, RMAppState> {
@Override
public RMAppState transition(RMAppImpl app, RMAppEvent event) {
if (app.attempts.isEmpty()) {
// Saved application was not running any attempts.
app.createNewAttempt(true);
return RMAppState.SUBMITTED;
} else {
/*
* If last attempt recovered final state is null .. it means attempt
* was started but AM container may or may not have started / finished.
* Therefore we should wait for it to finish.
*/
for (RMAppAttempt attempt : app.getAppAttempts().values()) {
app.dispatcher.getEventHandler().handle(
new RMAppAttemptEvent(attempt.getAppAttemptId(),
RMAppAttemptEventType.RECOVER));
}
if (app.recoveredFinalState != null) {
FINAL_TRANSITION.transition(app, event);
return app.recoveredFinalState;
} else {
return RMAppState.RUNNING;
}
}
// Directly call AttemptFailedTransition, since now we deem that an
// application fails because of RM restart as a normal AM failure.
// Do not recover unmanaged applications since current recovery
// mechanism of restarting attempts does not work for them.
// This will need to be changed in work preserving recovery in which
// RM will re-connect with the running AM's instead of restarting them
// In work-preserve restart, if attemptCount == maxAttempts, the job still
// needs to be recovered because the last attempt may still be running.
// As part of YARN-1210, we may return ACCECPTED state waiting for AM to
// reregister or fail and remove the following code.
return new AttemptFailedTransition(RMAppState.SUBMITTED).transition(app,
event);
}
}
@ -1017,4 +1022,10 @@ public class RMAppImpl implements RMApp, Recoverable {
throw new YarnRuntimeException("Unknown state passed!");
}
}
public static boolean isAppInFinalState(RMApp rmApp) {
RMAppState appState = rmApp.getState();
return appState == RMAppState.FAILED || appState == RMAppState.FINISHED
|| appState == RMAppState.KILLED;
}
}

View File

@ -68,11 +68,14 @@ import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.Appli
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.Recoverable;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppFailedAttemptEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppFinishedAttemptEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRejectedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerAcquiredEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerFinishedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptLaunchFailedEvent;
@ -179,7 +182,7 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
new UnexpectedAMRegisteredTransition(), RMAppAttemptState.FAILED))
.addTransition( RMAppAttemptState.NEW,
EnumSet.of(RMAppAttemptState.FINISHED, RMAppAttemptState.KILLED,
RMAppAttemptState.FAILED, RMAppAttemptState.RECOVERED),
RMAppAttemptState.FAILED, RMAppAttemptState.LAUNCHED),
RMAppAttemptEventType.RECOVER, new AttemptRecoveredTransition())
// Transitions from SUBMITTED state
@ -386,25 +389,6 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
RMAppAttemptEventType.UNREGISTERED,
RMAppAttemptEventType.KILL,
RMAppAttemptEventType.STATUS_UPDATE))
// Transitions from RECOVERED State
.addTransition(
RMAppAttemptState.RECOVERED,
RMAppAttemptState.RECOVERED,
EnumSet.of(RMAppAttemptEventType.START,
RMAppAttemptEventType.APP_ACCEPTED,
RMAppAttemptEventType.APP_REJECTED,
RMAppAttemptEventType.EXPIRE,
RMAppAttemptEventType.LAUNCHED,
RMAppAttemptEventType.LAUNCH_FAILED,
RMAppAttemptEventType.REGISTERED,
RMAppAttemptEventType.CONTAINER_ALLOCATED,
RMAppAttemptEventType.CONTAINER_ACQUIRED,
RMAppAttemptEventType.ATTEMPT_NEW_SAVED,
RMAppAttemptEventType.CONTAINER_FINISHED,
RMAppAttemptEventType.UNREGISTERED,
RMAppAttemptEventType.KILL,
RMAppAttemptEventType.STATUS_UPDATE))
.installTopology();
public RMAppAttemptImpl(ApplicationAttemptId appAttemptId,
@ -694,8 +678,6 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
this.proxiedTrackingUrl = generateProxyUriWithScheme(originalTrackingUrl);
this.finalStatus = attemptState.getFinalApplicationStatus();
this.startTime = attemptState.getStartTime();
handle(new RMAppAttemptEvent(getAppAttemptId(),
RMAppAttemptEventType.RECOVER));
}
private void recoverAppAttemptCredentials(Credentials appAttemptTokens)
@ -865,11 +847,38 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
@Override
public RMAppAttemptState transition(RMAppAttemptImpl appAttempt,
RMAppAttemptEvent event) {
LOG.info("Recovering attempt : recoverdFinalState :"
+ appAttempt.recoveredFinalState);
if (appAttempt.recoveredFinalState != null) {
appAttempt.progress = 1.0f;
RMApp rmApp =appAttempt.rmContext.getRMApps().get(
appAttempt.getAppAttemptId().getApplicationId());
// We will replay the final attempt only if last attempt is in final
// state but application is not in final state.
if (rmApp.getCurrentAppAttempt() == appAttempt
&& !RMAppImpl.isAppInFinalState(rmApp)) {
(new BaseFinalTransition(appAttempt.recoveredFinalState)).transition(
appAttempt, event);
}
return appAttempt.recoveredFinalState;
} else {
return RMAppAttemptState.RECOVERED;
/*
* Since the application attempt's final state is not saved that means
* for AM container (previous attempt) state must be one of these.
* 1) AM container may not have been launched (RM failed right before
* this).
* 2) AM container was successfully launched but may or may not have
* registered / unregistered.
* In whichever case we will wait (by moving attempt into LAUNCHED
* state) and mark this attempt failed (assuming non work preserving
* restart) only after
* 1) Node manager during re-registration heart beats back saying
* am container finished.
* 2) OR AMLivelinessMonitor expires this attempt (when am doesn't
* heart beat back).
*/
(new AMLaunchedTransition()).transition(appAttempt, event);
return RMAppAttemptState.LAUNCHED;
}
}
}

View File

@ -20,6 +20,5 @@ package org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt;
public enum RMAppAttemptState {
NEW, SUBMITTED, SCHEDULED, ALLOCATED, LAUNCHED, FAILED, RUNNING, FINISHING,
FINISHED, KILLED, ALLOCATED_SAVING, LAUNCHED_UNMANAGED_SAVING, RECOVERED,
FINAL_SAVING
FINISHED, KILLED, ALLOCATED_SAVING, LAUNCHED_UNMANAGED_SAVING, FINAL_SAVING
}

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.yarn.server.resourcemanager;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
@ -31,6 +32,7 @@ import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
@ -98,12 +100,18 @@ public class MockNM {
}
public RegisterNodeManagerResponse registerNode() throws Exception {
return registerNode(null);
}
public RegisterNodeManagerResponse registerNode(
List<ContainerStatus> containerStatus) throws Exception{
RegisterNodeManagerRequest req = Records.newRecord(
RegisterNodeManagerRequest.class);
req.setNodeId(nodeId);
req.setHttpPort(httpPort);
Resource resource = BuilderUtils.newResource(memory, vCores);
req.setResource(resource);
req.setContainerStatuses(containerStatus);
req.setNMVersion(version);
RegisterNodeManagerResponse registrationResponse =
resourceTracker.registerNodeManager(req);

View File

@ -34,6 +34,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
@ -62,10 +63,14 @@ import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.Event;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
@ -88,6 +93,7 @@ import org.apache.log4j.Logger;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mortbay.log.Log;
public class TestRMRestart {
@ -109,6 +115,7 @@ public class TestRMRestart {
Assert.assertTrue(YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS > 1);
}
@SuppressWarnings("rawtypes")
@Test (timeout=180000)
public void testRMRestart() throws Exception {
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
@ -257,11 +264,14 @@ public class TestRMRestart {
.getApplicationId());
// verify state machine kicked into expected states
rm2.waitForState(loadedApp1.getApplicationId(), RMAppState.ACCEPTED);
rm2.waitForState(loadedApp1.getApplicationId(), RMAppState.RUNNING);
rm2.waitForState(loadedApp2.getApplicationId(), RMAppState.ACCEPTED);
// verify new attempts created
Assert.assertEquals(2, loadedApp1.getAppAttempts().size());
// verify attempts for apps
// The app for which AM was started will wait for previous am
// container finish event to arrive. However for an application for which
// no am container was running will start new application attempt.
Assert.assertEquals(1, loadedApp1.getAppAttempts().size());
Assert.assertEquals(1, loadedApp2.getAppAttempts().size());
// verify old AM is not accepted
@ -279,8 +289,20 @@ public class TestRMRestart {
Assert.assertEquals(NodeAction.RESYNC, hbResponse.getNodeAction());
// new NM to represent NM re-register
nm1 = rm2.registerNode("127.0.0.1:1234", 15120);
nm2 = rm2.registerNode("127.0.0.2:5678", 15120);
nm1 = new MockNM("127.0.0.1:1234", 15120, rm2.getResourceTrackerService());
nm2 = new MockNM("127.0.0.2:5678", 15120, rm2.getResourceTrackerService());
List<ContainerStatus> containerStatuses = new ArrayList<ContainerStatus>();
ContainerStatus containerStatus =
BuilderUtils.newContainerStatus(BuilderUtils.newContainerId(loadedApp1
.getCurrentAppAttempt().getAppAttemptId(), 1),
ContainerState.COMPLETE, "Killed AM container", 143);
containerStatuses.add(containerStatus);
nm1.registerNode(containerStatuses);
nm2.registerNode();
rm2.waitForState(loadedApp1.getApplicationId(), RMAppState.ACCEPTED);
Assert.assertEquals(2, loadedApp1.getAppAttempts().size());
// verify no more reboot response sent
hbResponse = nm1.nodeHeartbeat(true);
@ -403,6 +425,157 @@ public class TestRMRestart {
.getAppAttempts().get(am0.getApplicationAttemptId()).getAppAttemptState());
}
@Test
public void testRMRestartWaitForPreviousAMToFinish() throws Exception {
// testing 3 cases
// After RM restarts
// 1) New application attempt is not started until previous AM container
// finish event is reported back to RM as a part of nm registration.
// 2) If previous AM container finish event is never reported back (i.e.
// node manager on which this AM container was running also went down) in
// that case AMLivenessMonitor should time out previous attempt and start
// new attempt.
// 3) If all the stored attempts had finished then new attempt should
// be started immediately.
YarnConfiguration conf = new YarnConfiguration(this.conf);
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 40);
MemoryRMStateStore memStore = new MemoryRMStateStore();
memStore.init(conf);
RMState rmState = memStore.getState();
Map<ApplicationId, ApplicationState> rmAppState =
rmState.getApplicationState();
// start RM
final MockRM rm1 = new MockRM(conf, memStore);
rm1.start();
MockNM nm1 =
new MockNM("127.0.0.1:1234" , 16382, rm1.getResourceTrackerService());
nm1.registerNode();
// submitting app
RMApp app1 = rm1.submitApp(200);
rm1.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED);
MockAM am1 = launchAM(app1, rm1, nm1);
nm1.nodeHeartbeat(am1.getApplicationAttemptId(), 1, ContainerState.COMPLETE);
// Fail first AM.
am1.waitForState(RMAppAttemptState.FAILED);
// launch another AM.
MockAM am2 = launchAM(app1, rm1, nm1);
Assert.assertEquals(1, rmAppState.size());
Assert.assertEquals(app1.getState(), RMAppState.RUNNING);
Assert.assertEquals(app1.getAppAttempts()
.get(app1.getCurrentAppAttempt().getAppAttemptId())
.getAppAttemptState(), RMAppAttemptState.RUNNING);
// start new RM.
MockRM rm2 = null;
rm2 = new MockRM(conf, memStore);
rm2.start();
nm1.setResourceTrackerService(rm2.getResourceTrackerService());
NodeHeartbeatResponse res = nm1.nodeHeartbeat(true);
Assert.assertEquals(NodeAction.RESYNC, res.getNodeAction());
RMApp rmApp = rm2.getRMContext().getRMApps().get(app1.getApplicationId());
// application should be in running state
rm2.waitForState(app1.getApplicationId(), RMAppState.RUNNING);
Assert.assertEquals(RMAppState.RUNNING, rmApp.getState());
// new attempt should not be started
Assert.assertEquals(2, rmApp.getAppAttempts().size());
// am1 attempt should be in FAILED state where as am2 attempt should be in
// LAUNCHED state
Assert.assertEquals(RMAppAttemptState.FAILED,
rmApp.getAppAttempts().get(am1.getApplicationAttemptId())
.getAppAttemptState());
Assert.assertEquals(RMAppAttemptState.LAUNCHED,
rmApp.getAppAttempts().get(am2.getApplicationAttemptId())
.getAppAttemptState());
List<ContainerStatus> containerStatuses = new ArrayList<ContainerStatus>();
ContainerStatus containerStatus =
BuilderUtils.newContainerStatus(
BuilderUtils.newContainerId(am2.getApplicationAttemptId(), 1),
ContainerState.COMPLETE, "Killed AM container", 143);
containerStatuses.add(containerStatus);
nm1.registerNode(containerStatuses);
rm2.waitForState(am2.getApplicationAttemptId(), RMAppAttemptState.FAILED);
launchAM(rmApp, rm2, nm1);
Assert.assertEquals(3, rmApp.getAppAttempts().size());
rm2.waitForState(rmApp.getCurrentAppAttempt().getAppAttemptId(),
RMAppAttemptState.RUNNING);
// Now restart RM ...
// Setting AMLivelinessMonitor interval to be 10 Secs.
conf.setInt(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 10000);
MockRM rm3 = null;
rm3 = new MockRM(conf, memStore);
rm3.start();
// Wait for RM to process all the events as a part of rm recovery.
nm1.setResourceTrackerService(rm3.getResourceTrackerService());
rmApp = rm3.getRMContext().getRMApps().get(app1.getApplicationId());
// application should be in running state
rm3.waitForState(app1.getApplicationId(), RMAppState.RUNNING);
Assert.assertEquals(rmApp.getState(), RMAppState.RUNNING);
// new attempt should not be started
Assert.assertEquals(3, rmApp.getAppAttempts().size());
// am1 and am2 attempts should be in FAILED state where as am3 should be
// in LAUNCHED state
Assert.assertEquals(RMAppAttemptState.FAILED,
rmApp.getAppAttempts().get(am1.getApplicationAttemptId())
.getAppAttemptState());
Assert.assertEquals(RMAppAttemptState.FAILED,
rmApp.getAppAttempts().get(am2.getApplicationAttemptId())
.getAppAttemptState());
ApplicationAttemptId latestAppAttemptId =
rmApp.getCurrentAppAttempt().getAppAttemptId();
Assert.assertEquals(RMAppAttemptState.LAUNCHED,rmApp.getAppAttempts()
.get(latestAppAttemptId).getAppAttemptState());
rm3.waitForState(latestAppAttemptId, RMAppAttemptState.FAILED);
rm3.waitForState(rmApp.getApplicationId(), RMAppState.ACCEPTED);
Assert.assertEquals(4, rmApp.getAppAttempts().size());
Assert.assertEquals(RMAppAttemptState.FAILED,
rmApp.getAppAttempts().get(latestAppAttemptId).getAppAttemptState());
latestAppAttemptId = rmApp.getCurrentAppAttempt().getAppAttemptId();
// The 4th attempt has started but is not yet saved into RMStateStore
// It will be saved only when we launch AM.
// submitting app but not starting AM for it.
RMApp app2 = rm3.submitApp(200);
rm3.waitForState(app2.getApplicationId(), RMAppState.ACCEPTED);
Assert.assertEquals(1, app2.getAppAttempts().size());
Assert.assertEquals(0,
memStore.getState().getApplicationState().get(app2.getApplicationId())
.getAttemptCount());
MockRM rm4 = null;
rm4 = new MockRM(conf, memStore);
rm4.start();
rmApp = rm4.getRMContext().getRMApps().get(app1.getApplicationId());
rm4.waitForState(rmApp.getApplicationId(), RMAppState.ACCEPTED);
Assert.assertEquals(4, rmApp.getAppAttempts().size());
Assert.assertEquals(RMAppState.ACCEPTED, rmApp.getState());
Assert.assertEquals(RMAppAttemptState.SCHEDULED, rmApp.getAppAttempts()
.get(latestAppAttemptId).getAppAttemptState());
// The initial application for which an AM was not started should be in
// ACCEPTED state with one application attempt started.
app2 = rm4.getRMContext().getRMApps().get(app2.getApplicationId());
rm4.waitForState(app2.getApplicationId(), RMAppState.ACCEPTED);
Assert.assertEquals(RMAppState.ACCEPTED, app2.getState());
Assert.assertEquals(1, app2.getAppAttempts().size());
Assert.assertEquals(RMAppAttemptState.SCHEDULED, app2
.getCurrentAppAttempt().getAppAttemptState());
}
@Test
public void testRMRestartFailedApp() throws Exception {
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 1);
@ -736,6 +909,8 @@ public class TestRMRestart {
Assert.assertEquals(BuilderUtils.newContainerId(attemptId1, 1),
attemptState.getMasterContainer().getId());
// Setting AMLivelinessMonitor interval to be 10 Secs.
conf.setInt(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 10000);
// start new RM
MockRM rm2 = new MockRM(conf, memStore);
rm2.start();

View File

@ -372,10 +372,10 @@ public class TestRMAppAttemptTransitions {
}
/**
* {@link RMAppAttemptState#RECOVERED}
* {@link RMAppAttemptState#LAUNCHED}
*/
private void testAppAttemptRecoveredState() {
assertEquals(RMAppAttemptState.RECOVERED,
assertEquals(RMAppAttemptState.LAUNCHED,
applicationAttempt.getAppAttemptState());
}