YARN-1210. Changed RM to start new app-attempts on RM restart only after ensuring that previous AM exited or after expiry time. Contributed by Omkar Vinit Joshi.

svn merge --ignore-ancestry -c 1543310 ../../trunk/


git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1543311 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Vinod Kumar Vavilapalli 2013-11-19 05:17:56 +00:00
parent 0e9d89a70f
commit 8613e7ef3d
19 changed files with 569 additions and 148 deletions

View File

@ -85,6 +85,10 @@ Release 2.3.0 - UNRELEASED
YARN-709. Added tests to verify validity of delegation tokens and logging of YARN-709. Added tests to verify validity of delegation tokens and logging of
appsummary after RM restart. (Jian He via vinodkv) appsummary after RM restart. (Jian He via vinodkv)
YARN-1210. Changed RM to start new app-attempts on RM restart only after
ensuring that previous AM exited or after expiry time. (Omkar Vinit Joshi via
vinodkv)
OPTIMIZATIONS OPTIMIZATIONS
BUG FIXES BUG FIXES

View File

@ -20,15 +20,29 @@ package org.apache.hadoop.yarn.server.api.protocolrecords;
import org.apache.hadoop.yarn.server.api.records.MasterKey; import org.apache.hadoop.yarn.server.api.records.MasterKey;
import org.apache.hadoop.yarn.server.api.records.NodeStatus; import org.apache.hadoop.yarn.server.api.records.NodeStatus;
import org.apache.hadoop.yarn.util.Records;
public interface NodeHeartbeatRequest { public abstract class NodeHeartbeatRequest {
NodeStatus getNodeStatus(); public static NodeHeartbeatRequest newInstance(NodeStatus nodeStatus,
void setNodeStatus(NodeStatus status); MasterKey lastKnownContainerTokenMasterKey,
MasterKey lastKnownNMTokenMasterKey) {
NodeHeartbeatRequest nodeHeartbeatRequest =
Records.newRecord(NodeHeartbeatRequest.class);
nodeHeartbeatRequest.setNodeStatus(nodeStatus);
nodeHeartbeatRequest
.setLastKnownContainerTokenMasterKey(lastKnownContainerTokenMasterKey);
nodeHeartbeatRequest
.setLastKnownNMTokenMasterKey(lastKnownNMTokenMasterKey);
return nodeHeartbeatRequest;
}
MasterKey getLastKnownContainerTokenMasterKey(); public abstract NodeStatus getNodeStatus();
void setLastKnownContainerTokenMasterKey(MasterKey secretKey); public abstract void setNodeStatus(NodeStatus status);
MasterKey getLastKnownNMTokenMasterKey(); public abstract MasterKey getLastKnownContainerTokenMasterKey();
void setLastKnownNMTokenMasterKey(MasterKey secretKey); public abstract void setLastKnownContainerTokenMasterKey(MasterKey secretKey);
public abstract MasterKey getLastKnownNMTokenMasterKey();
public abstract void setLastKnownNMTokenMasterKey(MasterKey secretKey);
} }

View File

@ -18,17 +18,37 @@
package org.apache.hadoop.yarn.server.api.protocolrecords; package org.apache.hadoop.yarn.server.api.protocolrecords;
import java.util.List;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.util.Records;
public interface RegisterNodeManagerRequest { public abstract class RegisterNodeManagerRequest {
NodeId getNodeId();
int getHttpPort();
Resource getResource();
String getNMVersion();
void setNodeId(NodeId nodeId); public static RegisterNodeManagerRequest newInstance(NodeId nodeId,
void setHttpPort(int port); int httpPort, Resource resource, String nodeManagerVersionId,
void setResource(Resource resource); List<ContainerStatus> containerStatuses) {
void setNMVersion(String version); RegisterNodeManagerRequest request =
Records.newRecord(RegisterNodeManagerRequest.class);
request.setHttpPort(httpPort);
request.setResource(resource);
request.setNodeId(nodeId);
request.setNMVersion(nodeManagerVersionId);
request.setContainerStatuses(containerStatuses);
return request;
}
public abstract NodeId getNodeId();
public abstract int getHttpPort();
public abstract Resource getResource();
public abstract String getNMVersion();
public abstract List<ContainerStatus> getContainerStatuses();
public abstract void setNodeId(NodeId nodeId);
public abstract void setHttpPort(int port);
public abstract void setResource(Resource resource);
public abstract void setNMVersion(String version);
public abstract void setContainerStatuses(List<ContainerStatus> containerStatuses);
} }

View File

