From cbd0ddfebe9aee1bcf342c6e42c00b2094474f09 Mon Sep 17 00:00:00 2001
From: Arun Murthy
Date: Sun, 11 Sep 2011 17:28:08 +0000
Subject: [PATCH] Merge -r 1169483:1169484 from trunk to branch-0.23 to fix MAPREDUCE-2933.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23@1169485 13f79535-47bb-0310-9956-ffa450edef68
---
 hadoop-mapreduce-project/CHANGES.txt | 3 +
 .../v2/app/rm/RMContainerAllocator.java | 13 +-
 .../v2/app/rm/RMContainerRequestor.java | 10 +-
 .../hadoop/yarn/api/records/AMResponse.java | 70 ++-----
 .../api/records/impl/pb/AMResponsePBImpl.java | 161 +++++-------
 .../src/main/proto/yarn_protos.proto | 4 +-
 .../yarn/server/api/records/NodeStatus.java | 14 +-
 .../api/records/impl/pb/NodeStatusPBImpl.java | 196 +++++----------
 .../proto/yarn_server_common_protos.proto | 11 +-
 .../nodemanager/NodeStatusUpdaterImpl.java | 25 +--
 .../containermanager/container/Container.java | 2 -
 .../container/ContainerImpl.java | 18 --
 .../nodemanager/TestNodeStatusUpdater.java | 38 +++-
 .../ApplicationMasterService.java | 4 +-
 .../ResourceTrackerService.java | 7 +-
 .../rmapp/attempt/RMAppAttempt.java | 5 +-
 .../rmapp/attempt/RMAppAttemptImpl.java | 19 +-
 .../RMAppAttemptContainerFinishedEvent.java | 12 +-
 .../rmcontainer/RMContainerFinishedEvent.java | 4 +-
 .../rmcontainer/RMContainerImpl.java | 28 +--
 .../resourcemanager/rmnode/RMNodeImpl.java | 57 ++---
 .../rmnode/RMNodeStatusEvent.java | 10 +-
 .../scheduler/SchedulerApp.java | 22 +-
 .../scheduler/SchedulerUtils.java | 74 +++++++
 .../scheduler/capacity/CSQueue.java | 6 +-
 .../scheduler/capacity/CapacityScheduler.java | 85 +++++---
 .../scheduler/capacity/LeafQueue.java | 14 +-
 .../scheduler/capacity/ParentQueue.java | 5 +-
 .../event/NodeUpdateSchedulerEvent.java | 17 +-
 .../scheduler/fifo/FifoScheduler.java | 76 ++++---
 .../yarn/server/resourcemanager/MockNM.java | 16 +-
 .../server/resourcemanager/NodeManager.java | 38 +++-
 .../TestApplicationCleanup.java | 4 +-
 .../resourcemanager/TestFifoScheduler.java | 10 +-
 .../yarn/server/resourcemanager/TestRM.java | 8 +-
 .../scheduler/capacity/TestLeafQueue.java | 22 +-
 .../TestContainerTokenSecretManager.java | 4 +-
 37 files changed, 540 insertions(+), 572 deletions(-)
 create mode 100644 hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java

diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt
index 90f8ee10d27..d84b29f2c72 100644
--- a/hadoop-mapreduce-project/CHANGES.txt
+++ b/hadoop-mapreduce-project/CHANGES.txt
@@ -250,6 +250,9 @@ Release 0.23.0 - Unreleased
     MAPREDUCE-2711. Update TestBlockPlacementPolicyRaid for the new namesystem
     and block management APIs. (szetszwo)
 
+    MAPREDUCE-2933. Change allocate call to return ContainerStatus for
+    completed containers rather than Container. (acmurthy)
+
   OPTIMIZATIONS
 
     MAPREDUCE-2026.
Make JobTracker.getJobCounters() and diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java index 7096b74bd02..ff232104bd4 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java @@ -53,6 +53,7 @@ import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType; import org.apache.hadoop.yarn.api.records.AMResponse; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.util.RackResolver; @@ -414,8 +415,8 @@ public class RMContainerAllocator extends RMContainerRequestor int headRoom = getAvailableResources() != null ? getAvailableResources().getMemory() : 0;//first time it would be null AMResponse response = makeRemoteRequest(); int newHeadRoom = getAvailableResources() != null ? getAvailableResources().getMemory() : 0; - List newContainers = response.getNewContainerList(); - List finishedContainers = response.getFinishedContainerList(); + List newContainers = response.getAllocatedContainers(); + List finishedContainers = response.getCompletedContainersStatuses(); if (newContainers.size() + finishedContainers.size() > 0 || headRoom != newHeadRoom) { //something changed recalculateReduceSchedule = true; @@ -426,12 +427,12 @@ public class RMContainerAllocator extends RMContainerRequestor allocatedContainers.add(cont); LOG.debug("Received new Container :" + cont); } - for (Container cont : finishedContainers) { + for (ContainerStatus cont : finishedContainers) { LOG.info("Received completed container " + cont); - TaskAttemptId attemptID = assignedRequests.get(cont.getId()); + TaskAttemptId attemptID = assignedRequests.get(cont.getContainerId()); if (attemptID == null) { LOG.error("Container complete event for unknown container id " - + cont.getId()); + + cont.getContainerId()); } else { assignedRequests.remove(attemptID); if (attemptID.getTaskId().getTaskType().equals(TaskType.MAP)) { @@ -443,7 +444,7 @@ public class RMContainerAllocator extends RMContainerRequestor eventHandler.handle(new TaskAttemptEvent(attemptID, TaskAttemptEventType.TA_CONTAINER_COMPLETED)); // Send the diagnostics - String diagnostics = cont.getContainerStatus().getDiagnostics(); + String diagnostics = cont.getDiagnostics(); eventHandler.handle(new TaskAttemptDiagnosticsUpdateEvent(attemptID, diagnostics)); } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java index 2b60c41c3e2..b9f0c6ee45e 100644 --- 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java @@ -36,7 +36,6 @@ import org.apache.hadoop.mapreduce.v2.app.client.ClientService; import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; import org.apache.hadoop.yarn.api.records.AMResponse; -import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; @@ -123,10 +122,11 @@ public abstract class RMContainerRequestor extends RMCommunicator { availableResources = response.getAvailableResources(); LOG.info("getResources() for " + applicationId + ":" + " ask=" - + ask.size() + " release= " + release.size() + " newContainers=" - + response.getNewContainerCount() + " finishedContainers=" - + response.getFinishedContainerCount() - + " resourcelimit=" + availableResources); + + ask.size() + " release= " + release.size() + + " newContainers=" + response.getAllocatedContainers().size() + + " finishedContainers=" + + response.getCompletedContainersStatuses().size() + + " resourcelimit=" + availableResources); ask.clear(); release.clear(); diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/AMResponse.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/AMResponse.java index 54b00551334..d9414101560 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/AMResponse.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/AMResponse.java @@ -86,32 +86,17 @@ public interface AMResponse { */ @Public @Stable - public List getNewContainerList(); + public List getAllocatedContainers(); - @Private - @Unstable - public Container getNewContainer(int index); + /** + * Set the list of newly allocated Container by the + * ResourceManager. + * @param containers list of newly allocated Container + */ + @Public + @Stable + public void setAllocatedContainers(List containers); - @Private - @Unstable - public int getNewContainerCount(); - - @Private - @Unstable - public void addAllNewContainers(List containers); - - @Private - @Unstable - public void addNewContainer(Container container); - - @Private - @Unstable - public void removeNewContainer(int index); - - @Private - @Unstable - public void clearNewContainers(); - /** * Get the available headroom for resources in the cluster for the * application. @@ -127,35 +112,18 @@ public interface AMResponse { public void setAvailableResources(Resource limit); /** - * Get the list of completed containers. - * @return the list of completed containers + * Get the list of completed containers' statuses. 
+ * @return the list of completed containers' statuses */ @Public @Stable - public List getFinishedContainerList(); + public List getCompletedContainersStatuses(); - @Private - @Unstable - public Container getFinishedContainer(int index); - - @Private - @Unstable - public int getFinishedContainerCount(); - - - @Private - @Unstable - public void addAllFinishedContainers(List containers); - - @Private - @Unstable - public void addFinishedContainer(Container container); - - @Private - @Unstable - public void removeFinishedContainer(int index); - - @Private - @Unstable - public void clearFinishedContainers(); + /** + * Set the list of list of completed containers' statuses. + * @param containers list of completed containers' statuses + */ + @Public + @Stable + public void setCompletedContainersStatuses(List containers); } \ No newline at end of file diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/AMResponsePBImpl.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/AMResponsePBImpl.java index 7ae3e865692..11fde9470bf 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/AMResponsePBImpl.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/AMResponsePBImpl.java @@ -25,11 +25,13 @@ import java.util.List; import org.apache.hadoop.yarn.api.records.AMResponse; import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.ProtoBase; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.proto.YarnProtos.AMResponseProto; import org.apache.hadoop.yarn.proto.YarnProtos.AMResponseProtoOrBuilder; import org.apache.hadoop.yarn.proto.YarnProtos.ContainerProto; +import org.apache.hadoop.yarn.proto.YarnProtos.ContainerStatusProto; import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProto; @@ -41,8 +43,8 @@ public class AMResponsePBImpl extends ProtoBase implements AMRe Resource limit; - private List newContainersList = null; - private List finishedContainersList = null; + private List allocatedContainers = null; + private List completedContainersStatuses = null; // private boolean hasLocalContainerList = false; @@ -63,15 +65,17 @@ public class AMResponsePBImpl extends ProtoBase implements AMRe } private synchronized void mergeLocalToBuilder() { - if (this.newContainersList != null) { - builder.clearNewContainers(); - Iterable iterable = getProtoIterable(this.newContainersList); - builder.addAllNewContainers(iterable); + if (this.allocatedContainers != null) { + builder.clearAllocatedContainers(); + Iterable iterable = + getProtoIterable(this.allocatedContainers); + builder.addAllAllocatedContainers(iterable); } - if (this.finishedContainersList != null) { - builder.clearFinishedContainers(); - Iterable iterable = getProtoIterable(this.finishedContainersList); - builder.addAllFinishedContainers(iterable); + if (this.completedContainersStatuses != null) { + builder.clearCompletedContainerStatuses(); + Iterable iterable = + getContainerStatusProtoIterable(this.completedContainersStatuses); + builder.addAllCompletedContainerStatuses(iterable); } if (this.limit != null) { builder.setLimit(convertToProtoFormat(this.limit)); @@ -139,42 +143,31 @@ public class AMResponsePBImpl extends ProtoBase implements AMRe } @Override - 
public synchronized List getNewContainerList() { + public synchronized List getAllocatedContainers() { initLocalNewContainerList(); - return this.newContainersList; - } - - @Override - public synchronized Container getNewContainer(int index) { - initLocalNewContainerList(); - return this.newContainersList.get(index); - } - @Override - public synchronized int getNewContainerCount() { - initLocalNewContainerList(); - return this.newContainersList.size(); + return this.allocatedContainers; } //Once this is called. containerList will never be null - untill a getProto is called. private synchronized void initLocalNewContainerList() { - if (this.newContainersList != null) { + if (this.allocatedContainers != null) { return; } AMResponseProtoOrBuilder p = viaProto ? proto : builder; - List list = p.getNewContainersList(); - newContainersList = new ArrayList(); + List list = p.getAllocatedContainersList(); + allocatedContainers = new ArrayList(); for (ContainerProto c : list) { - newContainersList.add(convertFromProtoFormat(c)); + allocatedContainers.add(convertFromProtoFormat(c)); } } @Override - public synchronized void addAllNewContainers(final List containers) { + public synchronized void setAllocatedContainers(final List containers) { if (containers == null) return; initLocalNewContainerList(); - newContainersList.addAll(containers); + allocatedContainers.addAll(containers); } private synchronized Iterable getProtoIterable( @@ -207,86 +200,71 @@ public class AMResponsePBImpl extends ProtoBase implements AMRe } }; } - - @Override - public synchronized void addNewContainer(Container containers) { - initLocalNewContainerList(); - if (containers == null) - return; - this.newContainersList.add(containers); - } - - @Override - public synchronized void removeNewContainer(int index) { - initLocalNewContainerList(); - this.newContainersList.remove(index); - } - @Override - public synchronized void clearNewContainers() { - initLocalNewContainerList(); - this.newContainersList.clear(); + + private synchronized Iterable + getContainerStatusProtoIterable( + final List newContainersList) { + maybeInitBuilder(); + return new Iterable() { + @Override + public synchronized Iterator iterator() { + return new Iterator() { + + Iterator iter = newContainersList.iterator(); + + @Override + public synchronized boolean hasNext() { + return iter.hasNext(); + } + + @Override + public synchronized ContainerStatusProto next() { + return convertToProtoFormat(iter.next()); + } + + @Override + public synchronized void remove() { + throw new UnsupportedOperationException(); + + } + }; + + } + }; } //// Finished containers @Override - public synchronized List getFinishedContainerList() { + public synchronized List getCompletedContainersStatuses() { initLocalFinishedContainerList(); - return this.finishedContainersList; - } - - @Override - public synchronized Container getFinishedContainer(int index) { - initLocalFinishedContainerList(); - return this.finishedContainersList.get(index); - } - @Override - public synchronized int getFinishedContainerCount() { - initLocalFinishedContainerList(); - return this.finishedContainersList.size(); + return this.completedContainersStatuses; } //Once this is called. containerList will never be null - untill a getProto is called. private synchronized void initLocalFinishedContainerList() { - if (this.finishedContainersList != null) { + if (this.completedContainersStatuses != null) { return; } AMResponseProtoOrBuilder p = viaProto ? 
proto : builder; - List list = p.getFinishedContainersList(); - finishedContainersList = new ArrayList(); + List list = p.getCompletedContainerStatusesList(); + completedContainersStatuses = new ArrayList(); - for (ContainerProto c : list) { - finishedContainersList.add(convertFromProtoFormat(c)); + for (ContainerStatusProto c : list) { + completedContainersStatuses.add(convertFromProtoFormat(c)); } } @Override - public synchronized void addAllFinishedContainers(final List containers) { + public synchronized void setCompletedContainersStatuses( + final List containers) { if (containers == null) return; initLocalFinishedContainerList(); - finishedContainersList.addAll(containers); + completedContainersStatuses.addAll(containers); } - @Override - public synchronized void addFinishedContainer(Container containers) { - initLocalFinishedContainerList(); - if (containers == null) - return; - this.finishedContainersList.add(containers); - } - - @Override - public synchronized void removeFinishedContainer(int index) { - initLocalFinishedContainerList(); - this.finishedContainersList.remove(index); - } - @Override - public synchronized void clearFinishedContainers() { - initLocalFinishedContainerList(); - this.finishedContainersList.clear(); - } - - private synchronized ContainerPBImpl convertFromProtoFormat(ContainerProto p) { + private synchronized ContainerPBImpl convertFromProtoFormat( + ContainerProto p) { return new ContainerPBImpl(p); } @@ -294,6 +272,15 @@ public class AMResponsePBImpl extends ProtoBase implements AMRe return ((ContainerPBImpl)t).getProto(); } + private synchronized ContainerStatusPBImpl convertFromProtoFormat( + ContainerStatusProto p) { + return new ContainerStatusPBImpl(p); + } + + private synchronized ContainerStatusProto convertToProtoFormat(ContainerStatus t) { + return ((ContainerStatusPBImpl)t).getProto(); + } + private synchronized ResourcePBImpl convertFromProtoFormat(ResourceProto p) { return new ResourcePBImpl(p); } diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto index b91449c3df8..34d8396417c 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto @@ -177,8 +177,8 @@ message ResourceRequestProto { message AMResponseProto { optional bool reboot = 1; optional int32 response_id = 2; - repeated ContainerProto new_containers = 3; - repeated ContainerProto finished_containers = 4; + repeated ContainerProto allocated_containers = 3; + repeated ContainerStatusProto completed_container_statuses = 4; optional ResourceProto limit = 5; } diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/NodeStatus.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/NodeStatus.java index 7965b5cc686..7822789eb07 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/NodeStatus.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/NodeStatus.java @@ -18,10 +18,8 @@ package org.apache.hadoop.yarn.server.api.records; import java.util.List; -import 
java.util.Map; -import org.apache.hadoop.yarn.api.records.ApplicationId; -import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NodeHealthStatus; import org.apache.hadoop.yarn.api.records.NodeId; @@ -31,17 +29,13 @@ public interface NodeStatus { public abstract NodeId getNodeId(); public abstract int getResponseId(); - public abstract Map> getAllContainers(); - public abstract List getContainers(ApplicationId key); + public abstract List getContainersStatuses(); + public abstract void setContainersStatuses( + List containersStatuses); NodeHealthStatus getNodeHealthStatus(); void setNodeHealthStatus(NodeHealthStatus healthStatus); public abstract void setNodeId(NodeId nodeId); public abstract void setResponseId(int responseId); - - public abstract void addAllContainers(Map> containers); - public abstract void setContainers(ApplicationId key, List containers); - public abstract void removeContainers(ApplicationId key); - public abstract void clearContainers(); } diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/NodeStatusPBImpl.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/NodeStatusPBImpl.java index 2bb178f0d0d..03d5e8cdbd5 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/NodeStatusPBImpl.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/NodeStatusPBImpl.java @@ -20,27 +20,19 @@ package org.apache.hadoop.yarn.server.api.records.impl.pb; import java.util.ArrayList; -import java.util.HashMap; import java.util.Iterator; import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import org.apache.hadoop.yarn.api.records.ApplicationId; -import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NodeHealthStatus; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.ProtoBase; -import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl; -import org.apache.hadoop.yarn.api.records.impl.pb.ContainerPBImpl; +import org.apache.hadoop.yarn.api.records.impl.pb.ContainerStatusPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.NodeHealthStatusPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.NodeIdPBImpl; -import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto; -import org.apache.hadoop.yarn.proto.YarnProtos.ContainerProto; +import org.apache.hadoop.yarn.proto.YarnProtos.ContainerStatusProto; import org.apache.hadoop.yarn.proto.YarnProtos.NodeHealthStatusProto; import org.apache.hadoop.yarn.proto.YarnProtos.NodeIdProto; -import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.ApplicationIdContainerListMapProto; -import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.ContainerListProto; import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeStatusProto; import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeStatusProtoOrBuilder; import org.apache.hadoop.yarn.server.api.records.NodeStatus; @@ -51,7 +43,7 @@ public class NodeStatusPBImpl extends ProtoBase implements Node 
boolean viaProto = false; private NodeId nodeId = null; - private Map> containers = null; + private List containers = null; private NodeHealthStatus nodeHealthStatus = null; public NodeStatusPBImpl() { @@ -99,7 +91,40 @@ public class NodeStatusPBImpl extends ProtoBase implements Node viaProto = false; } + private void addContainersToProto() { + maybeInitBuilder(); + builder.clearContainersStatuses(); + if (containers == null) + return; + Iterable iterable = new Iterable() { + @Override + public Iterator iterator() { + return new Iterator() { + Iterator iter = containers.iterator(); + + @Override + public boolean hasNext() { + return iter.hasNext(); + } + + @Override + public ContainerStatusProto next() { + return convertToProtoFormat(iter.next()); + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + + } + }; + + } + }; + builder.addAllContainersStatuses(iterable); + } + @Override public int getResponseId() { NodeStatusProtoOrBuilder p = viaProto ? proto : builder; @@ -133,24 +158,17 @@ public class NodeStatusPBImpl extends ProtoBase implements Node } @Override - public Map> getAllContainers() { + public List getContainersStatuses() { initContainers(); - HashMap> returnMap = new HashMap>( - this.containers.size()); - for (Entry> entry : this.containers.entrySet()) { - returnMap.put(convertFromProtoFormat(entry.getKey()), entry.getValue()); - } - return returnMap; + return this.containers; } @Override - public List getContainers(ApplicationId applicationId) { - initContainers(); - ApplicationIdProto applicationIdProto = convertToProtoFormat(applicationId); - if (this.containers.get(applicationIdProto) == null) { - this.containers.put(applicationIdProto, new ArrayList()); + public void setContainersStatuses(List containers) { + if (containers == null) { + builder.clearContainersStatuses(); } - return this.containers.get(applicationIdProto); + this.containers = containers; } private void initContainers() { @@ -158,59 +176,15 @@ public class NodeStatusPBImpl extends ProtoBase implements Node return; } NodeStatusProtoOrBuilder p = viaProto ? 
proto : builder; - List list = p.getContainersList(); - this.containers = new HashMap>(); + List list = p.getContainersStatusesList(); + this.containers = new ArrayList(); - for (ApplicationIdContainerListMapProto c : list) { - this.containers.put(c.getApplicationId(), convertFromProtoFormat(c.getValue())); + for (ContainerStatusProto c : list) { + this.containers.add(convertFromProtoFormat(c)); } } - @Override - public void addAllContainers(final Map> containers) { - if (containers == null) - return; - initContainers(); - for (Entry> entry : containers.entrySet()) { - this.containers.put(convertToProtoFormat(entry.getKey()), entry.getValue()); - } - } - - private void addContainersToProto() { - maybeInitBuilder(); - builder.clearContainers(); - viaProto = false; - Iterable iterable = new Iterable() { - - @Override - public Iterator iterator() { - return new Iterator() { - - Iterator keyIter = containers.keySet().iterator(); - @Override - public boolean hasNext() { - return keyIter.hasNext(); - } - - @Override - public ApplicationIdContainerListMapProto next() { - ApplicationIdProto applicationIdProto = keyIter.next(); - return ApplicationIdContainerListMapProto.newBuilder().setApplicationId(applicationIdProto).setValue(convertToProtoFormat(containers.get(applicationIdProto))).build(); - } - - @Override - public void remove() { - throw new UnsupportedOperationException(); - } - - }; - } - - }; - builder.addAllContainers(iterable); - } - @Override public NodeHealthStatus getNodeHealthStatus() { NodeStatusProtoOrBuilder p = viaProto ? proto : builder; @@ -233,66 +207,6 @@ public class NodeStatusPBImpl extends ProtoBase implements Node this.nodeHealthStatus = healthStatus; } - /* - * - * @Override - public String getApplicationName() { - ApplicationSubmissionContextProtoOrBuilder p = viaProto ? 
proto : builder; - if (!p.hasApplicationName()) { - return null; - } - return (p.getApplicationName()); - } - - @Override - public void setApplicationName(String applicationName) { - maybeInitBuilder(); - if (applicationName == null) { - builder.clearApplicationName(); - return; - } - builder.setApplicationName((applicationName)); - } - */ - - private ContainerListProto convertToProtoFormat(List src) { - ContainerListProto.Builder ret = ContainerListProto.newBuilder(); - for (Container c : src) { - ret.addContainer(((ContainerPBImpl)c).getProto()); - } - return ret.build(); - } - - private List convertFromProtoFormat(ContainerListProto src) { - List ret = new ArrayList(); - for (ContainerProto c : src.getContainerList()) { - ret.add(convertFromProtoFormat(c)); - } - return ret; - } - - private Container convertFromProtoFormat(ContainerProto src) { - return new ContainerPBImpl(src); - } - - @Override - public void setContainers(ApplicationId applicationId, List containers) { - initContainers(); - this.containers.put(convertToProtoFormat(applicationId), containers); - } - - @Override - public void removeContainers(ApplicationId applicationId) { - initContainers(); - this.containers.remove(convertToProtoFormat(applicationId)); - } - - @Override - public void clearContainers() { - initContainers(); - this.containers.clear(); - } - private NodeIdProto convertToProtoFormat(NodeId nodeId) { return ((NodeIdPBImpl)nodeId).getProto(); } @@ -301,14 +215,6 @@ public class NodeStatusPBImpl extends ProtoBase implements Node return new NodeIdPBImpl(proto); } - private ApplicationIdProto convertToProtoFormat(ApplicationId applicationId) { - return ((ApplicationIdPBImpl)applicationId).getProto(); - } - - private ApplicationId convertFromProtoFormat(ApplicationIdProto proto) { - return new ApplicationIdPBImpl(proto); - } - private NodeHealthStatusProto convertToProtoFormat( NodeHealthStatus healthStatus) { return ((NodeHealthStatusPBImpl) healthStatus).getProto(); @@ -317,4 +223,12 @@ public class NodeStatusPBImpl extends ProtoBase implements Node private NodeHealthStatus convertFromProtoFormat(NodeHealthStatusProto proto) { return new NodeHealthStatusPBImpl(proto); } + + private ContainerStatusPBImpl convertFromProtoFormat(ContainerStatusProto c) { + return new ContainerStatusPBImpl(c); + } + + private ContainerStatusProto convertToProtoFormat(ContainerStatus c) { + return ((ContainerStatusPBImpl)c).getProto(); + } } diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto index 8d0d2383af7..5198c5743f2 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto @@ -26,7 +26,7 @@ import "yarn_protos.proto"; message NodeStatusProto { optional NodeIdProto node_id = 1; optional int32 response_id = 2; - repeated ApplicationIdContainerListMapProto containers = 3; + repeated ContainerStatusProto containersStatuses = 3; optional NodeHealthStatusProto nodeHealthStatus = 4; } @@ -41,12 +41,3 @@ message HeartbeatResponseProto { repeated ApplicationIdProto applications_to_cleanup = 4; } -message ContainerListProto { - repeated ContainerProto container = 1; -} - -message 
ApplicationIdContainerListMapProto { - optional ApplicationIdProto application_id = 1; - optional ContainerListProto value = 2; -} - diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java index 9b4265d75ec..40e0ee87a6d 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java @@ -30,13 +30,13 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.NodeHealthCheckerService; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.CommonConfigurationKeys; import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.security.SecurityInfo; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerState; +import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NodeHealthStatus; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Resource; @@ -73,7 +73,6 @@ public class NodeStatusUpdaterImpl extends AbstractService implements private String rmAddress; private Resource totalResource; private String containerManagerBindAddress; - private String nodeHttpAddress; private String hostName; private int containerManagerPort; private int httpPort; @@ -127,7 +126,6 @@ public class NodeStatusUpdaterImpl extends AbstractService implements this.httpPort = httpBindAddress.getPort(); this.containerManagerBindAddress = this.hostName + ":" + this.containerManagerPort; - this.nodeHttpAddress = this.hostName + ":" + this.httpPort; LOG.info("Configured ContainerManager Address is " + this.containerManagerBindAddress); // Registration has to be in start so that ContainerManager can get the @@ -195,35 +193,28 @@ public class NodeStatusUpdaterImpl extends AbstractService implements nodeStatus.setNodeId(this.nodeId); int numActiveContainers = 0; + List containersStatuses = new ArrayList(); for (Iterator> i = this.context.getContainers().entrySet().iterator(); i.hasNext();) { Entry e = i.next(); ContainerId containerId = e.getKey(); Container container = e.getValue(); - List applicationContainers = nodeStatus - .getContainers(container.getContainerID().getAppId()); - if (applicationContainers == null) { - applicationContainers = new ArrayList(); - nodeStatus.setContainers(container.getContainerID().getAppId(), - applicationContainers); - } - // Clone the container to send it to the RM - org.apache.hadoop.yarn.api.records.Container c = container.cloneAndGetContainer(); - c.setNodeId(this.nodeId); - c.setNodeHttpAddress(this.nodeHttpAddress); // TODO: don't set everytime. 
- applicationContainers.add(c); + org.apache.hadoop.yarn.api.records.ContainerStatus containerStatus = + container.cloneAndGetContainerStatus(); + containersStatuses.add(containerStatus); ++numActiveContainers; - LOG.info("Sending out status for container: " + c); + LOG.info("Sending out status for container: " + containerStatus); - if (c.getState() == ContainerState.COMPLETE) { + if (containerStatus.getState() == ContainerState.COMPLETE) { // Remove i.remove(); LOG.info("Removed completed container " + containerId); } } + nodeStatus.setContainersStatuses(containersStatuses); LOG.debug(this.containerManagerBindAddress + " sending out status for " + numActiveContainers + " containers"); diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java index 6bd29e851bf..e5ba3f29936 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java @@ -40,8 +40,6 @@ public interface Container extends EventHandler { Map getLocalizedResources(); - org.apache.hadoop.yarn.api.records.Container cloneAndGetContainer(); - ContainerStatus cloneAndGetContainerStatus(); String toString(); diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java index 46d5a526177..5bdb96d6e28 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java @@ -326,24 +326,6 @@ public class ContainerImpl implements Container { } } - @Override - public - org.apache.hadoop.yarn.api.records.Container cloneAndGetContainer() { - this.readLock.lock(); - try { - org.apache.hadoop.yarn.api.records.Container c = - recordFactory.newRecordInstance( - org.apache.hadoop.yarn.api.records.Container.class); - c.setId(this.launchContext.getContainerId()); - c.setResource(this.launchContext.getResource()); - c.setState(getCurrentState()); - c.setContainerStatus(cloneAndGetContainerStatus()); - return c; - } finally { - this.readLock.unlock(); - } - } - @Override public ContainerLaunchContext getLaunchContext() { this.readLock.lock(); diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java 
b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java index 740d533f3e8..a3042e611ff 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java @@ -21,6 +21,10 @@ package org.apache.hadoop.yarn.server.nodemanager; import java.io.IOException; import java.net.InetAddress; import java.net.UnknownHostException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; import java.util.concurrent.ConcurrentMap; import org.apache.commons.logging.Log; @@ -32,6 +36,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; +import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.conf.YarnConfiguration; @@ -98,13 +103,30 @@ public class TestNodeStatusUpdater { ContainerId firstContainerID = recordFactory.newRecordInstance(ContainerId.class); ContainerId secondContainerID = recordFactory.newRecordInstance(ContainerId.class); + private Map> getAppToContainerStatusMap( + List containers) { + Map> map = + new HashMap>(); + for (ContainerStatus cs : containers) { + ApplicationId applicationId = cs.getContainerId().getAppId(); + List appContainers = map.get(applicationId); + if (appContainers == null) { + appContainers = new ArrayList(); + map.put(applicationId, appContainers); + } + appContainers.add(cs); + } + return map; + } @Override public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request) throws YarnRemoteException { NodeStatus nodeStatus = request.getNodeStatus(); LOG.info("Got heartbeat number " + heartBeatID); nodeStatus.setResponseId(heartBeatID++); + Map> appToContainers = + getAppToContainerStatusMap(nodeStatus.getContainersStatuses()); if (heartBeatID == 1) { - Assert.assertEquals(0, nodeStatus.getAllContainers().size()); + Assert.assertEquals(0, nodeStatus.getContainersStatuses().size()); // Give a container to the NM. 
applicationID.setId(heartBeatID); @@ -121,11 +143,9 @@ public class TestNodeStatusUpdater { } else if (heartBeatID == 2) { // Checks on the RM end Assert.assertEquals("Number of applications should only be one!", 1, - nodeStatus.getAllContainers().size()); + nodeStatus.getContainersStatuses().size()); Assert.assertEquals("Number of container for the app should be one!", - 1, nodeStatus.getContainers(applicationID).size()); - Assert.assertEquals(2, nodeStatus.getContainers(applicationID).get(0) - .getResource().getMemory()); + 1, appToContainers.get(applicationID).size()); // Checks on the NM end ConcurrentMap activeContainers = @@ -147,13 +167,9 @@ public class TestNodeStatusUpdater { } else if (heartBeatID == 3) { // Checks on the RM end Assert.assertEquals("Number of applications should only be one!", 1, - nodeStatus.getAllContainers().size()); + appToContainers.size()); Assert.assertEquals("Number of container for the app should be two!", - 2, nodeStatus.getContainers(applicationID).size()); - Assert.assertEquals(2, nodeStatus.getContainers(applicationID).get(0) - .getResource().getMemory()); - Assert.assertEquals(3, nodeStatus.getContainers(applicationID).get(1) - .getResource().getMemory()); + 2, appToContainers.get(applicationID).size()); // Checks on the NM end ConcurrentMap activeContainers = diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java index 7d3ff14ceac..436ed23d6ea 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java @@ -232,8 +232,8 @@ public class ApplicationMasterService extends AbstractService implements RMAppAttempt appAttempt = app.getRMAppAttempt(appAttemptId); AMResponse response = recordFactory.newRecordInstance(AMResponse.class); - response.addAllNewContainers(allocation.getContainers()); - response.addAllFinishedContainers(appAttempt + response.setAllocatedContainers(allocation.getContainers()); + response.setCompletedContainersStatuses(appAttempt .pullJustFinishedContainers()); response.setResponseId(lastResponse.getResponseId() + 1); response.setAvailableResources(allocation.getResourceLimit()); diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java index f88fe76bb5b..ba543813ac5 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java @@ -20,10 +20,6 @@ package 
org.apache.hadoop.yarn.server.resourcemanager; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.atomic.AtomicInteger; import javax.crypto.SecretKey; @@ -31,7 +27,6 @@ import org.apache.avro.ipc.Server; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.CommonConfigurationKeys; import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.net.Node; import org.apache.hadoop.security.SecurityInfo; @@ -252,7 +247,7 @@ public class ResourceTrackerService extends AbstractService implements // 4. Send status to RMNode, saving the latest response. this.rmContext.getDispatcher().getEventHandler().handle( new RMNodeStatusEvent(nodeId, remoteNodeStatus.getNodeHealthStatus(), - remoteNodeStatus.getAllContainers(), latestResponse)); + remoteNodeStatus.getContainersStatuses(), latestResponse)); nodeHeartBeatResponse.setHeartbeatResponse(latestResponse); return nodeHeartBeatResponse; diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java index 861de522cb1..98025fb6ca8 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java @@ -24,6 +24,7 @@ import java.util.Set; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.event.EventHandler; @@ -47,9 +48,9 @@ public interface RMAppAttempt extends EventHandler{ Set getRanNodes(); - List pullJustFinishedContainers(); + List pullJustFinishedContainers(); - List getJustFinishedContainers(); + List getJustFinishedContainers(); Container getMasterContainer(); diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java index a0b5b22fe72..6daff1d88e7 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java @@ -34,6 +34,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import 
org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.ResourceRequest; @@ -97,8 +98,8 @@ public class RMAppAttemptImpl implements RMAppAttempt { //nodes on while this attempt's containers ran private final Set ranNodes = new HashSet(); - private final List justFinishedContainers = - new ArrayList(); + private final List justFinishedContainers = + new ArrayList(); private Container masterContainer; private float progress = 0; @@ -333,7 +334,7 @@ public class RMAppAttemptImpl implements RMAppAttempt { } @Override - public List getJustFinishedContainers() { + public List getJustFinishedContainers() { this.readLock.lock(); try { return this.justFinishedContainers; @@ -343,11 +344,11 @@ public class RMAppAttemptImpl implements RMAppAttempt { } @Override - public List pullJustFinishedContainers() { + public List pullJustFinishedContainers() { this.writeLock.lock(); try { - List returnList = new ArrayList( + List returnList = new ArrayList( this.justFinishedContainers.size()); returnList.addAll(this.justFinishedContainers); this.justFinishedContainers.clear(); @@ -705,11 +706,13 @@ public class RMAppAttemptImpl implements RMAppAttempt { RMAppAttemptContainerFinishedEvent containerFinishedEvent = (RMAppAttemptContainerFinishedEvent) event; - Container container = containerFinishedEvent.getContainer(); + ContainerStatus containerStatus = + containerFinishedEvent.getContainerStatus(); // Is this container the AmContainer? If the finished container is same as // the AMContainer, AppAttempt fails - if (appAttempt.masterContainer.getId().equals(container.getId())) { + if (appAttempt.masterContainer.getId().equals( + containerStatus.getContainerId())) { new FinalTransition(RMAppAttemptState.FAILED).transition( appAttempt, containerFinishedEvent); return RMAppAttemptState.FAILED; @@ -718,7 +721,7 @@ public class RMAppAttemptImpl implements RMAppAttempt { // Normal container. 
// Put it in completedcontainers list - appAttempt.justFinishedContainers.add(container); + appAttempt.justFinishedContainers.add(containerStatus); return RMAppAttemptState.RUNNING; } } diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/event/RMAppAttemptContainerFinishedEvent.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/event/RMAppAttemptContainerFinishedEvent.java index 750ec07173d..3660597d3fc 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/event/RMAppAttemptContainerFinishedEvent.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/event/RMAppAttemptContainerFinishedEvent.java @@ -19,22 +19,22 @@ package org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; -import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType; public class RMAppAttemptContainerFinishedEvent extends RMAppAttemptEvent { - private final Container container; + private final ContainerStatus containerStatus; public RMAppAttemptContainerFinishedEvent(ApplicationAttemptId appAttemptId, - Container container) { + ContainerStatus containerStatus) { super(appAttemptId, RMAppAttemptEventType.CONTAINER_FINISHED); - this.container = container; + this.containerStatus = containerStatus; } - public Container getContainer() { - return this.container; + public ContainerStatus getContainerStatus() { + return this.containerStatus; } } diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerFinishedEvent.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerFinishedEvent.java index 740287de4fb..d760e7dc3a7 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerFinishedEvent.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerFinishedEvent.java @@ -26,8 +26,8 @@ public class RMContainerFinishedEvent extends RMContainerEvent { private final ContainerStatus remoteContainerStatus; public RMContainerFinishedEvent(ContainerId containerId, - ContainerStatus containerStatus) { - super(containerId, RMContainerEventType.FINISHED); + ContainerStatus containerStatus, RMContainerEventType event) { + super(containerId, event); this.remoteContainerStatus = containerStatus; } diff --git 
a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java index 07bc1cdeec0..ffdd23c4386 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java @@ -92,7 +92,7 @@ public class RMContainerImpl implements RMContainer { // Transitions from RUNNING state .addTransition(RMContainerState.RUNNING, RMContainerState.COMPLETED, - RMContainerEventType.FINISHED, new ContainerCompletedTransition()) + RMContainerEventType.FINISHED, new FinishedTransition()) .addTransition(RMContainerState.RUNNING, RMContainerState.KILLED, RMContainerEventType.KILL, new KillTransition()) @@ -273,10 +273,16 @@ public class RMContainerImpl implements RMContainer { @Override public void transition(RMContainerImpl container, RMContainerEvent event) { + RMContainerFinishedEvent finishedEvent = (RMContainerFinishedEvent) event; + + // Update container-status for diagnostics. Today we completely + // replace it on finish. We may just need to update diagnostics. + container.container.setContainerStatus(finishedEvent + .getRemoteContainerStatus()); // Inform AppAttempt container.eventHandler.handle(new RMAppAttemptContainerFinishedEvent( - container.appAttemptId, container.container)); + container.appAttemptId, container.container.getContainerStatus())); } } @@ -312,22 +318,4 @@ public class RMContainerImpl implements RMContainer { } } - private static final class ContainerCompletedTransition extends - FinishedTransition { - - @Override - public void transition(RMContainerImpl container, RMContainerEvent event) { - - RMContainerFinishedEvent finishedEvent = (RMContainerFinishedEvent) event; - - // Update container-status for diagnostics. Today we completely - // replace it on finish. We may just need to update diagnostics. 
- // ^TODO - container.container.setContainerStatus(finishedEvent - .getRemoteContainerStatus()); - - // Inform appAttempt - super.transition(container, event); - } - } } diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java index ceb28434f4f..dd8d7f840f7 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java @@ -23,7 +23,6 @@ import java.util.EnumSet; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Map.Entry; import java.util.Set; import java.util.TreeSet; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -36,9 +35,9 @@ import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.net.Node; import org.apache.hadoop.yarn.api.records.ApplicationId; -import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerState; +import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NodeHealthStatus; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Resource; @@ -87,8 +86,8 @@ public class RMNodeImpl implements RMNode, EventHandler { .newRecordInstance(NodeHealthStatus.class); /* set of containers that have just launched */ - private final Map justLaunchedContainers = - new HashMap(); + private final Map justLaunchedContainers = + new HashMap(); /* set of containers that need to be cleaned */ @@ -355,43 +354,29 @@ public class RMNodeImpl implements RMNode, EventHandler { // Filter the map to only obtain just launched containers and finished // containers. - Map> remoteAppContainersMap = statusEvent - .getContainersCollection(); - Map> containersMapForScheduler = new HashMap>( - remoteAppContainersMap.size()); - for (Entry> entrySet : remoteAppContainersMap - .entrySet()) { - - ApplicationId appId = entrySet.getKey(); - List remoteContainerList = entrySet.getValue(); - - if (!containersMapForScheduler.containsKey(appId)) { - containersMapForScheduler.put(appId, new ArrayList( - remoteContainerList.size())); - } - List entryForThisApp = containersMapForScheduler - .get(appId); - - for (Container remoteContainer : remoteContainerList) { - - // Process running containers - ContainerId containerId = remoteContainer.getId(); - if (remoteContainer.getState() == ContainerState.RUNNING) { - if (!rmNode.justLaunchedContainers.containsKey(containerId)) { - // Just launched container. RM knows about it the first time. 
- rmNode.justLaunchedContainers.put(containerId, remoteContainer); - entryForThisApp.add(remoteContainer); - } - } else { - // A finished container - rmNode.justLaunchedContainers.remove(containerId); - entryForThisApp.add(remoteContainer); + List newlyLaunchedContainers = + new ArrayList(); + List completedContainers = + new ArrayList(); + for (ContainerStatus remoteContainer : statusEvent.getContainers()) { + // Process running containers + ContainerId containerId = remoteContainer.getContainerId(); + if (remoteContainer.getState() == ContainerState.RUNNING) { + if (!rmNode.justLaunchedContainers.containsKey(containerId)) { + // Just launched container. RM knows about it the first time. + rmNode.justLaunchedContainers.put(containerId, remoteContainer); + newlyLaunchedContainers.add(remoteContainer); } + } else { + // A finished container + rmNode.justLaunchedContainers.remove(containerId); + completedContainers.add(remoteContainer); } } rmNode.context.getDispatcher().getEventHandler().handle( - new NodeUpdateSchedulerEvent(rmNode, containersMapForScheduler)); + new NodeUpdateSchedulerEvent(rmNode, newlyLaunchedContainers, + completedContainers)); return RMNodeState.RUNNING; } diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStatusEvent.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStatusEvent.java index 08f2b689719..e4a29301688 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStatusEvent.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStatusEvent.java @@ -19,10 +19,8 @@ package org.apache.hadoop.yarn.server.resourcemanager.rmnode; import java.util.List; -import java.util.Map; -import org.apache.hadoop.yarn.api.records.ApplicationId; -import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NodeHealthStatus; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.server.api.records.HeartbeatResponse; @@ -30,11 +28,11 @@ import org.apache.hadoop.yarn.server.api.records.HeartbeatResponse; public class RMNodeStatusEvent extends RMNodeEvent { private final NodeHealthStatus nodeHealthStatus; - private Map> containersCollection; + private List containersCollection; private final HeartbeatResponse latestResponse; public RMNodeStatusEvent(NodeId nodeId, NodeHealthStatus nodeHealthStatus, - Map> collection, + List collection, HeartbeatResponse latestResponse) { super(nodeId, RMNodeEventType.STATUS_UPDATE); this.nodeHealthStatus = nodeHealthStatus; @@ -46,7 +44,7 @@ public class RMNodeStatusEvent extends RMNodeEvent { return this.nodeHealthStatus; } - public Map> getContainersCollection() { + public List getContainers() { return this.containersCollection; } diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApp.java 
b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApp.java index 262da738f07..739b8b6a6cb 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApp.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApp.java @@ -32,6 +32,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; @@ -161,26 +162,21 @@ public class SchedulerApp { RMContainerEventType.LAUNCHED)); } - public synchronized void killContainers( - SchedulerApp application) { - } - synchronized public void containerCompleted(RMContainer rmContainer, - RMContainerEventType event) { + ContainerStatus containerStatus, RMContainerEventType event) { Container container = rmContainer.getContainer(); ContainerId containerId = container.getId(); // Inform the container - if (event.equals(RMContainerEventType.FINISHED)) { - // Have to send diagnostics for finished containers. - rmContainer.handle(new RMContainerFinishedEvent(containerId, - container.getContainerStatus())); - } else { - rmContainer.handle(new RMContainerEvent(containerId, event)); - } + rmContainer.handle( + new RMContainerFinishedEvent( + containerId, + containerStatus, + event) + ); LOG.info("Completed container: " + rmContainer.getContainerId() + - " in state: " + rmContainer.getState()); + " in state: " + rmContainer.getState() + " event:" + event); // Remove from the list of containers liveContainers.remove(rmContainer.getContainerId()); diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java new file mode 100644 index 00000000000..5b588d1ab6a --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java @@ -0,0 +1,74 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.factories.RecordFactory;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+
+/**
+ * Utilities shared by schedulers.
+ */
+@Private
+@Unstable
+public class SchedulerUtils {
+
+  private static final RecordFactory recordFactory =
+      RecordFactoryProvider.getRecordFactory(null);
+
+  public static final String RELEASED_CONTAINER =
+      "Container released by application";
+
+  public static final String LOST_CONTAINER =
+      "Container released on a *lost* node";
+
+  public static final String COMPLETED_APPLICATION =
+      "Container of a completed application";
+
+  public static final String EXPIRED_CONTAINER =
+      "Container expired since it was unused";
+
+  public static final String UNRESERVED_CONTAINER =
+      "Container reservation no longer required.";
+
+  /**
+   * Utility to create a {@link ContainerStatus} during exceptional
+   * circumstances.
+   *
+   * @param containerId {@link ContainerId} of returned/released/lost container.
+   * @param diagnostics diagnostic message
+   * @return ContainerStatus for a returned/released/lost
+   *         container
+   */
+  public static ContainerStatus createAbnormalContainerStatus(
+      ContainerId containerId, String diagnostics) {
+    ContainerStatus containerStatus =
+        recordFactory.newRecordInstance(ContainerStatus.class);
+    containerStatus.setContainerId(containerId);
+    containerStatus.setDiagnostics(diagnostics);
+    containerStatus.setExitStatus("ABORTED");
+    containerStatus.setState(ContainerState.COMPLETE);
+    return containerStatus;
+  }
+
+
+}
diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java
index f8d753fa107..f2c9533a228 100644
--- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java
+++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.classification.InterfaceStability.Stable;
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.QueueACL;
 import org.apache.hadoop.yarn.api.records.QueueState;
 import org.apache.hadoop.yarn.api.records.Resource;
@@ -165,11 +166,14 @@ extends org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue {
   * @param node node on which the container completed
   * @param container completed container,
   *                  null if it was just a reservation
+  * @param containerStatus ContainerStatus
for the completed + * container * @param event event to be sent to the container */ public void completedContainer(Resource clusterResource, SchedulerApp application, SchedulerNode node, - RMContainer container, RMContainerEventType event); + RMContainer container, ContainerStatus containerStatus, + RMContainerEventType event); /** * Get the number of applications in the queue. diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index f4fbc150d60..8f44f1031af 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -36,10 +36,9 @@ import org.apache.hadoop.security.AccessControlException; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.Lock; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; -import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; -import org.apache.hadoop.yarn.api.records.ContainerState; +import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.QueueInfo; import org.apache.hadoop.yarn.api.records.QueueUserACLInfo; @@ -59,11 +58,11 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AppSchedulingInfo; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerExpiredSchedulerEvent; @@ -127,6 +126,8 @@ implements ResourceScheduler, CapacitySchedulerContext { private boolean initialized = false; + public CapacityScheduler() {} + public CSQueue getRootQueue() { return root; } @@ -392,12 +393,20 @@ implements ResourceScheduler, CapacitySchedulerContext { // Release all the running containers for (RMContainer rmContainer : application.getLiveContainers()) { - completedContainer(rmContainer, RMContainerEventType.KILL); + completedContainer(rmContainer, + SchedulerUtils.createAbnormalContainerStatus( + rmContainer.getContainerId(), 
+ SchedulerUtils.COMPLETED_APPLICATION), + RMContainerEventType.KILL); } // Release all reserved containers for (RMContainer rmContainer : application.getAllReservedContainers()) { - completedContainer(rmContainer, RMContainerEventType.KILL); + completedContainer(rmContainer, + SchedulerUtils.createAbnormalContainerStatus( + rmContainer.getContainerId(), + "Application Complete"), + RMContainerEventType.KILL); } // Clean up pending requests, metrics etc. @@ -445,7 +454,11 @@ implements ResourceScheduler, CapacitySchedulerContext { "Trying to release container not owned by app or with invalid id", application.getApplicationId(), releasedContainerId); } - completedContainer(rmContainer, RMContainerEventType.RELEASED); + completedContainer(rmContainer, + SchedulerUtils.createAbnormalContainerStatus( + releasedContainerId, + SchedulerUtils.RELEASED_CONTAINER), + RMContainerEventType.RELEASED); } synchronized (application) { @@ -521,22 +534,23 @@ implements ResourceScheduler, CapacitySchedulerContext { } private synchronized void nodeUpdate(RMNode nm, - Map> containers ) { + List newlyLaunchedContainers, + List completedContainers) { LOG.info("nodeUpdate: " + nm + " clusterResources: " + clusterResource); SchedulerNode node = getNode(nm.getNodeID()); - // Processing the current containers running/finished on node - for (List appContainers : containers.values()) { - for (Container container : appContainers) { - if (container.getState() == ContainerState.RUNNING) { - containerLaunchedOnNode(container, node); - } else { // has to be 'COMPLETE' - LOG.info("DEBUG --- Container FINISHED: " + container.getId()); - completedContainer(getRMContainer(container.getId()), - RMContainerEventType.FINISHED); - } - } + // Processing the newly launched containers + for (ContainerStatus launchedContainer : newlyLaunchedContainers) { + containerLaunchedOnNode(launchedContainer.getContainerId(), node); + } + + // Process completed containers + for (ContainerStatus completedContainer : completedContainers) { + ContainerId containerId = completedContainer.getContainerId(); + LOG.info("DEBUG --- Container FINISHED: " + containerId); + completedContainer(getRMContainer(containerId), + completedContainer, RMContainerEventType.FINISHED); } // Now node data structures are upto date and ready for scheduling. 
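The same split drives both schedulers: RMNodeImpl (earlier in this patch) sorts each heartbeat's ContainerStatus reports into a newly-launched list and a completed list before the NODE_UPDATE event reaches the scheduler. A condensed sketch of that sorting step follows; it omits the justLaunchedContainers de-duplication, and the class, method, and variable names are illustrative rather than taken from the patch:

    import java.util.List;
    import org.apache.hadoop.yarn.api.records.ContainerState;
    import org.apache.hadoop.yarn.api.records.ContainerStatus;

    // Illustrative: partition heartbeat statuses the way the RM node now does,
    // so the scheduler only ever sees ContainerStatus lists.
    class HeartbeatSplitSketch {
      static void split(List<ContainerStatus> reported,
          List<ContainerStatus> newlyLaunched, List<ContainerStatus> completed) {
        for (ContainerStatus status : reported) {
          if (status.getState() == ContainerState.RUNNING) {
            newlyLaunched.add(status);  // scheduler marks these as launched
          } else {
            completed.add(status);      // scheduler runs its completedContainer() path
          }
        }
      }
    }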
@@ -571,18 +585,18 @@ implements ResourceScheduler, CapacitySchedulerContext { } - private void containerLaunchedOnNode(Container container, SchedulerNode node) { + private void containerLaunchedOnNode(ContainerId containerId, SchedulerNode node) { // Get the application for the finished container - ApplicationAttemptId applicationAttemptId = container.getId().getAppAttemptId(); + ApplicationAttemptId applicationAttemptId = containerId.getAppAttemptId(); SchedulerApp application = getApplication(applicationAttemptId); if (application == null) { LOG.info("Unknown application: " + applicationAttemptId + - " launched container " + container.getId() + + " launched container " + containerId + " on node: " + node); return; } - application.containerLaunchedOnNode(container.getId()); + application.containerLaunchedOnNode(containerId); } @Override @@ -604,7 +618,8 @@ implements ResourceScheduler, CapacitySchedulerContext { { NodeUpdateSchedulerEvent nodeUpdatedEvent = (NodeUpdateSchedulerEvent)event; nodeUpdate(nodeUpdatedEvent.getRMNode(), - nodeUpdatedEvent.getContainers()); + nodeUpdatedEvent.getNewlyLaunchedContainers(), + nodeUpdatedEvent.getCompletedContainers()); } break; case APP_ADDED: @@ -625,7 +640,11 @@ implements ResourceScheduler, CapacitySchedulerContext { { ContainerExpiredSchedulerEvent containerExpiredEvent = (ContainerExpiredSchedulerEvent) event; - completedContainer(getRMContainer(containerExpiredEvent.getContainerId()), + ContainerId containerId = containerExpiredEvent.getContainerId(); + completedContainer(getRMContainer(containerId), + SchedulerUtils.createAbnormalContainerStatus( + containerId, + SchedulerUtils.EXPIRED_CONTAINER), RMContainerEventType.EXPIRE); } break; @@ -652,13 +671,21 @@ implements ResourceScheduler, CapacitySchedulerContext { // Remove running containers List runningContainers = node.getRunningContainers(); for (RMContainer container : runningContainers) { - completedContainer(container, RMContainerEventType.KILL); + completedContainer(container, + SchedulerUtils.createAbnormalContainerStatus( + container.getContainerId(), + SchedulerUtils.LOST_CONTAINER), + RMContainerEventType.KILL); } // Remove reservations, if any RMContainer reservedContainer = node.getReservedContainer(); if (reservedContainer != null) { - completedContainer(reservedContainer, RMContainerEventType.KILL); + completedContainer(reservedContainer, + SchedulerUtils.createAbnormalContainerStatus( + reservedContainer.getContainerId(), + SchedulerUtils.LOST_CONTAINER), + RMContainerEventType.KILL); } this.nodes.remove(nodeInfo.getNodeID()); @@ -667,8 +694,8 @@ implements ResourceScheduler, CapacitySchedulerContext { } @Lock(CapacityScheduler.class) - private synchronized void completedContainer(RMContainer rmContainer, - RMContainerEventType event) { + private synchronized void completedContainer(RMContainer rmContainer, + ContainerStatus containerStatus, RMContainerEventType event) { if (rmContainer == null) { LOG.info("Null container completed..."); return; @@ -692,7 +719,7 @@ implements ResourceScheduler, CapacitySchedulerContext { // Inform the queue LeafQueue queue = (LeafQueue)application.getQueue(); queue.completedContainer(clusterResource, application, node, - rmContainer, event); + rmContainer, containerStatus, event); LOG.info("Application " + applicationAttemptId + " released container " + container.getId() + diff --git 
a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java index 091e67e923f..2038e2d871e 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java @@ -39,6 +39,7 @@ import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.authorize.AccessControlList; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.ContainerToken; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.QueueACL; @@ -59,6 +60,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; import org.apache.hadoop.yarn.server.security.ContainerTokenSecretManager; import org.apache.hadoop.yarn.util.BuilderUtils; @@ -762,7 +764,11 @@ public class LeafQueue implements CSQueue { // Release Container container = rmContainer.getContainer(); completedContainer(clusterResource, application, node, - rmContainer, RMContainerEventType.RELEASED); + rmContainer, + SchedulerUtils.createAbnormalContainerStatus( + container.getId(), + SchedulerUtils.UNRESERVED_CONTAINER), + RMContainerEventType.RELEASED); return container.getResource(); } @@ -1175,7 +1181,7 @@ public class LeafQueue implements CSQueue { @Override public void completedContainer(Resource clusterResource, SchedulerApp application, SchedulerNode node, RMContainer rmContainer, - RMContainerEventType event) { + ContainerStatus containerStatus, RMContainerEventType event) { if (application != null) { // Careful! Locking order is important! 
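Every call site that previously passed only an RMContainerEventType now has to supply a ContainerStatus as well; for containers the NodeManager will never report back (released, expired, unreserved, on a lost node, or belonging to a finished application) the schedulers synthesize one with the new SchedulerUtils helper. A minimal usage sketch, assuming a ContainerId is already in hand (the wrapper class is illustrative only):

    import org.apache.hadoop.yarn.api.records.ContainerId;
    import org.apache.hadoop.yarn.api.records.ContainerStatus;
    import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;

    // Illustrative: build the synthetic COMPLETE status for a container that
    // was released by its application rather than reported back by the NM.
    class AbnormalStatusSketch {
      static ContainerStatus releasedStatus(ContainerId containerId) {
        return SchedulerUtils.createAbnormalContainerStatus(
            containerId, SchedulerUtils.RELEASED_CONTAINER);
      }
    }

The synthesized status then flows through the same completedContainer() path as an NM-reported one, which keeps the queue and application bookkeeping in a single place.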
synchronized (this) { @@ -1190,7 +1196,7 @@ public class LeafQueue implements CSQueue { application.unreserve(node, rmContainer.getReservedPriority()); node.unreserveResource(application); } else { - application.containerCompleted(rmContainer, event); + application.containerCompleted(rmContainer, containerStatus, event); node.releaseContainer(container); } @@ -1210,7 +1216,7 @@ public class LeafQueue implements CSQueue { // Inform the parent queue parent.completedContainer(clusterResource, application, - node, rmContainer, event); + node, rmContainer, null, event); } } diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java index 3080c0ba8bf..6aa282798c1 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java @@ -37,6 +37,7 @@ import org.apache.hadoop.security.AccessControlException; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.authorize.AccessControlList; import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.QueueACL; import org.apache.hadoop.yarn.api.records.QueueInfo; import org.apache.hadoop.yarn.api.records.QueueState; @@ -608,7 +609,7 @@ public class ParentQueue implements CSQueue { @Override public void completedContainer(Resource clusterResource, SchedulerApp application, SchedulerNode node, - RMContainer rmContainer, RMContainerEventType event) { + RMContainer rmContainer, ContainerStatus containerStatus, RMContainerEventType event) { if (application != null) { // Careful! Locking order is important! 
// Book keeping @@ -626,7 +627,7 @@ public class ParentQueue implements CSQueue { // Inform the parent if (parent != null) { parent.completedContainer(clusterResource, application, - node, rmContainer, event); + node, rmContainer, null, event); } } } diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/NodeUpdateSchedulerEvent.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/NodeUpdateSchedulerEvent.java index d0f07f17e52..9f3bc1cce7a 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/NodeUpdateSchedulerEvent.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/NodeUpdateSchedulerEvent.java @@ -23,26 +23,33 @@ import java.util.Map; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; public class NodeUpdateSchedulerEvent extends SchedulerEvent { private final RMNode rmNode; - private final Map> containers; + private final List newlyLaunchedContainers; + private final List completedContainersStatuses; public NodeUpdateSchedulerEvent(RMNode rmNode, - Map> containers) { + List newlyLaunchedContainers, + List completedContainers) { super(SchedulerEventType.NODE_UPDATE); this.rmNode = rmNode; - this.containers = containers; + this.newlyLaunchedContainers = newlyLaunchedContainers; + this.completedContainersStatuses = completedContainers; } public RMNode getRMNode() { return rmNode; } - public Map> getContainers() { - return containers; + public List getNewlyLaunchedContainers() { + return newlyLaunchedContainers; } + public List getCompletedContainers() { + return completedContainersStatuses; + } } diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java index 81f23633af8..b99b9a67134 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java @@ -39,10 +39,9 @@ import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.authorize.AccessControlList; import org.apache.hadoop.yarn.Lock; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; -import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; -import org.apache.hadoop.yarn.api.records.ContainerState; +import org.apache.hadoop.yarn.api.records.ContainerStatus; import 
org.apache.hadoop.yarn.api.records.ContainerToken; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Priority; @@ -73,6 +72,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerExpiredSchedulerEvent; @@ -90,7 +90,7 @@ public class FifoScheduler implements ResourceScheduler { private static final Log LOG = LogFactory.getLog(FifoScheduler.class); - private final RecordFactory recordFactory = + private static final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null); Configuration conf; @@ -234,7 +234,11 @@ public class FifoScheduler implements ResourceScheduler { "Trying to release container not owned by app or with invalid id", application.getApplicationId(), releasedContainer); } - containerCompleted(rmContainer, RMContainerEventType.RELEASED); + containerCompleted(rmContainer, + SchedulerUtils.createAbnormalContainerStatus( + releasedContainer, + SchedulerUtils.RELEASED_CONTAINER), + RMContainerEventType.RELEASED); } if (!ask.isEmpty()) { @@ -312,7 +316,11 @@ public class FifoScheduler implements ResourceScheduler { // Kill all 'live' containers for (RMContainer container : application.getLiveContainers()) { - containerCompleted(container, RMContainerEventType.KILL); + containerCompleted(container, + SchedulerUtils.createAbnormalContainerStatus( + container.getContainerId(), + SchedulerUtils.COMPLETED_APPLICATION), + RMContainerEventType.KILL); } // Clean up pending requests, metrics etc. 
@@ -542,25 +550,22 @@ public class FifoScheduler implements ResourceScheduler { return assignedContainers; } - private synchronized void nodeUpdate(RMNode rmNode, - Map> remoteContainers) { + private synchronized void nodeUpdate(RMNode rmNode, + List newlyLaunchedContainers, + List completedContainers) { SchedulerNode node = getNode(rmNode.getNodeID()); - for (List appContainers : remoteContainers.values()) { - for (Container container : appContainers) { - /* make sure the scheduler hasnt already removed the applications */ - if (getApplication(container.getId().getAppAttemptId()) != null) { - if (container.getState() == ContainerState.RUNNING) { - containerLaunchedOnNode(container, node); - } else { // has to COMPLETE - containerCompleted(getRMContainer(container.getId()), - RMContainerEventType.FINISHED); - } - } - else { - LOG.warn("Scheduler not tracking application " + container.getId().getAppAttemptId()); - } - } + // Processing the newly launched containers + for (ContainerStatus launchedContainer : newlyLaunchedContainers) { + containerLaunchedOnNode(launchedContainer.getContainerId(), node); + } + + // Process completed containers + for (ContainerStatus completedContainer : completedContainers) { + ContainerId containerId = completedContainer.getContainerId(); + LOG.info("DEBUG --- Container FINISHED: " + containerId); + containerCompleted(getRMContainer(containerId), + completedContainer, RMContainerEventType.FINISHED); } if (Resources.greaterThanOrEqual(node.getAvailableResource(), @@ -598,7 +603,8 @@ public class FifoScheduler implements ResourceScheduler { NodeUpdateSchedulerEvent nodeUpdatedEvent = (NodeUpdateSchedulerEvent)event; nodeUpdate(nodeUpdatedEvent.getRMNode(), - nodeUpdatedEvent.getContainers()); + nodeUpdatedEvent.getNewlyLaunchedContainers(), + nodeUpdatedEvent.getCompletedContainers()); } break; case APP_ADDED: @@ -624,7 +630,11 @@ public class FifoScheduler implements ResourceScheduler { { ContainerExpiredSchedulerEvent containerExpiredEvent = (ContainerExpiredSchedulerEvent) event; - containerCompleted(getRMContainer(containerExpiredEvent.getContainerId()), + ContainerId containerid = containerExpiredEvent.getContainerId(); + containerCompleted(getRMContainer(containerid), + SchedulerUtils.createAbnormalContainerStatus( + containerid, + SchedulerUtils.EXPIRED_CONTAINER), RMContainerEventType.EXPIRE); } break; @@ -633,23 +643,23 @@ public class FifoScheduler implements ResourceScheduler { } } - private void containerLaunchedOnNode(Container container, SchedulerNode node) { + private void containerLaunchedOnNode(ContainerId containerId, SchedulerNode node) { // Get the application for the finished container - ApplicationAttemptId applicationAttemptId = container.getId().getAppAttemptId(); + ApplicationAttemptId applicationAttemptId = containerId.getAppAttemptId(); SchedulerApp application = getApplication(applicationAttemptId); if (application == null) { LOG.info("Unknown application: " + applicationAttemptId + - " launched container " + container.getId() + + " launched container " + containerId + " on node: " + node); return; } - application.containerLaunchedOnNode(container.getId()); + application.containerLaunchedOnNode(containerId); } @Lock(FifoScheduler.class) private synchronized void containerCompleted(RMContainer rmContainer, - RMContainerEventType event) { + ContainerStatus containerStatus, RMContainerEventType event) { if (rmContainer == null) { LOG.info("Null container completed..."); return; @@ -672,7 +682,7 @@ public class FifoScheduler implements 
ResourceScheduler { } // Inform the application - application.containerCompleted(rmContainer, event); + application.containerCompleted(rmContainer, containerStatus, event); // Inform the node node.releaseContainer(container); @@ -691,7 +701,11 @@ public class FifoScheduler implements ResourceScheduler { SchedulerNode node = getNode(nodeInfo.getNodeID()); // Kill running containers for(RMContainer container : node.getRunningContainers()) { - containerCompleted(container, RMContainerEventType.KILL); + containerCompleted(container, + SchedulerUtils.createAbnormalContainerStatus( + container.getContainerId(), + SchedulerUtils.LOST_CONTAINER), + RMContainerEventType.KILL); } //Remove the node diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java index e7178d013ef..242a815270a 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java @@ -25,6 +25,7 @@ import java.util.Map; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NodeHealthStatus; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Resource; @@ -53,9 +54,10 @@ public class MockNM { } public void containerStatus(Container container) throws Exception { - Map> conts = new HashMap>(); - conts.put(container.getId().getAppId(), Arrays - .asList(new Container[] { container })); + Map> conts = + new HashMap>(); + conts.put(container.getId().getAppId(), + Arrays.asList(new ContainerStatus[] { container.getContainerStatus() })); nodeHeartbeat(conts, true); } @@ -76,16 +78,16 @@ public class MockNM { } public HeartbeatResponse nodeHeartbeat(boolean b) throws Exception { - return nodeHeartbeat(new HashMap>(), b); + return nodeHeartbeat(new HashMap>(), b); } public HeartbeatResponse nodeHeartbeat(Map> conts, boolean isHealthy) throws Exception { + List> conts, boolean isHealthy) throws Exception { NodeHeartbeatRequest req = Records.newRecord(NodeHeartbeatRequest.class); NodeStatus status = Records.newRecord(NodeStatus.class); status.setNodeId(nodeId); - for (Map.Entry> entry : conts.entrySet()) { - status.setContainers(entry.getKey(), entry.getValue()); + for (Map.Entry> entry : conts.entrySet()) { + status.setContainersStatuses(entry.getValue()); } NodeHealthStatus healthStatus = Records.newRecord(NodeHealthStatus.class); healthStatus.setHealthReport(""); diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/NodeManager.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/NodeManager.java index 23871a7c494..5f2b47c9ff1 100644 --- 
a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/NodeManager.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/NodeManager.java @@ -42,6 +42,7 @@ import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; import org.apache.hadoop.yarn.api.records.ContainerState; +import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NodeHealthStatus; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Resource; @@ -53,9 +54,7 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest; import org.apache.hadoop.yarn.server.api.records.HeartbeatResponse; import org.apache.hadoop.yarn.server.api.records.NodeStatus; -import org.apache.hadoop.yarn.server.api.records.RegistrationResponse; import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources; -import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; import org.apache.hadoop.yarn.util.BuilderUtils; @@ -133,10 +132,19 @@ public class NodeManager implements ContainerManager { int responseID = 0; + private List getContainerStatuses(Map> containers) { + List containerStatuses = new ArrayList(); + for (List appContainers : containers.values()) { + for (Container container : appContainers) { + containerStatuses.add(container.getContainerStatus()); + } + } + return containerStatuses; + } public void heartbeat() throws IOException { NodeStatus nodeStatus = org.apache.hadoop.yarn.server.resourcemanager.NodeManager.createNodeStatus( - nodeId, containers); + nodeId, getContainerStatuses(containers)); nodeStatus.setResponseId(responseID); NodeHeartbeatRequest request = recordFactory .newRecordInstance(NodeHeartbeatRequest.class); @@ -250,17 +258,29 @@ public class NodeManager implements ContainerManager { @Override synchronized public GetContainerStatusResponse getContainerStatus(GetContainerStatusRequest request) throws YarnRemoteException { - ContainerId containerID = request.getContainerId(); - GetContainerStatusResponse response = recordFactory.newRecordInstance(GetContainerStatusResponse.class); + ContainerId containerId = request.getContainerId(); + List appContainers = containers.get(containerId.getAppId()); + Container container = null; + for (Container c : appContainers) { + if (c.getId().equals(containerId)) { + container = c; + } + } + GetContainerStatusResponse response = + recordFactory.newRecordInstance(GetContainerStatusResponse.class); + if (container != null && container.getContainerStatus() != null) { + response.setStatus(container.getContainerStatus()); + } return response; } - public static org.apache.hadoop.yarn.server.api.records.NodeStatus createNodeStatus( - NodeId nodeId, Map> containers) { + public static org.apache.hadoop.yarn.server.api.records.NodeStatus + createNodeStatus(NodeId nodeId, List containers) { RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null); - org.apache.hadoop.yarn.server.api.records.NodeStatus nodeStatus = 
recordFactory.newRecordInstance(org.apache.hadoop.yarn.server.api.records.NodeStatus.class); + org.apache.hadoop.yarn.server.api.records.NodeStatus nodeStatus = + recordFactory.newRecordInstance(org.apache.hadoop.yarn.server.api.records.NodeStatus.class); nodeStatus.setNodeId(nodeId); - nodeStatus.addAllContainers(containers); + nodeStatus.setContainersStatuses(containers); NodeHealthStatus nodeHealthStatus = recordFactory.newRecordInstance(NodeHealthStatus.class); nodeHealthStatus.setIsNodeHealthy(true); diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java index 1da7733ceea..79320b6eb76 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java @@ -66,11 +66,11 @@ public class TestApplicationCleanup { //kick the scheduler nm1.nodeHeartbeat(true); List conts = am.allocate(new ArrayList(), - new ArrayList()).getNewContainerList(); + new ArrayList()).getAllocatedContainers(); int contReceived = conts.size(); while (contReceived < request) { conts = am.allocate(new ArrayList(), - new ArrayList()).getNewContainerList(); + new ArrayList()).getAllocatedContainers(); contReceived += conts.size(); Log.info("Got " + contReceived + " containers. Waiting to get " + request); Thread.sleep(2000); diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java index 0fa897ac52f..605a0f363b8 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java @@ -92,12 +92,12 @@ public class TestFifoScheduler { // kick the scheduler, 1 GB and 3 GB given to AM1 and AM2, remaining 0 nm1.nodeHeartbeat(true); - while (am1Response.getNewContainerCount() < 1) { + while (am1Response.getAllocatedContainers().size() < 1) { LOG.info("Waiting for containers to be created for app 1..."); Thread.sleep(1000); am1Response = am1.schedule(); } - while (am2Response.getNewContainerCount() < 1) { + while (am2Response.getAllocatedContainers().size() < 1) { LOG.info("Waiting for containers to be created for app 2..."); Thread.sleep(1000); am2Response = am2.schedule(); @@ -105,12 +105,12 @@ public class TestFifoScheduler { // kick the scheduler, nothing given remaining 2 GB. 
nm2.nodeHeartbeat(true); - List allocated1 = am1Response.getNewContainerList(); + List allocated1 = am1Response.getAllocatedContainers(); Assert.assertEquals(1, allocated1.size()); Assert.assertEquals(1 * GB, allocated1.get(0).getResource().getMemory()); Assert.assertEquals(nm1.getNodeId(), allocated1.get(0).getNodeId()); - List allocated2 = am2Response.getNewContainerList(); + List allocated2 = am2Response.getAllocatedContainers(); Assert.assertEquals(1, allocated2.size()); Assert.assertEquals(3 * GB, allocated2.get(0).getResource().getMemory()); Assert.assertEquals(nm1.getNodeId(), allocated2.get(0).getNodeId()); @@ -137,7 +137,7 @@ public class TestFifoScheduler { Thread.sleep(1000); } Assert.assertEquals(1, attempt1.getJustFinishedContainers().size()); - Assert.assertEquals(1, am1.schedule().getFinishedContainerList().size()); + Assert.assertEquals(1, am1.schedule().getCompletedContainersStatuses().size()); Assert.assertEquals(5 * GB, rm.getResourceScheduler().getUsedResource( nm1.getNodeId()).getMemory()); diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRM.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRM.java index dbce71e24d1..03941e3625d 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRM.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRM.java @@ -86,11 +86,11 @@ public class TestRM { //kick the scheduler nm1.nodeHeartbeat(true); List conts = am.allocate(new ArrayList(), - new ArrayList()).getNewContainerList(); + new ArrayList()).getAllocatedContainers(); int contReceived = conts.size(); while (contReceived < 3) {//only 3 containers are available on node1 conts.addAll(am.allocate(new ArrayList(), - new ArrayList()).getNewContainerList()); + new ArrayList()).getAllocatedContainers()); contReceived = conts.size(); LOG.info("Got " + contReceived + " containers. Waiting to get " + 3); Thread.sleep(2000); @@ -100,11 +100,11 @@ public class TestRM { //send node2 heartbeat nm2.nodeHeartbeat(true); conts = am.allocate(new ArrayList(), - new ArrayList()).getNewContainerList(); + new ArrayList()).getAllocatedContainers(); contReceived = conts.size(); while (contReceived < 10) { conts.addAll(am.allocate(new ArrayList(), - new ArrayList()).getNewContainerList()); + new ArrayList()).getAllocatedContainers()); contReceived = conts.size(); LOG.info("Got " + contReceived + " containers. 
Waiting to get " + 10); Thread.sleep(2000); diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java index 3c283c3ecbd..01acd1162ff 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java @@ -33,6 +33,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; @@ -148,7 +149,8 @@ public class TestLeafQueue { CSQueue parent = queue.getParent(); doNothing().when(parent).completedContainer( any(Resource.class), any(SchedulerApp.class), any(SchedulerNode.class), - any(RMContainer.class), any(RMContainerEventType.class)); + any(RMContainer.class), any(ContainerStatus.class), + any(RMContainerEventType.class)); return queue; } @@ -238,7 +240,7 @@ public class TestLeafQueue { // Release each container from app_0 for (RMContainer rmContainer : app_0.getLiveContainers()) { a.completedContainer(clusterResource, app_0, node_0, rmContainer, - RMContainerEventType.KILL); + null, RMContainerEventType.KILL); } assertEquals(1*GB, a.getUsedResources().getMemory()); assertEquals(0*GB, app_0.getCurrentConsumption().getMemory()); @@ -247,7 +249,7 @@ public class TestLeafQueue { // Release each container from app_1 for (RMContainer rmContainer : app_1.getLiveContainers()) { a.completedContainer(clusterResource, app_1, node_0, rmContainer, - RMContainerEventType.KILL); + null, RMContainerEventType.KILL); } assertEquals(0*GB, a.getUsedResources().getMemory()); assertEquals(0*GB, app_0.getCurrentConsumption().getMemory()); @@ -392,7 +394,7 @@ public class TestLeafQueue { // 8. Release each container from app_0 for (RMContainer rmContainer : app_0.getLiveContainers()) { a.completedContainer(clusterResource, app_0, node_0, rmContainer, - RMContainerEventType.KILL); + null, RMContainerEventType.KILL); } assertEquals(5*GB, a.getUsedResources().getMemory()); assertEquals(0*GB, app_0.getCurrentConsumption().getMemory()); @@ -403,7 +405,7 @@ public class TestLeafQueue { // 9. Release each container from app_2 for (RMContainer rmContainer : app_2.getLiveContainers()) { a.completedContainer(clusterResource, app_2, node_0, rmContainer, - RMContainerEventType.KILL); + null, RMContainerEventType.KILL); } assertEquals(2*GB, a.getUsedResources().getMemory()); assertEquals(0*GB, app_0.getCurrentConsumption().getMemory()); @@ -414,7 +416,7 @@ public class TestLeafQueue { // 10. 
Release each container from app_3 for (RMContainer rmContainer : app_3.getLiveContainers()) { a.completedContainer(clusterResource, app_3, node_0, rmContainer, - RMContainerEventType.KILL); + null, RMContainerEventType.KILL); } assertEquals(0*GB, a.getUsedResources().getMemory()); assertEquals(0*GB, app_0.getCurrentConsumption().getMemory()); @@ -489,7 +491,7 @@ public class TestLeafQueue { // Now free 1 container from app_0 i.e. 1G a.completedContainer(clusterResource, app_0, node_0, - app_0.getLiveContainers().iterator().next(), RMContainerEventType.KILL); + app_0.getLiveContainers().iterator().next(), null, RMContainerEventType.KILL); a.assignContainers(clusterResource, node_0); assertEquals(5*GB, a.getUsedResources().getMemory()); assertEquals(1*GB, app_0.getCurrentConsumption().getMemory()); @@ -499,7 +501,7 @@ public class TestLeafQueue { // Now finish another container from app_0 and fulfill the reservation a.completedContainer(clusterResource, app_0, node_0, - app_0.getLiveContainers().iterator().next(), RMContainerEventType.KILL); + app_0.getLiveContainers().iterator().next(), null, RMContainerEventType.KILL); a.assignContainers(clusterResource, node_0); assertEquals(4*GB, a.getUsedResources().getMemory()); assertEquals(0*GB, app_0.getCurrentConsumption().getMemory()); @@ -582,7 +584,7 @@ public class TestLeafQueue { // Now free 1 container from app_0 i.e. 1G, and re-reserve it a.completedContainer(clusterResource, app_0, node_0, - app_0.getLiveContainers().iterator().next(), RMContainerEventType.KILL); + app_0.getLiveContainers().iterator().next(), null, RMContainerEventType.KILL); a.assignContainers(clusterResource, node_0); assertEquals(5*GB, a.getUsedResources().getMemory()); assertEquals(1*GB, app_0.getCurrentConsumption().getMemory()); @@ -613,7 +615,7 @@ public class TestLeafQueue { // Now finish another container from app_0 and see the reservation cancelled a.completedContainer(clusterResource, app_0, node_0, - app_0.getLiveContainers().iterator().next(), RMContainerEventType.KILL); + app_0.getLiveContainers().iterator().next(), null, RMContainerEventType.KILL); a.assignContainers(clusterResource, node_0); assertEquals(4*GB, a.getUsedResources().getMemory()); assertEquals(0*GB, app_0.getCurrentConsumption().getMemory()); diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestContainerTokenSecretManager.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestContainerTokenSecretManager.java index 453cddd1f2a..fdc1c7bd880 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestContainerTokenSecretManager.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestContainerTokenSecretManager.java @@ -241,7 +241,7 @@ public class TestContainerTokenSecretManager { allocateRequest.addAllAsks(ask); allocateRequest.addAllReleases(release); List allocatedContainers = scheduler.allocate(allocateRequest) - .getAMResponse().getNewContainerList(); + .getAMResponse().getAllocatedContainers(); waitCounter = 0; while ((allocatedContainers == null || allocatedContainers.size() == 0) @@ -251,7 +251,7 @@ public class TestContainerTokenSecretManager { allocateRequest.setResponseId(allocateRequest.getResponseId() + 1); allocatedContainers = 
           scheduler.allocate(allocateRequest).getAMResponse()
-              .getNewContainerList();
+              .getAllocatedContainers();
     }
     Assert.assertNotNull("Container is not allocted!", allocatedContainers);
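On the NodeManager side of the protocol (see MockNM and the test NodeManager above), the heartbeat now carries a flat list of ContainerStatus records instead of a per-application map of Containers. A minimal sketch of assembling that status in the same Records style the tests use; the wrapper class is illustrative and not part of this patch:

    import java.util.List;
    import org.apache.hadoop.yarn.api.records.ContainerStatus;
    import org.apache.hadoop.yarn.api.records.NodeId;
    import org.apache.hadoop.yarn.server.api.records.NodeStatus;
    import org.apache.hadoop.yarn.util.Records;

    // Illustrative: build the per-heartbeat NodeStatus from container statuses.
    class NodeStatusSketch {
      static NodeStatus build(NodeId nodeId, List<ContainerStatus> statuses,
          int responseId) {
        NodeStatus nodeStatus = Records.newRecord(NodeStatus.class);
        nodeStatus.setNodeId(nodeId);
        nodeStatus.setContainersStatuses(statuses);  // replaces the old per-app map
        nodeStatus.setResponseId(responseId);
        return nodeStatus;
      }
    }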