From cfa783141fa69c2cf154d1d9e5393353d14ce5e1 Mon Sep 17 00:00:00 2001 From: Vinod Kumar Vavilapalli Date: Tue, 19 Nov 2013 05:17:20 +0000 Subject: [PATCH] YARN-1210. Changed RM to start new app-attempts on RM restart only after ensuring that previous AM exited or after expiry time. Contributed by Omkar Vinit Joshi. git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1543310 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-yarn-project/CHANGES.txt | 4 + .../protocolrecords/NodeHeartbeatRequest.java | 32 ++- .../RegisterNodeManagerRequest.java | 38 +++- .../impl/pb/NodeHeartbeatRequestPBImpl.java | 19 +- .../pb/RegisterNodeManagerRequestPBImpl.java | 102 +++++++++- .../yarn/server/api/records/NodeStatus.java | 20 +- .../api/records/impl/pb/NodeStatusPBImpl.java | 19 +- .../yarn_server_common_service_protos.proto | 1 + .../server/nodemanager/NodeStatusUpdater.java | 2 +- .../nodemanager/NodeStatusUpdaterImpl.java | 96 +++++---- .../ContainerManagerImpl.java | 32 ++- .../container/ContainerImpl.java | 2 +- .../ResourceTrackerService.java | 36 +++- .../resourcemanager/rmapp/RMAppImpl.java | 55 +++--- .../rmapp/attempt/RMAppAttemptImpl.java | 55 +++--- .../rmapp/attempt/RMAppAttemptState.java | 3 +- .../yarn/server/resourcemanager/MockNM.java | 12 +- .../server/resourcemanager/TestRMRestart.java | 185 +++++++++++++++++- .../attempt/TestRMAppAttemptTransitions.java | 4 +- 19 files changed, 569 insertions(+), 148 deletions(-) diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 78028ad89ba..20acce235b2 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -103,6 +103,10 @@ Release 2.3.0 - UNRELEASED YARN-709. Added tests to verify validity of delegation tokens and logging of appsummary after RM restart. (Jian He via vinodkv) + YARN-1210. Changed RM to start new app-attempts on RM restart only after + ensuring that previous AM exited or after expiry time. (Omkar Vinit Joshi via + vinodkv) + OPTIMIZATIONS BUG FIXES diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatRequest.java index e0bda926333..addd3fe6815 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatRequest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatRequest.java @@ -20,15 +20,29 @@ package org.apache.hadoop.yarn.server.api.protocolrecords; import org.apache.hadoop.yarn.server.api.records.MasterKey; import org.apache.hadoop.yarn.server.api.records.NodeStatus; +import org.apache.hadoop.yarn.util.Records; -public interface NodeHeartbeatRequest { - - NodeStatus getNodeStatus(); - void setNodeStatus(NodeStatus status); - - MasterKey getLastKnownContainerTokenMasterKey(); - void setLastKnownContainerTokenMasterKey(MasterKey secretKey); +public abstract class NodeHeartbeatRequest { - MasterKey getLastKnownNMTokenMasterKey(); - void setLastKnownNMTokenMasterKey(MasterKey secretKey); + public static NodeHeartbeatRequest newInstance(NodeStatus nodeStatus, + MasterKey lastKnownContainerTokenMasterKey, + MasterKey lastKnownNMTokenMasterKey) { + NodeHeartbeatRequest nodeHeartbeatRequest = + Records.newRecord(NodeHeartbeatRequest.class); + nodeHeartbeatRequest.setNodeStatus(nodeStatus); + nodeHeartbeatRequest + .setLastKnownContainerTokenMasterKey(lastKnownContainerTokenMasterKey); + nodeHeartbeatRequest + .setLastKnownNMTokenMasterKey(lastKnownNMTokenMasterKey); + return nodeHeartbeatRequest; + } + + public abstract NodeStatus getNodeStatus(); + public abstract void setNodeStatus(NodeStatus status); + + public abstract MasterKey getLastKnownContainerTokenMasterKey(); + public abstract void setLastKnownContainerTokenMasterKey(MasterKey secretKey); + + public abstract MasterKey getLastKnownNMTokenMasterKey(); + public abstract void setLastKnownNMTokenMasterKey(MasterKey secretKey); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerRequest.java index 32f44a475ed..6ca38615f7d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerRequest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerRequest.java @@ -18,17 +18,37 @@ package org.apache.hadoop.yarn.server.api.protocolrecords; +import java.util.List; + +import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.util.Records; -public interface RegisterNodeManagerRequest { - NodeId getNodeId(); - int getHttpPort(); - Resource getResource(); - String getNMVersion(); +public abstract class RegisterNodeManagerRequest { - void setNodeId(NodeId nodeId); - void setHttpPort(int port); - void setResource(Resource resource); - void setNMVersion(String version); + public static RegisterNodeManagerRequest newInstance(NodeId nodeId, + int httpPort, Resource resource, String nodeManagerVersionId, + List containerStatuses) { + RegisterNodeManagerRequest request = + Records.newRecord(RegisterNodeManagerRequest.class); + request.setHttpPort(httpPort); + request.setResource(resource); + request.setNodeId(nodeId); + request.setNMVersion(nodeManagerVersionId); + request.setContainerStatuses(containerStatuses); + return request; + } + + public abstract NodeId getNodeId(); + public abstract int getHttpPort(); + public abstract Resource getResource(); + public abstract String getNMVersion(); + public abstract List getContainerStatuses(); + + public abstract void setNodeId(NodeId nodeId); + public abstract void setHttpPort(int port); + public abstract void setResource(Resource resource); + public abstract void setNMVersion(String version); + public abstract void setContainerStatuses(List containerStatuses); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java index 0dc7223e6bf..26d1f190de3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java @@ -18,7 +18,6 @@ package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb; -import org.apache.hadoop.yarn.api.records.impl.pb.ProtoBase; import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.MasterKeyProto; import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeStatusProto; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatRequestProto; @@ -29,8 +28,7 @@ import org.apache.hadoop.yarn.server.api.records.NodeStatus; import org.apache.hadoop.yarn.server.api.records.impl.pb.MasterKeyPBImpl; import org.apache.hadoop.yarn.server.api.records.impl.pb.NodeStatusPBImpl; -public class NodeHeartbeatRequestPBImpl extends - ProtoBase implements NodeHeartbeatRequest { +public class NodeHeartbeatRequestPBImpl extends NodeHeartbeatRequest { NodeHeartbeatRequestProto proto = NodeHeartbeatRequestProto.getDefaultInstance(); NodeHeartbeatRequestProto.Builder builder = null; boolean viaProto = false; @@ -55,6 +53,21 @@ public class NodeHeartbeatRequestPBImpl extends return proto; } + @Override + public int hashCode() { + return getProto().hashCode(); + } + + @Override + public boolean equals(Object other) { + if (other == null) + return false; + if (other.getClass().isAssignableFrom(this.getClass())) { + return this.getProto().equals(this.getClass().cast(other).getProto()); + } + return false; + } + private void mergeLocalToBuilder() { if (this.nodeStatus != null) { builder.setNodeStatus(convertToProtoFormat(this.nodeStatus)); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerRequestPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerRequestPBImpl.java index b81a5900841..6f9c43dd98c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerRequestPBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerRequestPBImpl.java @@ -19,11 +19,21 @@ package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; + +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.impl.pb.ContainerIdPBImpl; +import org.apache.hadoop.yarn.api.records.impl.pb.ContainerStatusPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.NodeIdPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ProtoBase; import org.apache.hadoop.yarn.api.records.impl.pb.ResourcePBImpl; +import org.apache.hadoop.yarn.proto.YarnProtos.ContainerIdProto; +import org.apache.hadoop.yarn.proto.YarnProtos.ContainerStatusProto; import org.apache.hadoop.yarn.proto.YarnProtos.NodeIdProto; import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProto; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.RegisterNodeManagerRequestProto; @@ -32,13 +42,14 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequ -public class RegisterNodeManagerRequestPBImpl extends ProtoBase implements RegisterNodeManagerRequest { +public class RegisterNodeManagerRequestPBImpl extends RegisterNodeManagerRequest { RegisterNodeManagerRequestProto proto = RegisterNodeManagerRequestProto.getDefaultInstance(); RegisterNodeManagerRequestProto.Builder builder = null; boolean viaProto = false; private Resource resource = null; private NodeId nodeId = null; + private List containerStatuses = null; public RegisterNodeManagerRequestPBImpl() { builder = RegisterNodeManagerRequestProto.newBuilder(); @@ -57,6 +68,9 @@ public class RegisterNodeManagerRequestPBImpl extends ProtoBase getContainerStatuses() { + initContainerStatuses(); + return containerStatuses; + } + + private void initContainerStatuses() { + if (this.containerStatuses != null) { + return; + } + RegisterNodeManagerRequestProtoOrBuilder p = viaProto ? proto : builder; + List list = p.getContainerStatusesList(); + this.containerStatuses = new ArrayList(); + for (ContainerStatusProto c : list) { + this.containerStatuses.add(convertFromProtoFormat(c)); + } + } + + @Override + public void setContainerStatuses(List containers) { + if (containers == null) { + return; + } + initContainerStatuses(); + this.containerStatuses.addAll(containers); + } + + private void addContainerStatusesToProto() { + maybeInitBuilder(); + builder.clearContainerStatuses(); + if (containerStatuses == null) { + return; + } + Iterable it = new Iterable() { + + @Override + public Iterator iterator() { + return new Iterator() { + Iterator iter = containerStatuses.iterator(); + + @Override + public boolean hasNext() { + return iter.hasNext(); + } + + @Override + public ContainerStatusProto next() { + return convertToProtoFormat(iter.next()); + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + }; + } + }; + builder.addAllContainerStatuses(it); + } + + @Override + public int hashCode() { + return getProto().hashCode(); + } + + @Override + public boolean equals(Object other) { + if (other == null) + return false; + if (other.getClass().isAssignableFrom(this.getClass())) { + return this.getProto().equals(this.getClass().cast(other).getProto()); + } + return false; + } + @Override public String getNMVersion() { RegisterNodeManagerRequestProtoOrBuilder p = viaProto ? proto : builder; @@ -170,6 +259,11 @@ public class RegisterNodeManagerRequestPBImpl extends ProtoBase containerStatuses, + List keepAliveApplications, + NodeHealthStatus nodeHealthStatus) { + NodeStatus nodeStatus = Records.newRecord(NodeStatus.class); + nodeStatus.setResponseId(responseId); + nodeStatus.setNodeId(nodeId); + nodeStatus.setContainersStatuses(containerStatuses); + nodeStatus.setKeepAliveApplications(keepAliveApplications); + nodeStatus.setNodeHealthStatus(nodeHealthStatus); + return nodeStatus; + } + public abstract NodeId getNodeId(); public abstract int getResponseId(); @@ -36,8 +50,8 @@ public interface NodeStatus { public abstract List getKeepAliveApplications(); public abstract void setKeepAliveApplications(List appIds); - NodeHealthStatus getNodeHealthStatus(); - void setNodeHealthStatus(NodeHealthStatus healthStatus); + public abstract NodeHealthStatus getNodeHealthStatus(); + public abstract void setNodeHealthStatus(NodeHealthStatus healthStatus); public abstract void setNodeId(NodeId nodeId); public abstract void setResponseId(int responseId); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/NodeStatusPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/NodeStatusPBImpl.java index 8ed7849a128..65376dc659e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/NodeStatusPBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/NodeStatusPBImpl.java @@ -29,7 +29,6 @@ import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ContainerStatusPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.NodeIdPBImpl; -import org.apache.hadoop.yarn.api.records.impl.pb.ProtoBase; import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto; import org.apache.hadoop.yarn.proto.YarnProtos.ContainerStatusProto; import org.apache.hadoop.yarn.proto.YarnProtos.NodeIdProto; @@ -40,8 +39,7 @@ import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus; import org.apache.hadoop.yarn.server.api.records.NodeStatus; -public class NodeStatusPBImpl extends ProtoBase implements - NodeStatus { +public class NodeStatusPBImpl extends NodeStatus { NodeStatusProto proto = NodeStatusProto.getDefaultInstance(); NodeStatusProto.Builder builder = null; boolean viaProto = false; @@ -166,6 +164,21 @@ public class NodeStatusPBImpl extends ProtoBase implements builder.addAllKeepAliveApplications(iterable); } + @Override + public int hashCode() { + return getProto().hashCode(); + } + + @Override + public boolean equals(Object other) { + if (other == null) + return false; + if (other.getClass().isAssignableFrom(this.getClass())) { + return this.getProto().equals(this.getClass().cast(other).getProto()); + } + return false; + } + @Override public synchronized int getResponseId() { NodeStatusProtoOrBuilder p = viaProto ? proto : builder; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto index 70434c895b3..c54490526df 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto @@ -30,6 +30,7 @@ message RegisterNodeManagerRequestProto { optional int32 http_port = 3; optional ResourceProto resource = 4; optional string nm_version = 5; + repeated ContainerStatusProto containerStatuses = 6; } message RegisterNodeManagerResponseProto { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdater.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdater.java index 6ac71b4bd52..69ac848db4a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdater.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdater.java @@ -26,7 +26,7 @@ public interface NodeStatusUpdater extends Service { void sendOutofBandHeartBeat(); - NodeStatus getNodeStatusAndUpdateContainersInContext(); + NodeStatus getNodeStatusAndUpdateContainersInContext(int responseId); long getRMIdentifier(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java index 8a06418846c..aaf6ceb02af 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java @@ -48,8 +48,6 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; -import org.apache.hadoop.yarn.factories.RecordFactory; -import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.server.api.ResourceManagerConstants; import org.apache.hadoop.yarn.server.api.ResourceTracker; import org.apache.hadoop.yarn.server.api.ServerRMProxy; @@ -89,7 +87,6 @@ public class NodeStatusUpdaterImpl extends AbstractService implements private String nodeManagerVersionId; private String minimumResourceManagerVersion; private volatile boolean isStopped; - private RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null); private boolean tokenKeepAliveEnabled; private long tokenRemovalDelayMs; /** Keeps track of when the next keep alive request should be sent for an app*/ @@ -134,9 +131,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements conf.getInt( YarnConfiguration.NM_VCORES, YarnConfiguration.DEFAULT_NM_VCORES); - this.totalResource = recordFactory.newRecordInstance(Resource.class); - this.totalResource.setMemory(memoryMb); - this.totalResource.setVirtualCores(virtualCores); + this.totalResource = Resource.newInstance(memoryMb, virtualCores); metrics.addResource(totalResource); this.tokenKeepAliveEnabled = isTokenKeepAliveEnabled(conf); this.tokenRemovalDelayMs = @@ -238,13 +233,17 @@ public class NodeStatusUpdaterImpl extends AbstractService implements } @VisibleForTesting - protected void registerWithRM() throws YarnException, IOException { + protected void registerWithRM() + throws YarnException, IOException { + List containerStatuses = + this.updateAndGetContainerStatuses(); RegisterNodeManagerRequest request = - recordFactory.newRecordInstance(RegisterNodeManagerRequest.class); - request.setHttpPort(this.httpPort); - request.setResource(this.totalResource); - request.setNodeId(this.nodeId); - request.setNMVersion(this.nodeManagerVersionId); + RegisterNodeManagerRequest.newInstance(nodeId, httpPort, totalResource, + nodeManagerVersionId, containerStatuses); + if (containerStatuses != null) { + LOG.info("Registering with RM using finished containers :" + + containerStatuses); + } RegisterNodeManagerResponse regNMResponse = resourceTracker.registerNodeManager(request); this.rmIdentifier = regNMResponse.getRMIdentifier(); @@ -323,13 +322,33 @@ public class NodeStatusUpdaterImpl extends AbstractService implements } @Override - public NodeStatus getNodeStatusAndUpdateContainersInContext() { + public NodeStatus getNodeStatusAndUpdateContainersInContext( + int responseId) { - NodeStatus nodeStatus = recordFactory.newRecordInstance(NodeStatus.class); - nodeStatus.setNodeId(this.nodeId); + NodeHealthStatus nodeHealthStatus = this.context.getNodeHealthStatus(); + nodeHealthStatus.setHealthReport(healthChecker.getHealthReport()); + nodeHealthStatus.setIsNodeHealthy(healthChecker.isHealthy()); + nodeHealthStatus.setLastHealthReportTime( + healthChecker.getLastHealthReportTime()); + if (LOG.isDebugEnabled()) { + LOG.debug("Node's health-status : " + nodeHealthStatus.getIsNodeHealthy() + + ", " + nodeHealthStatus.getHealthReport()); + } + List containersStatuses = updateAndGetContainerStatuses(); + LOG.debug(this.nodeId + " sending out status for " + + containersStatuses.size() + " containers"); + NodeStatus nodeStatus = NodeStatus.newInstance(nodeId, responseId, + containersStatuses, createKeepAliveApplicationList(), nodeHealthStatus); - int numActiveContainers = 0; - List containersStatuses = new ArrayList(); + return nodeStatus; + } + + /* + * It will return current container statuses. If any container has + * COMPLETED then it will be removed from context. + */ + private List updateAndGetContainerStatuses() { + List containerStatuses = new ArrayList(); for (Iterator> i = this.context.getContainers().entrySet().iterator(); i.hasNext();) { Entry e = i.next(); @@ -339,8 +358,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements // Clone the container to send it to the RM org.apache.hadoop.yarn.api.records.ContainerStatus containerStatus = container.cloneAndGetContainerStatus(); - containersStatuses.add(containerStatus); - ++numActiveContainers; + containerStatuses.add(containerStatus); if (LOG.isDebugEnabled()) { LOG.debug("Sending out status for container: " + containerStatus); } @@ -356,26 +374,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements LOG.info("Removed completed container " + containerId); } } - nodeStatus.setContainersStatuses(containersStatuses); - - LOG.debug(this.nodeId + " sending out status for " - + numActiveContainers + " containers"); - - NodeHealthStatus nodeHealthStatus = this.context.getNodeHealthStatus(); - nodeHealthStatus.setHealthReport(healthChecker.getHealthReport()); - nodeHealthStatus.setIsNodeHealthy(healthChecker.isHealthy()); - nodeHealthStatus.setLastHealthReportTime( - healthChecker.getLastHealthReportTime()); - if (LOG.isDebugEnabled()) { - LOG.debug("Node's health-status : " + nodeHealthStatus.getIsNodeHealthy() - + ", " + nodeHealthStatus.getHealthReport()); - } - nodeStatus.setNodeHealthStatus(nodeHealthStatus); - - List keepAliveAppIds = createKeepAliveApplicationList(); - nodeStatus.setKeepAliveApplications(keepAliveAppIds); - - return nodeStatus; + return containerStatuses; } private void trackAppsForKeepAlive(List appIds) { @@ -458,18 +457,15 @@ public class NodeStatusUpdaterImpl extends AbstractService implements // Send heartbeat try { NodeHeartbeatResponse response = null; - NodeStatus nodeStatus = getNodeStatusAndUpdateContainersInContext(); - nodeStatus.setResponseId(lastHeartBeatID); + NodeStatus nodeStatus = + getNodeStatusAndUpdateContainersInContext(lastHeartBeatID); - NodeHeartbeatRequest request = recordFactory - .newRecordInstance(NodeHeartbeatRequest.class); - request.setNodeStatus(nodeStatus); - request - .setLastKnownContainerTokenMasterKey(NodeStatusUpdaterImpl.this.context - .getContainerTokenSecretManager().getCurrentKey()); - request - .setLastKnownNMTokenMasterKey(NodeStatusUpdaterImpl.this.context - .getNMTokenSecretManager().getCurrentKey()); + NodeHeartbeatRequest request = + NodeHeartbeatRequest.newInstance(nodeStatus, + NodeStatusUpdaterImpl.this.context + .getContainerTokenSecretManager().getCurrentKey(), + NodeStatusUpdaterImpl.this.context.getNMTokenSecretManager() + .getCurrentKey()); response = resourceTracker.nodeHeartbeat(request); //get next heartbeat interval from response nextHeartBeatInterval = response.getNextHeartBeatInterval(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java index 3091c4adb6f..dd3deb3c56d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java @@ -28,6 +28,7 @@ import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Map.Entry; import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -64,6 +65,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.StopContainersResponse; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; +import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.SerializedException; @@ -371,17 +373,31 @@ public class ContainerManagerImpl extends CompositeService implements this.handle(new CMgrCompletedContainersEvent(containerIds, CMgrCompletedContainersEvent.Reason.ON_NODEMANAGER_RESYNC)); - while (!containers.isEmpty()) { - try { - Thread.sleep(1000); - nodeStatusUpdater.getNodeStatusAndUpdateContainersInContext(); - } catch (InterruptedException ex) { - LOG.warn("Interrupted while sleeping on container kill on resync", ex); + + /* + * We will wait till all the containers change their state to COMPLETE. We + * will not remove the container statuses from nm context because these + * are used while re-registering node manager with resource manager. + */ + boolean allContainersCompleted = false; + while (!containers.isEmpty() && !allContainersCompleted) { + allContainersCompleted = true; + for (Entry container : containers.entrySet()) { + if (((ContainerImpl) container.getValue()).getCurrentState() + != ContainerState.COMPLETE) { + allContainersCompleted = false; + try { + Thread.sleep(1000); + } catch (InterruptedException ex) { + LOG.warn("Interrupted while sleeping on container kill on resync", + ex); + } + break; + } } } - // All containers killed - if (containers.isEmpty()) { + if (allContainersCompleted) { LOG.info("All containers in DONE state"); } else { LOG.info("Done waiting for containers to be killed. Still alive: " + diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java index c2d32b57bee..34c0cc66dd6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java @@ -302,7 +302,7 @@ public class ContainerImpl implements Container { private final StateMachine stateMachine; - private org.apache.hadoop.yarn.api.records.ContainerState getCurrentState() { + public org.apache.hadoop.yarn.api.records.ContainerState getCurrentState() { switch (stateMachine.getCurrentState()) { case NEW: case LOCALIZING: diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java index 23f87549ce3..f80ce85d9d1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java @@ -29,6 +29,10 @@ import org.apache.hadoop.net.Node; import org.apache.hadoop.security.authorize.PolicyProvider; import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.util.VersionUtil; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerState; +import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceOption; @@ -46,14 +50,17 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResp import org.apache.hadoop.yarn.server.api.records.MasterKey; import org.apache.hadoop.yarn.server.api.records.NodeAction; import org.apache.hadoop.yarn.server.api.records.NodeStatus; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerFinishedEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeReconnectEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeStatusEvent; -import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM; +import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; import org.apache.hadoop.yarn.server.resourcemanager.security.authorize.RMPolicyProvider; import org.apache.hadoop.yarn.server.utils.YarnServerBuilderUtils; import org.apache.hadoop.yarn.util.RackResolver; @@ -183,6 +190,33 @@ public class ResourceTrackerService extends AbstractService implements Resource capability = request.getResource(); String nodeManagerVersion = request.getNMVersion(); + if (!request.getContainerStatuses().isEmpty()) { + LOG.info("received container statuses on node manager register :" + + request.getContainerStatuses()); + for (ContainerStatus containerStatus : request.getContainerStatuses()) { + ApplicationAttemptId appAttemptId = + containerStatus.getContainerId().getApplicationAttemptId(); + RMApp rmApp = + rmContext.getRMApps().get(appAttemptId.getApplicationId()); + if (rmApp != null) { + RMAppAttempt rmAppAttempt = rmApp.getRMAppAttempt(appAttemptId); + if (rmAppAttempt.getMasterContainer().getId() + .equals(containerStatus.getContainerId()) + && containerStatus.getState() == ContainerState.COMPLETE) { + // sending master container finished event. + RMAppAttemptContainerFinishedEvent evt = + new RMAppAttemptContainerFinishedEvent(appAttemptId, + containerStatus); + rmContext.getDispatcher().getEventHandler().handle(evt); + } + } else { + LOG.error("Received finished container :" + + containerStatus.getContainerId() + + " for non existing application :" + + appAttemptId.getApplicationId()); + } + } + } RegisterNodeManagerResponse response = recordFactory .newRecordInstance(RegisterNodeManagerResponse.class); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java index e3b083ca191..16868369a73 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java @@ -132,8 +132,8 @@ public class RMAppImpl implements RMApp, Recoverable { .addTransition(RMAppState.NEW, RMAppState.NEW_SAVING, RMAppEventType.START, new RMAppSavingTransition()) .addTransition(RMAppState.NEW, EnumSet.of(RMAppState.SUBMITTED, - RMAppState.FINISHED, RMAppState.FAILED, RMAppState.KILLED, - RMAppState.FINAL_SAVING), + RMAppState.RUNNING, RMAppState.FINISHED, RMAppState.FAILED, + RMAppState.KILLED, RMAppState.FINAL_SAVING), RMAppEventType.RECOVER, new RMAppRecoveredTransition()) .addTransition(RMAppState.NEW, RMAppState.FINAL_SAVING, RMAppEventType.KILL, new FinalSavingTransition( @@ -611,11 +611,11 @@ public class RMAppImpl implements RMApp, Recoverable { this.diagnostics.append(appState.getDiagnostics()); this.storedFinishTime = appState.getFinishTime(); this.startTime = appState.getStartTime(); + for(int i=0; i { + @Override public RMAppState transition(RMAppImpl app, RMAppEvent event) { - if (app.recoveredFinalState != null) { - FINAL_TRANSITION.transition(app, event); - return app.recoveredFinalState; + if (app.attempts.isEmpty()) { + // Saved application was not running any attempts. + app.createNewAttempt(true); + return RMAppState.SUBMITTED; + } else { + /* + * If last attempt recovered final state is null .. it means attempt + * was started but AM container may or may not have started / finished. + * Therefore we should wait for it to finish. + */ + for (RMAppAttempt attempt : app.getAppAttempts().values()) { + app.dispatcher.getEventHandler().handle( + new RMAppAttemptEvent(attempt.getAppAttemptId(), + RMAppAttemptEventType.RECOVER)); + } + if (app.recoveredFinalState != null) { + FINAL_TRANSITION.transition(app, event); + return app.recoveredFinalState; + } else { + return RMAppState.RUNNING; + } } - // Directly call AttemptFailedTransition, since now we deem that an - // application fails because of RM restart as a normal AM failure. - - // Do not recover unmanaged applications since current recovery - // mechanism of restarting attempts does not work for them. - // This will need to be changed in work preserving recovery in which - // RM will re-connect with the running AM's instead of restarting them - - // In work-preserve restart, if attemptCount == maxAttempts, the job still - // needs to be recovered because the last attempt may still be running. - - // As part of YARN-1210, we may return ACCECPTED state waiting for AM to - // reregister or fail and remove the following code. - return new AttemptFailedTransition(RMAppState.SUBMITTED).transition(app, - event); } } @@ -1017,4 +1022,10 @@ public class RMAppImpl implements RMApp, Recoverable { throw new YarnRuntimeException("Unknown state passed!"); } } + + public static boolean isAppInFinalState(RMApp rmApp) { + RMAppState appState = rmApp.getState(); + return appState == RMAppState.FAILED || appState == RMAppState.FINISHED + || appState == RMAppState.KILLED; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java index 1247bb77d45..dc15e7af4e2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java @@ -68,11 +68,14 @@ import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.Appli import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState; import org.apache.hadoop.yarn.server.resourcemanager.recovery.Recoverable; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppFailedAttemptEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppFinishedAttemptEvent; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRejectedEvent; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerAcquiredEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerFinishedEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptLaunchFailedEvent; @@ -179,7 +182,7 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable { new UnexpectedAMRegisteredTransition(), RMAppAttemptState.FAILED)) .addTransition( RMAppAttemptState.NEW, EnumSet.of(RMAppAttemptState.FINISHED, RMAppAttemptState.KILLED, - RMAppAttemptState.FAILED, RMAppAttemptState.RECOVERED), + RMAppAttemptState.FAILED, RMAppAttemptState.LAUNCHED), RMAppAttemptEventType.RECOVER, new AttemptRecoveredTransition()) // Transitions from SUBMITTED state @@ -386,25 +389,6 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable { RMAppAttemptEventType.UNREGISTERED, RMAppAttemptEventType.KILL, RMAppAttemptEventType.STATUS_UPDATE)) - - // Transitions from RECOVERED State - .addTransition( - RMAppAttemptState.RECOVERED, - RMAppAttemptState.RECOVERED, - EnumSet.of(RMAppAttemptEventType.START, - RMAppAttemptEventType.APP_ACCEPTED, - RMAppAttemptEventType.APP_REJECTED, - RMAppAttemptEventType.EXPIRE, - RMAppAttemptEventType.LAUNCHED, - RMAppAttemptEventType.LAUNCH_FAILED, - RMAppAttemptEventType.REGISTERED, - RMAppAttemptEventType.CONTAINER_ALLOCATED, - RMAppAttemptEventType.CONTAINER_ACQUIRED, - RMAppAttemptEventType.ATTEMPT_NEW_SAVED, - RMAppAttemptEventType.CONTAINER_FINISHED, - RMAppAttemptEventType.UNREGISTERED, - RMAppAttemptEventType.KILL, - RMAppAttemptEventType.STATUS_UPDATE)) .installTopology(); public RMAppAttemptImpl(ApplicationAttemptId appAttemptId, @@ -694,8 +678,6 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable { this.proxiedTrackingUrl = generateProxyUriWithScheme(originalTrackingUrl); this.finalStatus = attemptState.getFinalApplicationStatus(); this.startTime = attemptState.getStartTime(); - handle(new RMAppAttemptEvent(getAppAttemptId(), - RMAppAttemptEventType.RECOVER)); } private void recoverAppAttemptCredentials(Credentials appAttemptTokens) @@ -865,11 +847,38 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable { @Override public RMAppAttemptState transition(RMAppAttemptImpl appAttempt, RMAppAttemptEvent event) { + LOG.info("Recovering attempt : recoverdFinalState :" + + appAttempt.recoveredFinalState); if (appAttempt.recoveredFinalState != null) { appAttempt.progress = 1.0f; + RMApp rmApp =appAttempt.rmContext.getRMApps().get( + appAttempt.getAppAttemptId().getApplicationId()); + // We will replay the final attempt only if last attempt is in final + // state but application is not in final state. + if (rmApp.getCurrentAppAttempt() == appAttempt + && !RMAppImpl.isAppInFinalState(rmApp)) { + (new BaseFinalTransition(appAttempt.recoveredFinalState)).transition( + appAttempt, event); + } return appAttempt.recoveredFinalState; } else { - return RMAppAttemptState.RECOVERED; + /* + * Since the application attempt's final state is not saved that means + * for AM container (previous attempt) state must be one of these. + * 1) AM container may not have been launched (RM failed right before + * this). + * 2) AM container was successfully launched but may or may not have + * registered / unregistered. + * In whichever case we will wait (by moving attempt into LAUNCHED + * state) and mark this attempt failed (assuming non work preserving + * restart) only after + * 1) Node manager during re-registration heart beats back saying + * am container finished. + * 2) OR AMLivelinessMonitor expires this attempt (when am doesn't + * heart beat back). + */ + (new AMLaunchedTransition()).transition(appAttempt, event); + return RMAppAttemptState.LAUNCHED; } } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptState.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptState.java index 2551ed111d4..14d20bb21af 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptState.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptState.java @@ -20,6 +20,5 @@ package org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt; public enum RMAppAttemptState { NEW, SUBMITTED, SCHEDULED, ALLOCATED, LAUNCHED, FAILED, RUNNING, FINISHING, - FINISHED, KILLED, ALLOCATED_SAVING, LAUNCHED_UNMANAGED_SAVING, RECOVERED, - FINAL_SAVING + FINISHED, KILLED, ALLOCATED_SAVING, LAUNCHED_UNMANAGED_SAVING, FINAL_SAVING } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java index c2cf147d292..eb691624ec0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.server.resourcemanager; +import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; @@ -31,6 +32,7 @@ import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest; @@ -98,21 +100,27 @@ public class MockNM { } public RegisterNodeManagerResponse registerNode() throws Exception { + return registerNode(null); + } + + public RegisterNodeManagerResponse registerNode( + List containerStatus) throws Exception{ RegisterNodeManagerRequest req = Records.newRecord( RegisterNodeManagerRequest.class); req.setNodeId(nodeId); req.setHttpPort(httpPort); Resource resource = BuilderUtils.newResource(memory, vCores); req.setResource(resource); + req.setContainerStatuses(containerStatus); req.setNMVersion(version); RegisterNodeManagerResponse registrationResponse = resourceTracker.registerNodeManager(req); this.currentContainerTokenMasterKey = registrationResponse.getContainerTokenMasterKey(); this.currentNMTokenMasterKey = registrationResponse.getNMTokenMasterKey(); - return registrationResponse; + return registrationResponse; } - + public NodeHeartbeatResponse nodeHeartbeat(boolean isHealthy) throws Exception { return nodeHeartbeat(new HashMap>(), isHealthy, ++responseId); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java index b1d1e19a898..43de3b8f390 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java @@ -34,6 +34,7 @@ import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.LinkedBlockingQueue; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeysPublic; @@ -62,10 +63,14 @@ import org.apache.hadoop.yarn.api.records.ApplicationReport; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerState; +import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.event.AsyncDispatcher; +import org.apache.hadoop.yarn.event.Dispatcher; +import org.apache.hadoop.yarn.event.Event; import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; @@ -88,6 +93,7 @@ import org.apache.log4j.Logger; import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import org.mortbay.log.Log; public class TestRMRestart { @@ -109,6 +115,7 @@ public class TestRMRestart { Assert.assertTrue(YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS > 1); } + @SuppressWarnings("rawtypes") @Test (timeout=180000) public void testRMRestart() throws Exception { conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, @@ -257,11 +264,14 @@ public class TestRMRestart { .getApplicationId()); // verify state machine kicked into expected states - rm2.waitForState(loadedApp1.getApplicationId(), RMAppState.ACCEPTED); + rm2.waitForState(loadedApp1.getApplicationId(), RMAppState.RUNNING); rm2.waitForState(loadedApp2.getApplicationId(), RMAppState.ACCEPTED); - // verify new attempts created - Assert.assertEquals(2, loadedApp1.getAppAttempts().size()); + // verify attempts for apps + // The app for which AM was started will wait for previous am + // container finish event to arrive. However for an application for which + // no am container was running will start new application attempt. + Assert.assertEquals(1, loadedApp1.getAppAttempts().size()); Assert.assertEquals(1, loadedApp2.getAppAttempts().size()); // verify old AM is not accepted @@ -279,8 +289,20 @@ public class TestRMRestart { Assert.assertEquals(NodeAction.RESYNC, hbResponse.getNodeAction()); // new NM to represent NM re-register - nm1 = rm2.registerNode("127.0.0.1:1234", 15120); - nm2 = rm2.registerNode("127.0.0.2:5678", 15120); + nm1 = new MockNM("127.0.0.1:1234", 15120, rm2.getResourceTrackerService()); + nm2 = new MockNM("127.0.0.2:5678", 15120, rm2.getResourceTrackerService()); + + List containerStatuses = new ArrayList(); + ContainerStatus containerStatus = + BuilderUtils.newContainerStatus(BuilderUtils.newContainerId(loadedApp1 + .getCurrentAppAttempt().getAppAttemptId(), 1), + ContainerState.COMPLETE, "Killed AM container", 143); + containerStatuses.add(containerStatus); + nm1.registerNode(containerStatuses); + nm2.registerNode(); + + rm2.waitForState(loadedApp1.getApplicationId(), RMAppState.ACCEPTED); + Assert.assertEquals(2, loadedApp1.getAppAttempts().size()); // verify no more reboot response sent hbResponse = nm1.nodeHeartbeat(true); @@ -403,6 +425,157 @@ public class TestRMRestart { .getAppAttempts().get(am0.getApplicationAttemptId()).getAppAttemptState()); } + @Test + public void testRMRestartWaitForPreviousAMToFinish() throws Exception { + // testing 3 cases + // After RM restarts + // 1) New application attempt is not started until previous AM container + // finish event is reported back to RM as a part of nm registration. + // 2) If previous AM container finish event is never reported back (i.e. + // node manager on which this AM container was running also went down) in + // that case AMLivenessMonitor should time out previous attempt and start + // new attempt. + // 3) If all the stored attempts had finished then new attempt should + // be started immediately. + YarnConfiguration conf = new YarnConfiguration(this.conf); + conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 40); + MemoryRMStateStore memStore = new MemoryRMStateStore(); + memStore.init(conf); + RMState rmState = memStore.getState(); + Map rmAppState = + rmState.getApplicationState(); + + // start RM + final MockRM rm1 = new MockRM(conf, memStore); + rm1.start(); + MockNM nm1 = + new MockNM("127.0.0.1:1234" , 16382, rm1.getResourceTrackerService()); + nm1.registerNode(); + + // submitting app + RMApp app1 = rm1.submitApp(200); + rm1.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED); + MockAM am1 = launchAM(app1, rm1, nm1); + nm1.nodeHeartbeat(am1.getApplicationAttemptId(), 1, ContainerState.COMPLETE); + // Fail first AM. + am1.waitForState(RMAppAttemptState.FAILED); + + // launch another AM. + MockAM am2 = launchAM(app1, rm1, nm1); + + Assert.assertEquals(1, rmAppState.size()); + Assert.assertEquals(app1.getState(), RMAppState.RUNNING); + Assert.assertEquals(app1.getAppAttempts() + .get(app1.getCurrentAppAttempt().getAppAttemptId()) + .getAppAttemptState(), RMAppAttemptState.RUNNING); + + // start new RM. + MockRM rm2 = null; + rm2 = new MockRM(conf, memStore); + rm2.start(); + + nm1.setResourceTrackerService(rm2.getResourceTrackerService()); + NodeHeartbeatResponse res = nm1.nodeHeartbeat(true); + Assert.assertEquals(NodeAction.RESYNC, res.getNodeAction()); + + RMApp rmApp = rm2.getRMContext().getRMApps().get(app1.getApplicationId()); + // application should be in running state + rm2.waitForState(app1.getApplicationId(), RMAppState.RUNNING); + + Assert.assertEquals(RMAppState.RUNNING, rmApp.getState()); + // new attempt should not be started + Assert.assertEquals(2, rmApp.getAppAttempts().size()); + // am1 attempt should be in FAILED state where as am2 attempt should be in + // LAUNCHED state + Assert.assertEquals(RMAppAttemptState.FAILED, + rmApp.getAppAttempts().get(am1.getApplicationAttemptId()) + .getAppAttemptState()); + Assert.assertEquals(RMAppAttemptState.LAUNCHED, + rmApp.getAppAttempts().get(am2.getApplicationAttemptId()) + .getAppAttemptState()); + + List containerStatuses = new ArrayList(); + ContainerStatus containerStatus = + BuilderUtils.newContainerStatus( + BuilderUtils.newContainerId(am2.getApplicationAttemptId(), 1), + ContainerState.COMPLETE, "Killed AM container", 143); + containerStatuses.add(containerStatus); + nm1.registerNode(containerStatuses); + rm2.waitForState(am2.getApplicationAttemptId(), RMAppAttemptState.FAILED); + launchAM(rmApp, rm2, nm1); + Assert.assertEquals(3, rmApp.getAppAttempts().size()); + rm2.waitForState(rmApp.getCurrentAppAttempt().getAppAttemptId(), + RMAppAttemptState.RUNNING); + // Now restart RM ... + // Setting AMLivelinessMonitor interval to be 10 Secs. + conf.setInt(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 10000); + MockRM rm3 = null; + rm3 = new MockRM(conf, memStore); + rm3.start(); + + // Wait for RM to process all the events as a part of rm recovery. + nm1.setResourceTrackerService(rm3.getResourceTrackerService()); + + rmApp = rm3.getRMContext().getRMApps().get(app1.getApplicationId()); + // application should be in running state + rm3.waitForState(app1.getApplicationId(), RMAppState.RUNNING); + Assert.assertEquals(rmApp.getState(), RMAppState.RUNNING); + // new attempt should not be started + Assert.assertEquals(3, rmApp.getAppAttempts().size()); + // am1 and am2 attempts should be in FAILED state where as am3 should be + // in LAUNCHED state + Assert.assertEquals(RMAppAttemptState.FAILED, + rmApp.getAppAttempts().get(am1.getApplicationAttemptId()) + .getAppAttemptState()); + Assert.assertEquals(RMAppAttemptState.FAILED, + rmApp.getAppAttempts().get(am2.getApplicationAttemptId()) + .getAppAttemptState()); + ApplicationAttemptId latestAppAttemptId = + rmApp.getCurrentAppAttempt().getAppAttemptId(); + Assert.assertEquals(RMAppAttemptState.LAUNCHED,rmApp.getAppAttempts() + .get(latestAppAttemptId).getAppAttemptState()); + + rm3.waitForState(latestAppAttemptId, RMAppAttemptState.FAILED); + rm3.waitForState(rmApp.getApplicationId(), RMAppState.ACCEPTED); + Assert.assertEquals(4, rmApp.getAppAttempts().size()); + Assert.assertEquals(RMAppAttemptState.FAILED, + rmApp.getAppAttempts().get(latestAppAttemptId).getAppAttemptState()); + + latestAppAttemptId = rmApp.getCurrentAppAttempt().getAppAttemptId(); + + // The 4th attempt has started but is not yet saved into RMStateStore + // It will be saved only when we launch AM. + + // submitting app but not starting AM for it. + RMApp app2 = rm3.submitApp(200); + rm3.waitForState(app2.getApplicationId(), RMAppState.ACCEPTED); + Assert.assertEquals(1, app2.getAppAttempts().size()); + Assert.assertEquals(0, + memStore.getState().getApplicationState().get(app2.getApplicationId()) + .getAttemptCount()); + + MockRM rm4 = null; + rm4 = new MockRM(conf, memStore); + rm4.start(); + + rmApp = rm4.getRMContext().getRMApps().get(app1.getApplicationId()); + rm4.waitForState(rmApp.getApplicationId(), RMAppState.ACCEPTED); + Assert.assertEquals(4, rmApp.getAppAttempts().size()); + Assert.assertEquals(RMAppState.ACCEPTED, rmApp.getState()); + Assert.assertEquals(RMAppAttemptState.SCHEDULED, rmApp.getAppAttempts() + .get(latestAppAttemptId).getAppAttemptState()); + + // The initial application for which an AM was not started should be in + // ACCEPTED state with one application attempt started. + app2 = rm4.getRMContext().getRMApps().get(app2.getApplicationId()); + rm4.waitForState(app2.getApplicationId(), RMAppState.ACCEPTED); + Assert.assertEquals(RMAppState.ACCEPTED, app2.getState()); + Assert.assertEquals(1, app2.getAppAttempts().size()); + Assert.assertEquals(RMAppAttemptState.SCHEDULED, app2 + .getCurrentAppAttempt().getAppAttemptState()); + + } + @Test public void testRMRestartFailedApp() throws Exception { conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 1); @@ -736,6 +909,8 @@ public class TestRMRestart { Assert.assertEquals(BuilderUtils.newContainerId(attemptId1, 1), attemptState.getMasterContainer().getId()); + // Setting AMLivelinessMonitor interval to be 10 Secs. + conf.setInt(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 10000); // start new RM MockRM rm2 = new MockRM(conf, memStore); rm2.start(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java index b9fc15f59f6..d391d950036 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java @@ -372,10 +372,10 @@ public class TestRMAppAttemptTransitions { } /** - * {@link RMAppAttemptState#RECOVERED} + * {@link RMAppAttemptState#LAUNCHED} */ private void testAppAttemptRecoveredState() { - assertEquals(RMAppAttemptState.RECOVERED, + assertEquals(RMAppAttemptState.LAUNCHED, applicationAttempt.getAppAttemptState()); }