@ -18,7 +18,6 @@
package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb; package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb;
import org.apache.hadoop.yarn.api.records.impl.pb.ProtoBase;
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.MasterKeyProto; import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.MasterKeyProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeStatusProto; import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeStatusProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatRequestProto; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatRequestProto;
@ -29,8 +28,7 @@ import org.apache.hadoop.yarn.server.api.records.NodeStatus;
import org.apache.hadoop.yarn.server.api.records.impl.pb.MasterKeyPBImpl; import org.apache.hadoop.yarn.server.api.records.impl.pb.MasterKeyPBImpl;
import org.apache.hadoop.yarn.server.api.records.impl.pb.NodeStatusPBImpl; import org.apache.hadoop.yarn.server.api.records.impl.pb.NodeStatusPBImpl;
public class NodeHeartbeatRequestPBImpl extends public class NodeHeartbeatRequestPBImpl extends NodeHeartbeatRequest {
ProtoBase<NodeHeartbeatRequestProto> implements NodeHeartbeatRequest {
NodeHeartbeatRequestProto proto = NodeHeartbeatRequestProto.getDefaultInstance(); NodeHeartbeatRequestProto proto = NodeHeartbeatRequestProto.getDefaultInstance();
NodeHeartbeatRequestProto.Builder builder = null; NodeHeartbeatRequestProto.Builder builder = null;
boolean viaProto = false; boolean viaProto = false;
@ -55,6 +53,21 @@ public class NodeHeartbeatRequestPBImpl extends
return proto; return proto;
} }
@Override
public int hashCode() {
return getProto().hashCode();
}
@Override
public boolean equals(Object other) {
if (other == null)
return false;
if (other.getClass().isAssignableFrom(this.getClass())) {
return this.getProto().equals(this.getClass().cast(other).getProto());
}
return false;
}
private void mergeLocalToBuilder() { private void mergeLocalToBuilder() {
if (this.nodeStatus != null) { if (this.nodeStatus != null) {
builder.setNodeStatus(convertToProtoFormat(this.nodeStatus)); builder.setNodeStatus(convertToProtoFormat(this.nodeStatus));

View File

@ -19,11 +19,21 @@
package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb; package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.impl.pb.ContainerIdPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ContainerStatusPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.NodeIdPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.NodeIdPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ProtoBase; import org.apache.hadoop.yarn.api.records.impl.pb.ProtoBase;
import org.apache.hadoop.yarn.api.records.impl.pb.ResourcePBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ResourcePBImpl;
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerIdProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerStatusProto;
import org.apache.hadoop.yarn.proto.YarnProtos.NodeIdProto; import org.apache.hadoop.yarn.proto.YarnProtos.NodeIdProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProto; import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.RegisterNodeManagerRequestProto; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.RegisterNodeManagerRequestProto;
@ -32,13 +42,14 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequ
public class RegisterNodeManagerRequestPBImpl extends ProtoBase<RegisterNodeManagerRequestProto> implements RegisterNodeManagerRequest { public class RegisterNodeManagerRequestPBImpl extends RegisterNodeManagerRequest {
RegisterNodeManagerRequestProto proto = RegisterNodeManagerRequestProto.getDefaultInstance(); RegisterNodeManagerRequestProto proto = RegisterNodeManagerRequestProto.getDefaultInstance();
RegisterNodeManagerRequestProto.Builder builder = null; RegisterNodeManagerRequestProto.Builder builder = null;
boolean viaProto = false; boolean viaProto = false;
private Resource resource = null; private Resource resource = null;
private NodeId nodeId = null; private NodeId nodeId = null;
private List<ContainerStatus> containerStatuses = null;
public RegisterNodeManagerRequestPBImpl() { public RegisterNodeManagerRequestPBImpl() {
builder = RegisterNodeManagerRequestProto.newBuilder(); builder = RegisterNodeManagerRequestProto.newBuilder();
@ -57,6 +68,9 @@ public class RegisterNodeManagerRequestPBImpl extends ProtoBase<RegisterNodeMana
} }
private void mergeLocalToBuilder() { private void mergeLocalToBuilder() {
if (this.containerStatuses != null) {
addContainerStatusesToProto();
}
if (this.resource != null) { if (this.resource != null) {
builder.setResource(convertToProtoFormat(this.resource)); builder.setResource(convertToProtoFormat(this.resource));
} }
@ -139,6 +153,81 @@ public class RegisterNodeManagerRequestPBImpl extends ProtoBase<RegisterNodeMana
builder.setHttpPort(httpPort); builder.setHttpPort(httpPort);
} }
@Override
public List<ContainerStatus> getContainerStatuses() {
initContainerStatuses();
return containerStatuses;
}
private void initContainerStatuses() {
if (this.containerStatuses != null) {
return;
}
RegisterNodeManagerRequestProtoOrBuilder p = viaProto ? proto : builder;
List<ContainerStatusProto> list = p.getContainerStatusesList();
this.containerStatuses = new ArrayList<ContainerStatus>();
for (ContainerStatusProto c : list) {
this.containerStatuses.add(convertFromProtoFormat(c));
}
}
@Override
public void setContainerStatuses(List<ContainerStatus> containers) {
if (containers == null) {
return;
}
initContainerStatuses();
this.containerStatuses.addAll(containers);
}
private void addContainerStatusesToProto() {
maybeInitBuilder();
builder.clearContainerStatuses();
if (containerStatuses == null) {
return;
}
Iterable<ContainerStatusProto> it = new Iterable<ContainerStatusProto>() {
@Override
public Iterator<ContainerStatusProto> iterator() {
return new Iterator<ContainerStatusProto>() {
Iterator<ContainerStatus> iter = containerStatuses.iterator();
@Override
public boolean hasNext() {
return iter.hasNext();
}
@Override
public ContainerStatusProto next() {
return convertToProtoFormat(iter.next());
}
@Override
public void remove() {
throw new UnsupportedOperationException();
}
};
}
};
builder.addAllContainerStatuses(it);
}
@Override
public int hashCode() {
return getProto().hashCode();
}
@Override
public boolean equals(Object other) {
if (other == null)
return false;
if (other.getClass().isAssignableFrom(this.getClass())) {
return this.getProto().equals(this.getClass().cast(other).getProto());
}
return false;
}
@Override @Override
public String getNMVersion() { public String getNMVersion() {
RegisterNodeManagerRequestProtoOrBuilder p = viaProto ? proto : builder; RegisterNodeManagerRequestProtoOrBuilder p = viaProto ? proto : builder;
@ -170,6 +259,11 @@ public class RegisterNodeManagerRequestPBImpl extends ProtoBase<RegisterNodeMana
return ((ResourcePBImpl)t).getProto(); return ((ResourcePBImpl)t).getProto();
} }
private ContainerStatusPBImpl convertFromProtoFormat(ContainerStatusProto c) {
return new ContainerStatusPBImpl(c);
}
private ContainerStatusProto convertToProtoFormat(ContainerStatus c) {
return ((ContainerStatusPBImpl)c).getProto();
}
} }

View File

@ -22,9 +22,23 @@ import java.util.List;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.util.Records;
public interface NodeStatus { public abstract class NodeStatus {
public static NodeStatus newInstance(NodeId nodeId, int responseId,
List<ContainerStatus> containerStatuses,
List<ApplicationId> keepAliveApplications,
NodeHealthStatus nodeHealthStatus) {
NodeStatus nodeStatus = Records.newRecord(NodeStatus.class);
nodeStatus.setResponseId(responseId);
nodeStatus.setNodeId(nodeId);
nodeStatus.setContainersStatuses(containerStatuses);
nodeStatus.setKeepAliveApplications(keepAliveApplications);
nodeStatus.setNodeHealthStatus(nodeHealthStatus);
return nodeStatus;
}
public abstract NodeId getNodeId(); public abstract NodeId getNodeId();
public abstract int getResponseId(); public abstract int getResponseId();
@ -36,8 +50,8 @@ public interface NodeStatus {
public abstract List<ApplicationId> getKeepAliveApplications(); public abstract List<ApplicationId> getKeepAliveApplications();
public abstract void setKeepAliveApplications(List<ApplicationId> appIds); public abstract void setKeepAliveApplications(List<ApplicationId> appIds);
NodeHealthStatus getNodeHealthStatus(); public abstract NodeHealthStatus getNodeHealthStatus();
void setNodeHealthStatus(NodeHealthStatus healthStatus); public abstract void setNodeHealthStatus(NodeHealthStatus healthStatus);
public abstract void setNodeId(NodeId nodeId); public abstract void setNodeId(NodeId nodeId);
public abstract void setResponseId(int responseId); public abstract void setResponseId(int responseId);

View File

@ -29,7 +29,6 @@ import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ContainerStatusPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ContainerStatusPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.NodeIdPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.NodeIdPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ProtoBase;
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto; import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerStatusProto; import org.apache.hadoop.yarn.proto.YarnProtos.ContainerStatusProto;
import org.apache.hadoop.yarn.proto.YarnProtos.NodeIdProto; import org.apache.hadoop.yarn.proto.YarnProtos.NodeIdProto;
@ -40,8 +39,7 @@ import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
import org.apache.hadoop.yarn.server.api.records.NodeStatus; import org.apache.hadoop.yarn.server.api.records.NodeStatus;
public class NodeStatusPBImpl extends ProtoBase<NodeStatusProto> implements public class NodeStatusPBImpl extends NodeStatus {
NodeStatus {
NodeStatusProto proto = NodeStatusProto.getDefaultInstance(); NodeStatusProto proto = NodeStatusProto.getDefaultInstance();
NodeStatusProto.Builder builder = null; NodeStatusProto.Builder builder = null;
boolean viaProto = false; boolean viaProto = false;
@ -166,6 +164,21 @@ public class NodeStatusPBImpl extends ProtoBase<NodeStatusProto> implements
builder.addAllKeepAliveApplications(iterable); builder.addAllKeepAliveApplications(iterable);
} }
@Override
public int hashCode() {
return getProto().hashCode();
}
@Override
public boolean equals(Object other) {
if (other == null)
return false;
if (other.getClass().isAssignableFrom(this.getClass())) {
return this.getProto().equals(this.getClass().cast(other).getProto());
}
return false;
}
@Override @Override
public synchronized int getResponseId() { public synchronized int getResponseId() {
NodeStatusProtoOrBuilder p = viaProto ? proto : builder; NodeStatusProtoOrBuilder p = viaProto ? proto : builder;

View File

@ -30,6 +30,7 @@ message RegisterNodeManagerRequestProto {
optional int32 http_port = 3; optional int32 http_port = 3;
optional ResourceProto resource = 4; optional ResourceProto resource = 4;
optional string nm_version = 5; optional string nm_version = 5;
repeated ContainerStatusProto containerStatuses = 6;
} }
message RegisterNodeManagerResponseProto { message RegisterNodeManagerResponseProto {

View File

@ -26,7 +26,7 @@ public interface NodeStatusUpdater extends Service {
void sendOutofBandHeartBeat(); void sendOutofBandHeartBeat();
NodeStatus getNodeStatusAndUpdateContainersInContext(); NodeStatus getNodeStatusAndUpdateContainersInContext(int responseId);
long getRMIdentifier(); long getRMIdentifier();

View File

@ -48,8 +48,6 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.api.ResourceManagerConstants; import org.apache.hadoop.yarn.server.api.ResourceManagerConstants;
import org.apache.hadoop.yarn.server.api.ResourceTracker; import org.apache.hadoop.yarn.server.api.ResourceTracker;
import org.apache.hadoop.yarn.server.api.ServerRMProxy; import org.apache.hadoop.yarn.server.api.ServerRMProxy;
@ -89,7 +87,6 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
private String nodeManagerVersionId; private String nodeManagerVersionId;
private String minimumResourceManagerVersion; private String minimumResourceManagerVersion;
private volatile boolean isStopped; private volatile boolean isStopped;
private RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
private boolean tokenKeepAliveEnabled; private boolean tokenKeepAliveEnabled;
private long tokenRemovalDelayMs; private long tokenRemovalDelayMs;
/** Keeps track of when the next keep alive request should be sent for an app*/ /** Keeps track of when the next keep alive request should be sent for an app*/
@ -134,9 +131,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
conf.getInt( conf.getInt(
YarnConfiguration.NM_VCORES, YarnConfiguration.DEFAULT_NM_VCORES); YarnConfiguration.NM_VCORES, YarnConfiguration.DEFAULT_NM_VCORES);
this.totalResource = recordFactory.newRecordInstance(Resource.class); this.totalResource = Resource.newInstance(memoryMb, virtualCores);
this.totalResource.setMemory(memoryMb);
this.totalResource.setVirtualCores(virtualCores);
metrics.addResource(totalResource); metrics.addResource(totalResource);
this.tokenKeepAliveEnabled = isTokenKeepAliveEnabled(conf); this.tokenKeepAliveEnabled = isTokenKeepAliveEnabled(conf);
this.tokenRemovalDelayMs = this.tokenRemovalDelayMs =
@ -238,13 +233,17 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
} }
@VisibleForTesting @VisibleForTesting
protected void registerWithRM() throws YarnException, IOException { protected void registerWithRM()
throws YarnException, IOException {
List<ContainerStatus> containerStatuses =
this.updateAndGetContainerStatuses();
RegisterNodeManagerRequest request = RegisterNodeManagerRequest request =
recordFactory.newRecordInstance(RegisterNodeManagerRequest.class); RegisterNodeManagerRequest.newInstance(nodeId, httpPort, totalResource,
request.setHttpPort(this.httpPort); nodeManagerVersionId, containerStatuses);
request.setResource(this.totalResource); if (containerStatuses != null) {
request.setNodeId(this.nodeId); LOG.info("Registering with RM using finished containers :"
request.setNMVersion(this.nodeManagerVersionId); + containerStatuses);
}
RegisterNodeManagerResponse regNMResponse = RegisterNodeManagerResponse regNMResponse =
resourceTracker.registerNodeManager(request); resourceTracker.registerNodeManager(request);
this.rmIdentifier = regNMResponse.getRMIdentifier(); this.rmIdentifier = regNMResponse.getRMIdentifier();
@ -323,13 +322,33 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
} }
@Override @Override
public NodeStatus getNodeStatusAndUpdateContainersInContext() { public NodeStatus getNodeStatusAndUpdateContainersInContext(
int responseId) {
NodeStatus nodeStatus = recordFactory.newRecordInstance(NodeStatus.class); NodeHealthStatus nodeHealthStatus = this.context.getNodeHealthStatus();
nodeStatus.setNodeId(this.nodeId); nodeHealthStatus.setHealthReport(healthChecker.getHealthReport());
nodeHealthStatus.setIsNodeHealthy(healthChecker.isHealthy());
nodeHealthStatus.setLastHealthReportTime(
healthChecker.getLastHealthReportTime());
if (LOG.isDebugEnabled()) {
LOG.debug("Node's health-status : " + nodeHealthStatus.getIsNodeHealthy()
+ ", " + nodeHealthStatus.getHealthReport());
}
List<ContainerStatus> containersStatuses = updateAndGetContainerStatuses();
LOG.debug(this.nodeId + " sending out status for "
+ containersStatuses.size() + " containers");
NodeStatus nodeStatus = NodeStatus.newInstance(nodeId, responseId,
containersStatuses, createKeepAliveApplicationList(), nodeHealthStatus);
int numActiveContainers = 0; return nodeStatus;
List<ContainerStatus> containersStatuses = new ArrayList<ContainerStatus>(); }
/*
* It will return current container statuses. If any container has
* COMPLETED then it will be removed from context.
*/
private List<ContainerStatus> updateAndGetContainerStatuses() {
List<ContainerStatus> containerStatuses = new ArrayList<ContainerStatus>();
for (Iterator<Entry<ContainerId, Container>> i = for (Iterator<Entry<ContainerId, Container>> i =
this.context.getContainers().entrySet().iterator(); i.hasNext();) { this.context.getContainers().entrySet().iterator(); i.hasNext();) {
Entry<ContainerId, Container> e = i.next(); Entry<ContainerId, Container> e = i.next();
@ -339,8 +358,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
// Clone the container to send it to the RM // Clone the container to send it to the RM
org.apache.hadoop.yarn.api.records.ContainerStatus containerStatus = org.apache.hadoop.yarn.api.records.ContainerStatus containerStatus =
container.cloneAndGetContainerStatus(); container.cloneAndGetContainerStatus();
containersStatuses.add(containerStatus); containerStatuses.add(containerStatus);
++numActiveContainers;
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("Sending out status for container: " + containerStatus); LOG.debug("Sending out status for container: " + containerStatus);
} }
@ -356,26 +374,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
LOG.info("Removed completed container " + containerId); LOG.info("Removed completed container " + containerId);
} }
} }
nodeStatus.setContainersStatuses(containersStatuses); return containerStatuses;
LOG.debug(this.nodeId + " sending out status for "
+ numActiveContainers + " containers");
NodeHealthStatus nodeHealthStatus = this.context.getNodeHealthStatus();
nodeHealthStatus.setHealthReport(healthChecker.getHealthReport());
nodeHealthStatus.setIsNodeHealthy(healthChecker.isHealthy());
nodeHealthStatus.setLastHealthReportTime(
healthChecker.getLastHealthReportTime());
if (LOG.isDebugEnabled()) {
LOG.debug("Node's health-status : " + nodeHealthStatus.getIsNodeHealthy()
+ ", " + nodeHealthStatus.getHealthReport());
}
nodeStatus.setNodeHealthStatus(nodeHealthStatus);
List<ApplicationId> keepAliveAppIds = createKeepAliveApplicationList();
nodeStatus.setKeepAliveApplications(keepAliveAppIds);
return nodeStatus;
} }
private void trackAppsForKeepAlive(List<ApplicationId> appIds) { private void trackAppsForKeepAlive(List<ApplicationId> appIds) {
@ -458,18 +457,15 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
// Send heartbeat // Send heartbeat
try { try {
NodeHeartbeatResponse response = null; NodeHeartbeatResponse response = null;
NodeStatus nodeStatus = getNodeStatusAndUpdateContainersInContext(); NodeStatus nodeStatus =
nodeStatus.setResponseId(lastHeartBeatID); getNodeStatusAndUpdateContainersInContext(lastHeartBeatID);
NodeHeartbeatRequest request = recordFactory NodeHeartbeatRequest request =
.newRecordInstance(NodeHeartbeatRequest.class); NodeHeartbeatRequest.newInstance(nodeStatus,
request.setNodeStatus(nodeStatus); NodeStatusUpdaterImpl.this.context
request .getContainerTokenSecretManager().getCurrentKey(),
.setLastKnownContainerTokenMasterKey(NodeStatusUpdaterImpl.this.context NodeStatusUpdaterImpl.this.context.getNMTokenSecretManager()
.getContainerTokenSecretManager().getCurrentKey()); .getCurrentKey());
request
.setLastKnownNMTokenMasterKey(NodeStatusUpdaterImpl.this.context
.getNMTokenSecretManager().getCurrentKey());
response = resourceTracker.nodeHeartbeat(request); response = resourceTracker.nodeHeartbeat(request);
//get next heartbeat interval from response //get next heartbeat interval from response
nextHeartBeatInterval = response.getNextHeartBeatInterval(); nextHeartBeatInterval = response.getNextHeartBeatInterval();

View File

@ -28,6 +28,7 @@ import java.util.Arrays;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Map.Entry;
import java.util.Set; import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock;
@ -64,6 +65,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.StopContainersResponse;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.SerializedException; import org.apache.hadoop.yarn.api.records.SerializedException;
@ -371,17 +373,31 @@ public class ContainerManagerImpl extends CompositeService implements
this.handle(new CMgrCompletedContainersEvent(containerIds, this.handle(new CMgrCompletedContainersEvent(containerIds,
CMgrCompletedContainersEvent.Reason.ON_NODEMANAGER_RESYNC)); CMgrCompletedContainersEvent.Reason.ON_NODEMANAGER_RESYNC));
while (!containers.isEmpty()) {
/*
* We will wait till all the containers change their state to COMPLETE. We
* will not remove the container statuses from nm context because these
* are used while re-registering node manager with resource manager.
*/
boolean allContainersCompleted = false;
while (!containers.isEmpty() && !allContainersCompleted) {
allContainersCompleted = true;
for (Entry<ContainerId, Container> container : containers.entrySet()) {
if (((ContainerImpl) container.getValue()).getCurrentState()
!= ContainerState.COMPLETE) {
allContainersCompleted = false;
try { try {
Thread.sleep(1000); Thread.sleep(1000);
nodeStatusUpdater.getNodeStatusAndUpdateContainersInContext();
} catch (InterruptedException ex) { } catch (InterruptedException ex) {
LOG.warn("Interrupted while sleeping on container kill on resync", ex); LOG.warn("Interrupted while sleeping on container kill on resync",
ex);
}
break;
}
} }
} }
// All containers killed // All containers killed
if (containers.isEmpty()) { if (allContainersCompleted) {
LOG.info("All containers in DONE state"); LOG.info("All containers in DONE state");
} else { } else {
LOG.info("Done waiting for containers to be killed. Still alive: " + LOG.info("Done waiting for containers to be killed. Still alive: " +

View File

@ -302,7 +302,7 @@ public class ContainerImpl implements Container {
private final StateMachine<ContainerState, ContainerEventType, ContainerEvent> private final StateMachine<ContainerState, ContainerEventType, ContainerEvent>
stateMachine; stateMachine;
private org.apache.hadoop.yarn.api.records.ContainerState getCurrentState() { public org.apache.hadoop.yarn.api.records.ContainerState getCurrentState() {
switch (stateMachine.getCurrentState()) { switch (stateMachine.getCurrentState()) {
case NEW: case NEW:
case LOCALIZING: case LOCALIZING:

View File

@ -29,6 +29,10 @@ import org.apache.hadoop.net.Node;
import org.apache.hadoop.security.authorize.PolicyProvider; import org.apache.hadoop.security.authorize.PolicyProvider;
import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.util.VersionUtil; import org.apache.hadoop.util.VersionUtil;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceOption; import org.apache.hadoop.yarn.api.records.ResourceOption;
@ -46,14 +50,17 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResp
import org.apache.hadoop.yarn.server.api.records.MasterKey; import org.apache.hadoop.yarn.server.api.records.MasterKey;
import org.apache.hadoop.yarn.server.api.records.NodeAction; import org.apache.hadoop.yarn.server.api.records.NodeAction;
import org.apache.hadoop.yarn.server.api.records.NodeStatus; import org.apache.hadoop.yarn.server.api.records.NodeStatus;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerFinishedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeReconnectEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeReconnectEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeStatusEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeStatusEvent;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM; import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
import org.apache.hadoop.yarn.server.resourcemanager.security.authorize.RMPolicyProvider; import org.apache.hadoop.yarn.server.resourcemanager.security.authorize.RMPolicyProvider;
import org.apache.hadoop.yarn.server.utils.YarnServerBuilderUtils; import org.apache.hadoop.yarn.server.utils.YarnServerBuilderUtils;
import org.apache.hadoop.yarn.util.RackResolver; import org.apache.hadoop.yarn.util.RackResolver;
@ -183,6 +190,33 @@ public class ResourceTrackerService extends AbstractService implements
Resource capability = request.getResource(); Resource capability = request.getResource();
String nodeManagerVersion = request.getNMVersion(); String nodeManagerVersion = request.getNMVersion();
if (!request.getContainerStatuses().isEmpty()) {
LOG.info("received container statuses on node manager register :"
+ request.getContainerStatuses());
for (ContainerStatus containerStatus : request.getContainerStatuses()) {
ApplicationAttemptId appAttemptId =
containerStatus.getContainerId().getApplicationAttemptId();
RMApp rmApp =
rmContext.getRMApps().get(appAttemptId.getApplicationId());
if (rmApp != null) {
RMAppAttempt rmAppAttempt = rmApp.getRMAppAttempt(appAttemptId);
if (rmAppAttempt.getMasterContainer().getId()
.equals(containerStatus.getContainerId())
&& containerStatus.getState() == ContainerState.COMPLETE) {
// sending master container finished event.
RMAppAttemptContainerFinishedEvent evt =
new RMAppAttemptContainerFinishedEvent(appAttemptId,
containerStatus);
rmContext.getDispatcher().getEventHandler().handle(evt);
}
} else {
LOG.error("Received finished container :"
+ containerStatus.getContainerId()
+ " for non existing application :"
+ appAttemptId.getApplicationId());
}
}
}
RegisterNodeManagerResponse response = recordFactory RegisterNodeManagerResponse response = recordFactory
.newRecordInstance(RegisterNodeManagerResponse.class); .newRecordInstance(RegisterNodeManagerResponse.class);

View File

@ -132,8 +132,8 @@ public class RMAppImpl implements RMApp, Recoverable {
.addTransition(RMAppState.NEW, RMAppState.NEW_SAVING, .addTransition(RMAppState.NEW, RMAppState.NEW_SAVING,
RMAppEventType.START, new RMAppSavingTransition()) RMAppEventType.START, new RMAppSavingTransition())
.addTransition(RMAppState.NEW, EnumSet.of(RMAppState.SUBMITTED, .addTransition(RMAppState.NEW, EnumSet.of(RMAppState.SUBMITTED,
RMAppState.FINISHED, RMAppState.FAILED, RMAppState.KILLED, RMAppState.RUNNING, RMAppState.FINISHED, RMAppState.FAILED,
RMAppState.FINAL_SAVING), RMAppState.KILLED, RMAppState.FINAL_SAVING),
RMAppEventType.RECOVER, new RMAppRecoveredTransition()) RMAppEventType.RECOVER, new RMAppRecoveredTransition())
.addTransition(RMAppState.NEW, RMAppState.FINAL_SAVING, RMAppEventType.KILL, .addTransition(RMAppState.NEW, RMAppState.FINAL_SAVING, RMAppEventType.KILL,
new FinalSavingTransition( new FinalSavingTransition(
@ -611,11 +611,11 @@ public class RMAppImpl implements RMApp, Recoverable {
this.diagnostics.append(appState.getDiagnostics()); this.diagnostics.append(appState.getDiagnostics());
this.storedFinishTime = appState.getFinishTime(); this.storedFinishTime = appState.getFinishTime();
this.startTime = appState.getStartTime(); this.startTime = appState.getStartTime();
for(int i=0; i<appState.getAttemptCount(); ++i) { for(int i=0; i<appState.getAttemptCount(); ++i) {
// create attempt // create attempt
createNewAttempt(false); createNewAttempt(false);
// recover attempt ((RMAppAttemptImpl)this.currentAttempt).recover(state);
((RMAppAttemptImpl) currentAttempt).recover(state);
} }
} }
@ -656,30 +656,35 @@ public class RMAppImpl implements RMApp, Recoverable {
}; };
} }
@SuppressWarnings("unchecked")
private static final class RMAppRecoveredTransition implements private static final class RMAppRecoveredTransition implements
MultipleArcTransition<RMAppImpl, RMAppEvent, RMAppState> { MultipleArcTransition<RMAppImpl, RMAppEvent, RMAppState> {
@Override @Override
public RMAppState transition(RMAppImpl app, RMAppEvent event) { public RMAppState transition(RMAppImpl app, RMAppEvent event) {
if (app.attempts.isEmpty()) {
// Saved application was not running any attempts.
app.createNewAttempt(true);
return RMAppState.SUBMITTED;
} else {
/*
* If last attempt recovered final state is null .. it means attempt
* was started but AM container may or may not have started / finished.
* Therefore we should wait for it to finish.
*/
for (RMAppAttempt attempt : app.getAppAttempts().values()) {
app.dispatcher.getEventHandler().handle(
new RMAppAttemptEvent(attempt.getAppAttemptId(),
RMAppAttemptEventType.RECOVER));
}
if (app.recoveredFinalState != null) { if (app.recoveredFinalState != null) {
FINAL_TRANSITION.transition(app, event); FINAL_TRANSITION.transition(app, event);
return app.recoveredFinalState; return app.recoveredFinalState;
} else {
return RMAppState.RUNNING;
}
} }
// Directly call AttemptFailedTransition, since now we deem that an
// application fails because of RM restart as a normal AM failure.
// Do not recover unmanaged applications since current recovery
// mechanism of restarting attempts does not work for them.
// This will need to be changed in work preserving recovery in which
// RM will re-connect with the running AM's instead of restarting them
// In work-preserve restart, if attemptCount == maxAttempts, the job still
// needs to be recovered because the last attempt may still be running.
// As part of YARN-1210, we may return ACCECPTED state waiting for AM to
// reregister or fail and remove the following code.
return new AttemptFailedTransition(RMAppState.SUBMITTED).transition(app,
event);
} }
} }
@ -1017,4 +1022,10 @@ public class RMAppImpl implements RMApp, Recoverable {
throw new YarnRuntimeException("Unknown state passed!"); throw new YarnRuntimeException("Unknown state passed!");
} }
} }
public static boolean isAppInFinalState(RMApp rmApp) {
RMAppState appState = rmApp.getState();
return appState == RMAppState.FAILED || appState == RMAppState.FINISHED
|| appState == RMAppState.KILLED;
}
} }

View File

@ -68,11 +68,14 @@ import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.Appli
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.Recoverable; import org.apache.hadoop.yarn.server.resourcemanager.recovery.Recoverable;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppFailedAttemptEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppFailedAttemptEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppFinishedAttemptEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppFinishedAttemptEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRejectedEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRejectedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerAcquiredEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerAcquiredEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerFinishedEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerFinishedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptLaunchFailedEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptLaunchFailedEvent;
@ -179,7 +182,7 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
new UnexpectedAMRegisteredTransition(), RMAppAttemptState.FAILED)) new UnexpectedAMRegisteredTransition(), RMAppAttemptState.FAILED))
.addTransition( RMAppAttemptState.NEW, .addTransition( RMAppAttemptState.NEW,
EnumSet.of(RMAppAttemptState.FINISHED, RMAppAttemptState.KILLED, EnumSet.of(RMAppAttemptState.FINISHED, RMAppAttemptState.KILLED,
RMAppAttemptState.FAILED, RMAppAttemptState.RECOVERED), RMAppAttemptState.FAILED, RMAppAttemptState.LAUNCHED),
RMAppAttemptEventType.RECOVER, new AttemptRecoveredTransition()) RMAppAttemptEventType.RECOVER, new AttemptRecoveredTransition())
// Transitions from SUBMITTED state // Transitions from SUBMITTED state
@ -386,25 +389,6 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
RMAppAttemptEventType.UNREGISTERED, RMAppAttemptEventType.UNREGISTERED,
RMAppAttemptEventType.KILL, RMAppAttemptEventType.KILL,
RMAppAttemptEventType.STATUS_UPDATE)) RMAppAttemptEventType.STATUS_UPDATE))
// Transitions from RECOVERED State
.addTransition(
RMAppAttemptState.RECOVERED,
RMAppAttemptState.RECOVERED,
EnumSet.of(RMAppAttemptEventType.START,
RMAppAttemptEventType.APP_ACCEPTED,
RMAppAttemptEventType.APP_REJECTED,
RMAppAttemptEventType.EXPIRE,
RMAppAttemptEventType.LAUNCHED,
RMAppAttemptEventType.LAUNCH_FAILED,
RMAppAttemptEventType.REGISTERED,
RMAppAttemptEventType.CONTAINER_ALLOCATED,
RMAppAttemptEventType.CONTAINER_ACQUIRED,
RMAppAttemptEventType.ATTEMPT_NEW_SAVED,
RMAppAttemptEventType.CONTAINER_FINISHED,
RMAppAttemptEventType.UNREGISTERED,
RMAppAttemptEventType.KILL,
RMAppAttemptEventType.STATUS_UPDATE))
.installTopology(); .installTopology();
public RMAppAttemptImpl(ApplicationAttemptId appAttemptId, public RMAppAttemptImpl(ApplicationAttemptId appAttemptId,
@ -694,8 +678,6 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
this.proxiedTrackingUrl = generateProxyUriWithScheme(originalTrackingUrl); this.proxiedTrackingUrl = generateProxyUriWithScheme(originalTrackingUrl);
this.finalStatus = attemptState.getFinalApplicationStatus(); this.finalStatus = attemptState.getFinalApplicationStatus();
this.startTime = attemptState.getStartTime(); this.startTime = attemptState.getStartTime();
handle(new RMAppAttemptEvent(getAppAttemptId(),
RMAppAttemptEventType.RECOVER));
} }
private void recoverAppAttemptCredentials(Credentials appAttemptTokens) private void recoverAppAttemptCredentials(Credentials appAttemptTokens)
@ -865,11 +847,38 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
@Override @Override
public RMAppAttemptState transition(RMAppAttemptImpl appAttempt, public RMAppAttemptState transition(RMAppAttemptImpl appAttempt,
RMAppAttemptEvent event) { RMAppAttemptEvent event) {
LOG.info("Recovering attempt : recoverdFinalState :"
+ appAttempt.recoveredFinalState);
if (appAttempt.recoveredFinalState != null) { if (appAttempt.recoveredFinalState != null) {
appAttempt.progress = 1.0f; appAttempt.progress = 1.0f;
RMApp rmApp =appAttempt.rmContext.getRMApps().get(
appAttempt.getAppAttemptId().getApplicationId());
// We will replay the final attempt only if last attempt is in final
// state but application is not in final state.
if (rmApp.getCurrentAppAttempt() == appAttempt
&& !RMAppImpl.isAppInFinalState(rmApp)) {
(new BaseFinalTransition(appAttempt.recoveredFinalState)).transition(
appAttempt, event);
}
return appAttempt.recoveredFinalState; return appAttempt.recoveredFinalState;
} else { } else {
return RMAppAttemptState.RECOVERED; /*
* Since the application attempt's final state is not saved that means
* for AM container (previous attempt) state must be one of these.
* 1) AM container may not have been launched (RM failed right before
* this).
* 2) AM container was successfully launched but may or may not have
* registered / unregistered.
* In whichever case we will wait (by moving attempt into LAUNCHED
* state) and mark this attempt failed (assuming non work preserving
* restart) only after
* 1) Node manager during re-registration heart beats back saying
* am container finished.
* 2) OR AMLivelinessMonitor expires this attempt (when am doesn't
* heart beat back).
*/
(new AMLaunchedTransition()).transition(appAttempt, event);
return RMAppAttemptState.LAUNCHED;
} }
} }
} }

View File

@ -20,6 +20,5 @@ package org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt;
public enum RMAppAttemptState { public enum RMAppAttemptState {
NEW, SUBMITTED, SCHEDULED, ALLOCATED, LAUNCHED, FAILED, RUNNING, FINISHING, NEW, SUBMITTED, SCHEDULED, ALLOCATED, LAUNCHED, FAILED, RUNNING, FINISHING,
FINISHED, KILLED, ALLOCATED_SAVING, LAUNCHED_UNMANAGED_SAVING, RECOVERED, FINISHED, KILLED, ALLOCATED_SAVING, LAUNCHED_UNMANAGED_SAVING, FINAL_SAVING
FINAL_SAVING
} }

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.yarn.server.resourcemanager; package org.apache.hadoop.yarn.server.resourcemanager;
import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.HashMap; import java.util.HashMap;
@ -31,6 +32,7 @@ import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
@ -98,12 +100,18 @@ public class MockNM {
} }
public RegisterNodeManagerResponse registerNode() throws Exception { public RegisterNodeManagerResponse registerNode() throws Exception {
return registerNode(null);
}
public RegisterNodeManagerResponse registerNode(
List<ContainerStatus> containerStatus) throws Exception{
RegisterNodeManagerRequest req = Records.newRecord( RegisterNodeManagerRequest req = Records.newRecord(
RegisterNodeManagerRequest.class); RegisterNodeManagerRequest.class);
req.setNodeId(nodeId); req.setNodeId(nodeId);
req.setHttpPort(httpPort); req.setHttpPort(httpPort);
Resource resource = BuilderUtils.newResource(memory, vCores); Resource resource = BuilderUtils.newResource(memory, vCores);
req.setResource(resource); req.setResource(resource);
req.setContainerStatuses(containerStatus);
req.setNMVersion(version); req.setNMVersion(version);
RegisterNodeManagerResponse registrationResponse = RegisterNodeManagerResponse registrationResponse =
resourceTracker.registerNodeManager(req); resourceTracker.registerNodeManager(req);

View File

@ -34,6 +34,7 @@ import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
@ -62,10 +63,14 @@ import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.Event;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
@ -88,6 +93,7 @@ import org.apache.log4j.Logger;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import org.mortbay.log.Log;
public class TestRMRestart { public class TestRMRestart {
@ -109,6 +115,7 @@ public class TestRMRestart {
Assert.assertTrue(YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS > 1); Assert.assertTrue(YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS > 1);
} }
@SuppressWarnings("rawtypes")
@Test (timeout=180000) @Test (timeout=180000)
public void testRMRestart() throws Exception { public void testRMRestart() throws Exception {
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
@ -257,11 +264,14 @@ public class TestRMRestart {
.getApplicationId()); .getApplicationId());
// verify state machine kicked into expected states // verify state machine kicked into expected states
rm2.waitForState(loadedApp1.getApplicationId(), RMAppState.ACCEPTED); rm2.waitForState(loadedApp1.getApplicationId(), RMAppState.RUNNING);
rm2.waitForState(loadedApp2.getApplicationId(), RMAppState.ACCEPTED); rm2.waitForState(loadedApp2.getApplicationId(), RMAppState.ACCEPTED);
// verify new attempts created // verify attempts for apps
Assert.assertEquals(2, loadedApp1.getAppAttempts().size()); // The app for which AM was started will wait for previous am
// container finish event to arrive. However for an application for which
// no am container was running will start new application attempt.
Assert.assertEquals(1, loadedApp1.getAppAttempts().size());
Assert.assertEquals(1, loadedApp2.getAppAttempts().size()); Assert.assertEquals(1, loadedApp2.getAppAttempts().size());
// verify old AM is not accepted // verify old AM is not accepted
@ -279,8 +289,20 @@ public class TestRMRestart {
Assert.assertEquals(NodeAction.RESYNC, hbResponse.getNodeAction()); Assert.assertEquals(NodeAction.RESYNC, hbResponse.getNodeAction());
// new NM to represent NM re-register // new NM to represent NM re-register
nm1 = rm2.registerNode("127.0.0.1:1234", 15120); nm1 = new MockNM("127.0.0.1:1234", 15120, rm2.getResourceTrackerService());
nm2 = rm2.registerNode("127.0.0.2:5678", 15120); nm2 = new MockNM("127.0.0.2:5678", 15120, rm2.getResourceTrackerService());
List<ContainerStatus> containerStatuses = new ArrayList<ContainerStatus>();
ContainerStatus containerStatus =
BuilderUtils.newContainerStatus(BuilderUtils.newContainerId(loadedApp1
.getCurrentAppAttempt().getAppAttemptId(), 1),
ContainerState.COMPLETE, "Killed AM container", 143);
containerStatuses.add(containerStatus);
nm1.registerNode(containerStatuses);
nm2.registerNode();
rm2.waitForState(loadedApp1.getApplicationId(), RMAppState.ACCEPTED);
Assert.assertEquals(2, loadedApp1.getAppAttempts().size());
// verify no more reboot response sent // verify no more reboot response sent
hbResponse = nm1.nodeHeartbeat(true); hbResponse = nm1.nodeHeartbeat(true);
@ -403,6 +425,157 @@ public class TestRMRestart {
.getAppAttempts().get(am0.getApplicationAttemptId()).getAppAttemptState()); .getAppAttempts().get(am0.getApplicationAttemptId()).getAppAttemptState());
} }
@Test
public void testRMRestartWaitForPreviousAMToFinish() throws Exception {
// testing 3 cases
// After RM restarts
// 1) New application attempt is not started until previous AM container
// finish event is reported back to RM as a part of nm registration.
// 2) If previous AM container finish event is never reported back (i.e.
// node manager on which this AM container was running also went down) in
// that case AMLivenessMonitor should time out previous attempt and start
// new attempt.
// 3) If all the stored attempts had finished then new attempt should
// be started immediately.
YarnConfiguration conf = new YarnConfiguration(this.conf);
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 40);
MemoryRMStateStore memStore = new MemoryRMStateStore();
memStore.init(conf);
RMState rmState = memStore.getState();
Map<ApplicationId, ApplicationState> rmAppState =
rmState.getApplicationState();
// start RM
final MockRM rm1 = new MockRM(conf, memStore);
rm1.start();
MockNM nm1 =
new MockNM("127.0.0.1:1234" , 16382, rm1.getResourceTrackerService());
nm1.registerNode();
// submitting app
RMApp app1 = rm1.submitApp(200);
rm1.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED);
MockAM am1 = launchAM(app1, rm1, nm1);
nm1.nodeHeartbeat(am1.getApplicationAttemptId(), 1, ContainerState.COMPLETE);
// Fail first AM.
am1.waitForState(RMAppAttemptState.FAILED);
// launch another AM.
MockAM am2 = launchAM(app1, rm1, nm1);
Assert.assertEquals(1, rmAppState.size());
Assert.assertEquals(app1.getState(), RMAppState.RUNNING);
Assert.assertEquals(app1.getAppAttempts()
.get(app1.getCurrentAppAttempt().getAppAttemptId())
.getAppAttemptState(), RMAppAttemptState.RUNNING);
// start new RM.
MockRM rm2 = null;
rm2 = new MockRM(conf, memStore);
rm2.start();
nm1.setResourceTrackerService(rm2.getResourceTrackerService());
NodeHeartbeatResponse res = nm1.nodeHeartbeat(true);
Assert.assertEquals(NodeAction.RESYNC, res.getNodeAction());
RMApp rmApp = rm2.getRMContext().getRMApps().get(app1.getApplicationId());
// application should be in running state
rm2.waitForState(app1.getApplicationId(), RMAppState.RUNNING);
Assert.assertEquals(RMAppState.RUNNING, rmApp.getState());
// new attempt should not be started
Assert.assertEquals(2, rmApp.getAppAttempts().size());
// am1 attempt should be in FAILED state where as am2 attempt should be in
// LAUNCHED state
Assert.assertEquals(RMAppAttemptState.FAILED,
rmApp.getAppAttempts().get(am1.getApplicationAttemptId())
.getAppAttemptState());
Assert.assertEquals(RMAppAttemptState.LAUNCHED,
rmApp.getAppAttempts().get(am2.getApplicationAttemptId())
.getAppAttemptState());
List<ContainerStatus> containerStatuses = new ArrayList<ContainerStatus>();
ContainerStatus containerStatus =
BuilderUtils.newContainerStatus(
BuilderUtils.newContainerId(am2.getApplicationAttemptId(), 1),
ContainerState.COMPLETE, "Killed AM container", 143);
containerStatuses.add(containerStatus);
nm1.registerNode(containerStatuses);
rm2.waitForState(am2.getApplicationAttemptId(), RMAppAttemptState.FAILED);
launchAM(rmApp, rm2, nm1);
Assert.assertEquals(3, rmApp.getAppAttempts().size());
rm2.waitForState(rmApp.getCurrentAppAttempt().getAppAttemptId(),
RMAppAttemptState.RUNNING);
// Now restart RM ...
// Setting AMLivelinessMonitor interval to be 10 Secs.
conf.setInt(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 10000);
MockRM rm3 = null;
rm3 = new MockRM(conf, memStore);
rm3.start();
// Wait for RM to process all the events as a part of rm recovery.
nm1.setResourceTrackerService(rm3.getResourceTrackerService());
rmApp = rm3.getRMContext().getRMApps().get(app1.getApplicationId());
// application should be in running state
rm3.waitForState(app1.getApplicationId(), RMAppState.RUNNING);
Assert.assertEquals(rmApp.getState(), RMAppState.RUNNING);
// new attempt should not be started
Assert.assertEquals(3, rmApp.getAppAttempts().size());
// am1 and am2 attempts should be in FAILED state where as am3 should be
// in LAUNCHED state
Assert.assertEquals(RMAppAttemptState.FAILED,
rmApp.getAppAttempts().get(am1.getApplicationAttemptId())
.getAppAttemptState());
Assert.assertEquals(RMAppAttemptState.FAILED,
rmApp.getAppAttempts().get(am2.getApplicationAttemptId())
.getAppAttemptState());
ApplicationAttemptId latestAppAttemptId =
rmApp.getCurrentAppAttempt().getAppAttemptId();
Assert.assertEquals(RMAppAttemptState.LAUNCHED,rmApp.getAppAttempts()
.get(latestAppAttemptId).getAppAttemptState());
rm3.waitForState(latestAppAttemptId, RMAppAttemptState.FAILED);
rm3.waitForState(rmApp.getApplicationId(), RMAppState.ACCEPTED);
Assert.assertEquals(4, rmApp.getAppAttempts().size());
Assert.assertEquals(RMAppAttemptState.FAILED,
rmApp.getAppAttempts().get(latestAppAttemptId).getAppAttemptState());
latestAppAttemptId = rmApp.getCurrentAppAttempt().getAppAttemptId();
// The 4th attempt has started but is not yet saved into RMStateStore
// It will be saved only when we launch AM.
// submitting app but not starting AM for it.
RMApp app2 = rm3.submitApp(200);
rm3.waitForState(app2.getApplicationId(), RMAppState.ACCEPTED);
Assert.assertEquals(1, app2.getAppAttempts().size());
Assert.assertEquals(0,
memStore.getState().getApplicationState().get(app2.getApplicationId())
.getAttemptCount());
MockRM rm4 = null;
rm4 = new MockRM(conf, memStore);
rm4.start();
rmApp = rm4.getRMContext().getRMApps().get(app1.getApplicationId());
rm4.waitForState(rmApp.getApplicationId(), RMAppState.ACCEPTED);
Assert.assertEquals(4, rmApp.getAppAttempts().size());
Assert.assertEquals(RMAppState.ACCEPTED, rmApp.getState());
Assert.assertEquals(RMAppAttemptState.SCHEDULED, rmApp.getAppAttempts()
.get(latestAppAttemptId).getAppAttemptState());
// The initial application for which an AM was not started should be in
// ACCEPTED state with one application attempt started.
app2 = rm4.getRMContext().getRMApps().get(app2.getApplicationId());
rm4.waitForState(app2.getApplicationId(), RMAppState.ACCEPTED);
Assert.assertEquals(RMAppState.ACCEPTED, app2.getState());
Assert.assertEquals(1, app2.getAppAttempts().size());
Assert.assertEquals(RMAppAttemptState.SCHEDULED, app2
.getCurrentAppAttempt().getAppAttemptState());
}
@Test @Test
public void testRMRestartFailedApp() throws Exception { public void testRMRestartFailedApp() throws Exception {
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 1); conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 1);
@ -736,6 +909,8 @@ public class TestRMRestart {
Assert.assertEquals(BuilderUtils.newContainerId(attemptId1, 1), Assert.assertEquals(BuilderUtils.newContainerId(attemptId1, 1),
attemptState.getMasterContainer().getId()); attemptState.getMasterContainer().getId());
// Setting AMLivelinessMonitor interval to be 10 Secs.
conf.setInt(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 10000);
// start new RM // start new RM
MockRM rm2 = new MockRM(conf, memStore); MockRM rm2 = new MockRM(conf, memStore);
rm2.start(); rm2.start();

View File

@ -372,10 +372,10 @@ public class TestRMAppAttemptTransitions {
} }
/** /**
* {@link RMAppAttemptState#RECOVERED} * {@link RMAppAttemptState#LAUNCHED}
*/ */
private void testAppAttemptRecoveredState() { private void testAppAttemptRecoveredState() {
assertEquals(RMAppAttemptState.RECOVERED, assertEquals(RMAppAttemptState.LAUNCHED,
applicationAttempt.getAppAttemptState()); applicationAttempt.getAppAttemptState());
} }