Merge -r 1169483:1169484 from trunk to branch-0.23 to fix MAPREDUCE-2933.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23@1169485 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Arun Murthy 2011-09-11 17:28:08 +00:00
parent 6814d04c3e
commit cbd0ddfebe
37 changed files with 540 additions and 572 deletions

View File

@ -250,6 +250,9 @@ Release 0.23.0 - Unreleased
MAPREDUCE-2711. Update TestBlockPlacementPolicyRaid for the new namesystem
and block management APIs. (szetszwo)
MAPREDUCE-2933. Change allocate call to return ContainerStatus for
completed containers rather than Container. (acmurthy)
OPTIMIZATIONS
MAPREDUCE-2026. Make JobTracker.getJobCounters() and

View File

@ -53,6 +53,7 @@ import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
import org.apache.hadoop.yarn.api.records.AMResponse;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.util.RackResolver;
@ -414,8 +415,8 @@ public class RMContainerAllocator extends RMContainerRequestor
int headRoom = getAvailableResources() != null ? getAvailableResources().getMemory() : 0;//first time it would be null
AMResponse response = makeRemoteRequest();
int newHeadRoom = getAvailableResources() != null ? getAvailableResources().getMemory() : 0;
List<Container> newContainers = response.getNewContainerList();
List<Container> finishedContainers = response.getFinishedContainerList();
List<Container> newContainers = response.getAllocatedContainers();
List<ContainerStatus> finishedContainers = response.getCompletedContainersStatuses();
if (newContainers.size() + finishedContainers.size() > 0 || headRoom != newHeadRoom) {
//something changed
recalculateReduceSchedule = true;
@ -426,12 +427,12 @@ public class RMContainerAllocator extends RMContainerRequestor
allocatedContainers.add(cont);
LOG.debug("Received new Container :" + cont);
}
for (Container cont : finishedContainers) {
for (ContainerStatus cont : finishedContainers) {
LOG.info("Received completed container " + cont);
TaskAttemptId attemptID = assignedRequests.get(cont.getId());
TaskAttemptId attemptID = assignedRequests.get(cont.getContainerId());
if (attemptID == null) {
LOG.error("Container complete event for unknown container id "
+ cont.getId());
+ cont.getContainerId());
} else {
assignedRequests.remove(attemptID);
if (attemptID.getTaskId().getTaskType().equals(TaskType.MAP)) {
@ -443,7 +444,7 @@ public class RMContainerAllocator extends RMContainerRequestor
eventHandler.handle(new TaskAttemptEvent(attemptID,
TaskAttemptEventType.TA_CONTAINER_COMPLETED));
// Send the diagnostics
String diagnostics = cont.getContainerStatus().getDiagnostics();
String diagnostics = cont.getDiagnostics();
eventHandler.handle(new TaskAttemptDiagnosticsUpdateEvent(attemptID,
diagnostics));
}

View File

@ -36,7 +36,6 @@ import org.apache.hadoop.mapreduce.v2.app.client.ClientService;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.records.AMResponse;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
@ -123,10 +122,11 @@ public abstract class RMContainerRequestor extends RMCommunicator {
availableResources = response.getAvailableResources();
LOG.info("getResources() for " + applicationId + ":" + " ask="
+ ask.size() + " release= " + release.size() + " newContainers="
+ response.getNewContainerCount() + " finishedContainers="
+ response.getFinishedContainerCount()
+ " resourcelimit=" + availableResources);
+ ask.size() + " release= " + release.size() +
" newContainers=" + response.getAllocatedContainers().size() +
" finishedContainers=" +
response.getCompletedContainersStatuses().size() +
" resourcelimit=" + availableResources);
ask.clear();
release.clear();

View File

@ -86,31 +86,16 @@ public interface AMResponse {
*/
@Public
@Stable
public List<Container> getNewContainerList();
public List<Container> getAllocatedContainers();
@Private
@Unstable
public Container getNewContainer(int index);
@Private
@Unstable
public int getNewContainerCount();
@Private
@Unstable
public void addAllNewContainers(List<Container> containers);
@Private
@Unstable
public void addNewContainer(Container container);
@Private
@Unstable
public void removeNewContainer(int index);
@Private
@Unstable
public void clearNewContainers();
/**
* Set the list of <em>newly allocated</em> <code>Container</code> by the
* <code>ResourceManager</code>.
* @param containers list of <em>newly allocated</em> <code>Container</code>
*/
@Public
@Stable
public void setAllocatedContainers(List<Container> containers);
/**
* Get the <em>available headroom</em> for resources in the cluster for the
@ -127,35 +112,18 @@ public interface AMResponse {
public void setAvailableResources(Resource limit);
/**
* Get the list of <em>completed containers</em>.
* @return the list of <em>completed containers</em>
* Get the list of <em>completed containers' statuses</em>.
* @return the list of <em>completed containers' statuses</em>
*/
@Public
@Stable
public List<Container> getFinishedContainerList();
public List<ContainerStatus> getCompletedContainersStatuses();
@Private
@Unstable
public Container getFinishedContainer(int index);
@Private
@Unstable
public int getFinishedContainerCount();
@Private
@Unstable
public void addAllFinishedContainers(List<Container> containers);
@Private
@Unstable
public void addFinishedContainer(Container container);
@Private
@Unstable
public void removeFinishedContainer(int index);
@Private
@Unstable
public void clearFinishedContainers();
/**
* Set the list of list of <em>completed containers' statuses</em>.
* @param containers list of <em>completed containers' statuses</em>
*/
@Public
@Stable
public void setCompletedContainersStatuses(List<ContainerStatus> containers);
}

View File

@ -25,11 +25,13 @@ import java.util.List;
import org.apache.hadoop.yarn.api.records.AMResponse;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.ProtoBase;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.proto.YarnProtos.AMResponseProto;
import org.apache.hadoop.yarn.proto.YarnProtos.AMResponseProtoOrBuilder;
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerStatusProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProto;
@ -41,8 +43,8 @@ public class AMResponsePBImpl extends ProtoBase<AMResponseProto> implements AMRe
Resource limit;
private List<Container> newContainersList = null;
private List<Container> finishedContainersList = null;
private List<Container> allocatedContainers = null;
private List<ContainerStatus> completedContainersStatuses = null;
// private boolean hasLocalContainerList = false;
@ -63,15 +65,17 @@ public class AMResponsePBImpl extends ProtoBase<AMResponseProto> implements AMRe
}
private synchronized void mergeLocalToBuilder() {
if (this.newContainersList != null) {
builder.clearNewContainers();
Iterable<ContainerProto> iterable = getProtoIterable(this.newContainersList);
builder.addAllNewContainers(iterable);
if (this.allocatedContainers != null) {
builder.clearAllocatedContainers();
Iterable<ContainerProto> iterable =
getProtoIterable(this.allocatedContainers);
builder.addAllAllocatedContainers(iterable);
}
if (this.finishedContainersList != null) {
builder.clearFinishedContainers();
Iterable<ContainerProto> iterable = getProtoIterable(this.finishedContainersList);
builder.addAllFinishedContainers(iterable);
if (this.completedContainersStatuses != null) {
builder.clearCompletedContainerStatuses();
Iterable<ContainerStatusProto> iterable =
getContainerStatusProtoIterable(this.completedContainersStatuses);
builder.addAllCompletedContainerStatuses(iterable);
}
if (this.limit != null) {
builder.setLimit(convertToProtoFormat(this.limit));
@ -139,42 +143,31 @@ public class AMResponsePBImpl extends ProtoBase<AMResponseProto> implements AMRe
}
@Override
public synchronized List<Container> getNewContainerList() {
public synchronized List<Container> getAllocatedContainers() {
initLocalNewContainerList();
return this.newContainersList;
}
@Override
public synchronized Container getNewContainer(int index) {
initLocalNewContainerList();
return this.newContainersList.get(index);
}
@Override
public synchronized int getNewContainerCount() {
initLocalNewContainerList();
return this.newContainersList.size();
return this.allocatedContainers;
}
//Once this is called. containerList will never be null - untill a getProto is called.
private synchronized void initLocalNewContainerList() {
if (this.newContainersList != null) {
if (this.allocatedContainers != null) {
return;
}
AMResponseProtoOrBuilder p = viaProto ? proto : builder;
List<ContainerProto> list = p.getNewContainersList();
newContainersList = new ArrayList<Container>();
List<ContainerProto> list = p.getAllocatedContainersList();
allocatedContainers = new ArrayList<Container>();
for (ContainerProto c : list) {
newContainersList.add(convertFromProtoFormat(c));
allocatedContainers.add(convertFromProtoFormat(c));
}
}
@Override
public synchronized void addAllNewContainers(final List<Container> containers) {
public synchronized void setAllocatedContainers(final List<Container> containers) {
if (containers == null)
return;
initLocalNewContainerList();
newContainersList.addAll(containers);
allocatedContainers.addAll(containers);
}
private synchronized Iterable<ContainerProto> getProtoIterable(
@ -208,85 +201,70 @@ public class AMResponsePBImpl extends ProtoBase<AMResponseProto> implements AMRe
};
}
@Override
public synchronized void addNewContainer(Container containers) {
initLocalNewContainerList();
if (containers == null)
return;
this.newContainersList.add(containers);
}
private synchronized Iterable<ContainerStatusProto>
getContainerStatusProtoIterable(
final List<ContainerStatus> newContainersList) {
maybeInitBuilder();
return new Iterable<ContainerStatusProto>() {
@Override
public synchronized Iterator<ContainerStatusProto> iterator() {
return new Iterator<ContainerStatusProto>() {
@Override
public synchronized void removeNewContainer(int index) {
initLocalNewContainerList();
this.newContainersList.remove(index);
}
@Override
public synchronized void clearNewContainers() {
initLocalNewContainerList();
this.newContainersList.clear();
Iterator<ContainerStatus> iter = newContainersList.iterator();
@Override
public synchronized boolean hasNext() {
return iter.hasNext();
}
@Override
public synchronized ContainerStatusProto next() {
return convertToProtoFormat(iter.next());
}
@Override
public synchronized void remove() {
throw new UnsupportedOperationException();
}
};
}
};
}
//// Finished containers
@Override
public synchronized List<Container> getFinishedContainerList() {
public synchronized List<ContainerStatus> getCompletedContainersStatuses() {
initLocalFinishedContainerList();
return this.finishedContainersList;
}
@Override
public synchronized Container getFinishedContainer(int index) {
initLocalFinishedContainerList();
return this.finishedContainersList.get(index);
}
@Override
public synchronized int getFinishedContainerCount() {
initLocalFinishedContainerList();
return this.finishedContainersList.size();
return this.completedContainersStatuses;
}
//Once this is called. containerList will never be null - untill a getProto is called.
private synchronized void initLocalFinishedContainerList() {
if (this.finishedContainersList != null) {
if (this.completedContainersStatuses != null) {
return;
}
AMResponseProtoOrBuilder p = viaProto ? proto : builder;
List<ContainerProto> list = p.getFinishedContainersList();
finishedContainersList = new ArrayList<Container>();
List<ContainerStatusProto> list = p.getCompletedContainerStatusesList();
completedContainersStatuses = new ArrayList<ContainerStatus>();
for (ContainerProto c : list) {
finishedContainersList.add(convertFromProtoFormat(c));
for (ContainerStatusProto c : list) {
completedContainersStatuses.add(convertFromProtoFormat(c));
}
}
@Override
public synchronized void addAllFinishedContainers(final List<Container> containers) {
public synchronized void setCompletedContainersStatuses(
final List<ContainerStatus> containers) {
if (containers == null)
return;
initLocalFinishedContainerList();
finishedContainersList.addAll(containers);
completedContainersStatuses.addAll(containers);
}
@Override
public synchronized void addFinishedContainer(Container containers) {
initLocalFinishedContainerList();
if (containers == null)
return;
this.finishedContainersList.add(containers);
}
@Override
public synchronized void removeFinishedContainer(int index) {
initLocalFinishedContainerList();
this.finishedContainersList.remove(index);
}
@Override
public synchronized void clearFinishedContainers() {
initLocalFinishedContainerList();
this.finishedContainersList.clear();
}
private synchronized ContainerPBImpl convertFromProtoFormat(ContainerProto p) {
private synchronized ContainerPBImpl convertFromProtoFormat(
ContainerProto p) {
return new ContainerPBImpl(p);
}
@ -294,6 +272,15 @@ public class AMResponsePBImpl extends ProtoBase<AMResponseProto> implements AMRe
return ((ContainerPBImpl)t).getProto();
}
private synchronized ContainerStatusPBImpl convertFromProtoFormat(
ContainerStatusProto p) {
return new ContainerStatusPBImpl(p);
}
private synchronized ContainerStatusProto convertToProtoFormat(ContainerStatus t) {
return ((ContainerStatusPBImpl)t).getProto();
}
private synchronized ResourcePBImpl convertFromProtoFormat(ResourceProto p) {
return new ResourcePBImpl(p);
}

View File

@ -177,8 +177,8 @@ message ResourceRequestProto {
message AMResponseProto {
optional bool reboot = 1;
optional int32 response_id = 2;
repeated ContainerProto new_containers = 3;
repeated ContainerProto finished_containers = 4;
repeated ContainerProto allocated_containers = 3;
repeated ContainerStatusProto completed_container_statuses = 4;
optional ResourceProto limit = 5;
}

View File

@ -18,10 +18,8 @@
package org.apache.hadoop.yarn.server.api.records;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeHealthStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
@ -31,17 +29,13 @@ public interface NodeStatus {
public abstract NodeId getNodeId();
public abstract int getResponseId();
public abstract Map<ApplicationId, List<Container>> getAllContainers();
public abstract List<Container> getContainers(ApplicationId key);
public abstract List<ContainerStatus> getContainersStatuses();
public abstract void setContainersStatuses(
List<ContainerStatus> containersStatuses);
NodeHealthStatus getNodeHealthStatus();
void setNodeHealthStatus(NodeHealthStatus healthStatus);
public abstract void setNodeId(NodeId nodeId);
public abstract void setResponseId(int responseId);
public abstract void addAllContainers(Map<ApplicationId, List<Container>> containers);
public abstract void setContainers(ApplicationId key, List<Container> containers);
public abstract void removeContainers(ApplicationId key);
public abstract void clearContainers();
}

View File

@ -20,27 +20,19 @@ package org.apache.hadoop.yarn.server.api.records.impl.pb;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeHealthStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.ProtoBase;
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ContainerPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ContainerStatusPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.NodeHealthStatusPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.NodeIdPBImpl;
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerStatusProto;
import org.apache.hadoop.yarn.proto.YarnProtos.NodeHealthStatusProto;
import org.apache.hadoop.yarn.proto.YarnProtos.NodeIdProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.ApplicationIdContainerListMapProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.ContainerListProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeStatusProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeStatusProtoOrBuilder;
import org.apache.hadoop.yarn.server.api.records.NodeStatus;
@ -51,7 +43,7 @@ public class NodeStatusPBImpl extends ProtoBase<NodeStatusProto> implements Node
boolean viaProto = false;
private NodeId nodeId = null;
private Map<ApplicationIdProto, List<Container>> containers = null;
private List<ContainerStatus> containers = null;
private NodeHealthStatus nodeHealthStatus = null;
public NodeStatusPBImpl() {
@ -99,6 +91,39 @@ public class NodeStatusPBImpl extends ProtoBase<NodeStatusProto> implements Node
viaProto = false;
}
private void addContainersToProto() {
maybeInitBuilder();
builder.clearContainersStatuses();
if (containers == null)
return;
Iterable<ContainerStatusProto> iterable = new Iterable<ContainerStatusProto>() {
@Override
public Iterator<ContainerStatusProto> iterator() {
return new Iterator<ContainerStatusProto>() {
Iterator<ContainerStatus> iter = containers.iterator();
@Override
public boolean hasNext() {
return iter.hasNext();
}
@Override
public ContainerStatusProto next() {
return convertToProtoFormat(iter.next());
}
@Override
public void remove() {
throw new UnsupportedOperationException();
}
};
}
};
builder.addAllContainersStatuses(iterable);
}
@Override
public int getResponseId() {
@ -133,24 +158,17 @@ public class NodeStatusPBImpl extends ProtoBase<NodeStatusProto> implements Node
}
@Override
public Map<ApplicationId, List<Container>> getAllContainers() {
public List<ContainerStatus> getContainersStatuses() {
initContainers();
HashMap<ApplicationId, List<Container>> returnMap = new HashMap<ApplicationId, List<Container>>(
this.containers.size());
for (Entry<ApplicationIdProto, List<Container>> entry : this.containers.entrySet()) {
returnMap.put(convertFromProtoFormat(entry.getKey()), entry.getValue());
}
return returnMap;
return this.containers;
}
@Override
public List<Container> getContainers(ApplicationId applicationId) {
initContainers();
ApplicationIdProto applicationIdProto = convertToProtoFormat(applicationId);
if (this.containers.get(applicationIdProto) == null) {
this.containers.put(applicationIdProto, new ArrayList<Container>());
public void setContainersStatuses(List<ContainerStatus> containers) {
if (containers == null) {
builder.clearContainersStatuses();
}
return this.containers.get(applicationIdProto);
this.containers = containers;
}
private void initContainers() {
@ -158,59 +176,15 @@ public class NodeStatusPBImpl extends ProtoBase<NodeStatusProto> implements Node
return;
}
NodeStatusProtoOrBuilder p = viaProto ? proto : builder;
List<ApplicationIdContainerListMapProto> list = p.getContainersList();
this.containers = new HashMap<ApplicationIdProto, List<Container>>();
List<ContainerStatusProto> list = p.getContainersStatusesList();
this.containers = new ArrayList<ContainerStatus>();
for (ApplicationIdContainerListMapProto c : list) {
this.containers.put(c.getApplicationId(), convertFromProtoFormat(c.getValue()));
for (ContainerStatusProto c : list) {
this.containers.add(convertFromProtoFormat(c));
}
}
@Override
public void addAllContainers(final Map<ApplicationId, List<Container>> containers) {
if (containers == null)
return;
initContainers();
for (Entry<ApplicationId, List<Container>> entry : containers.entrySet()) {
this.containers.put(convertToProtoFormat(entry.getKey()), entry.getValue());
}
}
private void addContainersToProto() {
maybeInitBuilder();
builder.clearContainers();
viaProto = false;
Iterable<ApplicationIdContainerListMapProto> iterable = new Iterable<ApplicationIdContainerListMapProto>() {
@Override
public Iterator<ApplicationIdContainerListMapProto> iterator() {
return new Iterator<ApplicationIdContainerListMapProto>() {
Iterator<ApplicationIdProto> keyIter = containers.keySet().iterator();
@Override
public boolean hasNext() {
return keyIter.hasNext();
}
@Override
public ApplicationIdContainerListMapProto next() {
ApplicationIdProto applicationIdProto = keyIter.next();
return ApplicationIdContainerListMapProto.newBuilder().setApplicationId(applicationIdProto).setValue(convertToProtoFormat(containers.get(applicationIdProto))).build();
}
@Override
public void remove() {
throw new UnsupportedOperationException();
}
};
}
};
builder.addAllContainers(iterable);
}
@Override
public NodeHealthStatus getNodeHealthStatus() {
NodeStatusProtoOrBuilder p = viaProto ? proto : builder;
@ -233,66 +207,6 @@ public class NodeStatusPBImpl extends ProtoBase<NodeStatusProto> implements Node
this.nodeHealthStatus = healthStatus;
}
/*
*
* @Override
public String getApplicationName() {
ApplicationSubmissionContextProtoOrBuilder p = viaProto ? proto : builder;
if (!p.hasApplicationName()) {
return null;
}
return (p.getApplicationName());
}
@Override
public void setApplicationName(String applicationName) {
maybeInitBuilder();
if (applicationName == null) {
builder.clearApplicationName();
return;
}
builder.setApplicationName((applicationName));
}
*/
private ContainerListProto convertToProtoFormat(List<Container> src) {
ContainerListProto.Builder ret = ContainerListProto.newBuilder();
for (Container c : src) {
ret.addContainer(((ContainerPBImpl)c).getProto());
}
return ret.build();
}
private List<Container> convertFromProtoFormat(ContainerListProto src) {
List<Container> ret = new ArrayList<Container>();
for (ContainerProto c : src.getContainerList()) {
ret.add(convertFromProtoFormat(c));
}
return ret;
}
private Container convertFromProtoFormat(ContainerProto src) {
return new ContainerPBImpl(src);
}
@Override
public void setContainers(ApplicationId applicationId, List<Container> containers) {
initContainers();
this.containers.put(convertToProtoFormat(applicationId), containers);
}
@Override
public void removeContainers(ApplicationId applicationId) {
initContainers();
this.containers.remove(convertToProtoFormat(applicationId));
}
@Override
public void clearContainers() {
initContainers();
this.containers.clear();
}
private NodeIdProto convertToProtoFormat(NodeId nodeId) {
return ((NodeIdPBImpl)nodeId).getProto();
}
@ -301,14 +215,6 @@ public class NodeStatusPBImpl extends ProtoBase<NodeStatusProto> implements Node
return new NodeIdPBImpl(proto);
}
private ApplicationIdProto convertToProtoFormat(ApplicationId applicationId) {
return ((ApplicationIdPBImpl)applicationId).getProto();
}
private ApplicationId convertFromProtoFormat(ApplicationIdProto proto) {
return new ApplicationIdPBImpl(proto);
}
private NodeHealthStatusProto convertToProtoFormat(
NodeHealthStatus healthStatus) {
return ((NodeHealthStatusPBImpl) healthStatus).getProto();
@ -317,4 +223,12 @@ public class NodeStatusPBImpl extends ProtoBase<NodeStatusProto> implements Node
private NodeHealthStatus convertFromProtoFormat(NodeHealthStatusProto proto) {
return new NodeHealthStatusPBImpl(proto);
}
private ContainerStatusPBImpl convertFromProtoFormat(ContainerStatusProto c) {
return new ContainerStatusPBImpl(c);
}
private ContainerStatusProto convertToProtoFormat(ContainerStatus c) {
return ((ContainerStatusPBImpl)c).getProto();
}
}

View File

@ -26,7 +26,7 @@ import "yarn_protos.proto";
message NodeStatusProto {
optional NodeIdProto node_id = 1;
optional int32 response_id = 2;
repeated ApplicationIdContainerListMapProto containers = 3;
repeated ContainerStatusProto containersStatuses = 3;
optional NodeHealthStatusProto nodeHealthStatus = 4;
}
@ -41,12 +41,3 @@ message HeartbeatResponseProto {
repeated ApplicationIdProto applications_to_cleanup = 4;
}
message ContainerListProto {
repeated ContainerProto container = 1;
}
message ApplicationIdContainerListMapProto {
optional ApplicationIdProto application_id = 1;
optional ContainerListProto value = 2;
}

View File

@ -30,13 +30,13 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.NodeHealthCheckerService;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.SecurityInfo;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeHealthStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource;
@ -73,7 +73,6 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
private String rmAddress;
private Resource totalResource;
private String containerManagerBindAddress;
private String nodeHttpAddress;
private String hostName;
private int containerManagerPort;
private int httpPort;
@ -127,7 +126,6 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
this.httpPort = httpBindAddress.getPort();
this.containerManagerBindAddress =
this.hostName + ":" + this.containerManagerPort;
this.nodeHttpAddress = this.hostName + ":" + this.httpPort;
LOG.info("Configured ContainerManager Address is "
+ this.containerManagerBindAddress);
// Registration has to be in start so that ContainerManager can get the
@ -195,35 +193,28 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
nodeStatus.setNodeId(this.nodeId);
int numActiveContainers = 0;
List<ContainerStatus> containersStatuses = new ArrayList<ContainerStatus>();
for (Iterator<Entry<ContainerId, Container>> i =
this.context.getContainers().entrySet().iterator(); i.hasNext();) {
Entry<ContainerId, Container> e = i.next();
ContainerId containerId = e.getKey();
Container container = e.getValue();
List<org.apache.hadoop.yarn.api.records.Container> applicationContainers = nodeStatus
.getContainers(container.getContainerID().getAppId());
if (applicationContainers == null) {
applicationContainers = new ArrayList<org.apache.hadoop.yarn.api.records.Container>();
nodeStatus.setContainers(container.getContainerID().getAppId(),
applicationContainers);
}
// Clone the container to send it to the RM
org.apache.hadoop.yarn.api.records.Container c = container.cloneAndGetContainer();
c.setNodeId(this.nodeId);
c.setNodeHttpAddress(this.nodeHttpAddress); // TODO: don't set everytime.
applicationContainers.add(c);
org.apache.hadoop.yarn.api.records.ContainerStatus containerStatus =
container.cloneAndGetContainerStatus();
containersStatuses.add(containerStatus);
++numActiveContainers;
LOG.info("Sending out status for container: " + c);
LOG.info("Sending out status for container: " + containerStatus);
if (c.getState() == ContainerState.COMPLETE) {
if (containerStatus.getState() == ContainerState.COMPLETE) {
// Remove
i.remove();
LOG.info("Removed completed container " + containerId);
}
}
nodeStatus.setContainersStatuses(containersStatuses);
LOG.debug(this.containerManagerBindAddress + " sending out status for " + numActiveContainers
+ " containers");

View File

@ -40,8 +40,6 @@ public interface Container extends EventHandler<ContainerEvent> {
Map<Path,String> getLocalizedResources();
org.apache.hadoop.yarn.api.records.Container cloneAndGetContainer();
ContainerStatus cloneAndGetContainerStatus();
String toString();

View File

@ -326,24 +326,6 @@ public class ContainerImpl implements Container {
}
}
@Override
public
org.apache.hadoop.yarn.api.records.Container cloneAndGetContainer() {
this.readLock.lock();
try {
org.apache.hadoop.yarn.api.records.Container c =
recordFactory.newRecordInstance(
org.apache.hadoop.yarn.api.records.Container.class);
c.setId(this.launchContext.getContainerId());
c.setResource(this.launchContext.getResource());
c.setState(getCurrentState());
c.setContainerStatus(cloneAndGetContainerStatus());
return c;
} finally {
this.readLock.unlock();
}
}
@Override
public ContainerLaunchContext getLaunchContext() {
this.readLock.lock();

View File

@ -21,6 +21,10 @@ package org.apache.hadoop.yarn.server.nodemanager;
import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
import org.apache.commons.logging.Log;
@ -32,6 +36,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
@ -98,13 +103,30 @@ public class TestNodeStatusUpdater {
ContainerId firstContainerID = recordFactory.newRecordInstance(ContainerId.class);
ContainerId secondContainerID = recordFactory.newRecordInstance(ContainerId.class);
private Map<ApplicationId, List<ContainerStatus>> getAppToContainerStatusMap(
List<ContainerStatus> containers) {
Map<ApplicationId, List<ContainerStatus>> map =
new HashMap<ApplicationId, List<ContainerStatus>>();
for (ContainerStatus cs : containers) {
ApplicationId applicationId = cs.getContainerId().getAppId();
List<ContainerStatus> appContainers = map.get(applicationId);
if (appContainers == null) {
appContainers = new ArrayList<ContainerStatus>();
map.put(applicationId, appContainers);
}
appContainers.add(cs);
}
return map;
}
@Override
public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request) throws YarnRemoteException {
NodeStatus nodeStatus = request.getNodeStatus();
LOG.info("Got heartbeat number " + heartBeatID);
nodeStatus.setResponseId(heartBeatID++);
Map<ApplicationId, List<ContainerStatus>> appToContainers =
getAppToContainerStatusMap(nodeStatus.getContainersStatuses());
if (heartBeatID == 1) {
Assert.assertEquals(0, nodeStatus.getAllContainers().size());
Assert.assertEquals(0, nodeStatus.getContainersStatuses().size());
// Give a container to the NM.
applicationID.setId(heartBeatID);
@ -121,11 +143,9 @@ public class TestNodeStatusUpdater {
} else if (heartBeatID == 2) {
// Checks on the RM end
Assert.assertEquals("Number of applications should only be one!", 1,
nodeStatus.getAllContainers().size());
nodeStatus.getContainersStatuses().size());
Assert.assertEquals("Number of container for the app should be one!",
1, nodeStatus.getContainers(applicationID).size());
Assert.assertEquals(2, nodeStatus.getContainers(applicationID).get(0)
.getResource().getMemory());
1, appToContainers.get(applicationID).size());
// Checks on the NM end
ConcurrentMap<ContainerId, Container> activeContainers =
@ -147,13 +167,9 @@ public class TestNodeStatusUpdater {
} else if (heartBeatID == 3) {
// Checks on the RM end
Assert.assertEquals("Number of applications should only be one!", 1,
nodeStatus.getAllContainers().size());
appToContainers.size());
Assert.assertEquals("Number of container for the app should be two!",
2, nodeStatus.getContainers(applicationID).size());
Assert.assertEquals(2, nodeStatus.getContainers(applicationID).get(0)
.getResource().getMemory());
Assert.assertEquals(3, nodeStatus.getContainers(applicationID).get(1)
.getResource().getMemory());
2, appToContainers.get(applicationID).size());
// Checks on the NM end
ConcurrentMap<ContainerId, Container> activeContainers =

View File

@ -232,8 +232,8 @@ public class ApplicationMasterService extends AbstractService implements
RMAppAttempt appAttempt = app.getRMAppAttempt(appAttemptId);
AMResponse response = recordFactory.newRecordInstance(AMResponse.class);
response.addAllNewContainers(allocation.getContainers());
response.addAllFinishedContainers(appAttempt
response.setAllocatedContainers(allocation.getContainers());
response.setCompletedContainersStatuses(appAttempt
.pullJustFinishedContainers());
response.setResponseId(lastResponse.getResponseId() + 1);
response.setAvailableResources(allocation.getResourceLimit());

View File

@ -20,10 +20,6 @@ package org.apache.hadoop.yarn.server.resourcemanager;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
import javax.crypto.SecretKey;
@ -31,7 +27,6 @@ import org.apache.avro.ipc.Server;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.net.Node;
import org.apache.hadoop.security.SecurityInfo;
@ -252,7 +247,7 @@ public class ResourceTrackerService extends AbstractService implements
// 4. Send status to RMNode, saving the latest response.
this.rmContext.getDispatcher().getEventHandler().handle(
new RMNodeStatusEvent(nodeId, remoteNodeStatus.getNodeHealthStatus(),
remoteNodeStatus.getAllContainers(), latestResponse));
remoteNodeStatus.getContainersStatuses(), latestResponse));
nodeHeartBeatResponse.setHeartbeatResponse(latestResponse);
return nodeHeartBeatResponse;

View File

@ -24,6 +24,7 @@ import java.util.Set;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.event.EventHandler;
@ -47,9 +48,9 @@ public interface RMAppAttempt extends EventHandler<RMAppAttemptEvent>{
Set<NodeId> getRanNodes();
List<Container> pullJustFinishedContainers();
List<ContainerStatus> pullJustFinishedContainers();
List<Container> getJustFinishedContainers();
List<ContainerStatus> getJustFinishedContainers();
Container getMasterContainer();

View File

@ -34,6 +34,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
@ -97,8 +98,8 @@ public class RMAppAttemptImpl implements RMAppAttempt {
//nodes on while this attempt's containers ran
private final Set<NodeId> ranNodes =
new HashSet<NodeId>();
private final List<Container> justFinishedContainers =
new ArrayList<Container>();
private final List<ContainerStatus> justFinishedContainers =
new ArrayList<ContainerStatus>();
private Container masterContainer;
private float progress = 0;
@ -333,7 +334,7 @@ public class RMAppAttemptImpl implements RMAppAttempt {
}
@Override
public List<Container> getJustFinishedContainers() {
public List<ContainerStatus> getJustFinishedContainers() {
this.readLock.lock();
try {
return this.justFinishedContainers;
@ -343,11 +344,11 @@ public class RMAppAttemptImpl implements RMAppAttempt {
}
@Override
public List<Container> pullJustFinishedContainers() {
public List<ContainerStatus> pullJustFinishedContainers() {
this.writeLock.lock();
try {
List<Container> returnList = new ArrayList<Container>(
List<ContainerStatus> returnList = new ArrayList<ContainerStatus>(
this.justFinishedContainers.size());
returnList.addAll(this.justFinishedContainers);
this.justFinishedContainers.clear();
@ -705,11 +706,13 @@ public class RMAppAttemptImpl implements RMAppAttempt {
RMAppAttemptContainerFinishedEvent containerFinishedEvent
= (RMAppAttemptContainerFinishedEvent) event;
Container container = containerFinishedEvent.getContainer();
ContainerStatus containerStatus =
containerFinishedEvent.getContainerStatus();
// Is this container the AmContainer? If the finished container is same as
// the AMContainer, AppAttempt fails
if (appAttempt.masterContainer.getId().equals(container.getId())) {
if (appAttempt.masterContainer.getId().equals(
containerStatus.getContainerId())) {
new FinalTransition(RMAppAttemptState.FAILED).transition(
appAttempt, containerFinishedEvent);
return RMAppAttemptState.FAILED;
@ -718,7 +721,7 @@ public class RMAppAttemptImpl implements RMAppAttempt {
// Normal container.
// Put it in completedcontainers list
appAttempt.justFinishedContainers.add(container);
appAttempt.justFinishedContainers.add(containerStatus);
return RMAppAttemptState.RUNNING;
}
}

View File

@ -19,22 +19,22 @@
package org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
public class RMAppAttemptContainerFinishedEvent extends RMAppAttemptEvent {
private final Container container;
private final ContainerStatus containerStatus;
public RMAppAttemptContainerFinishedEvent(ApplicationAttemptId appAttemptId,
Container container) {
ContainerStatus containerStatus) {
super(appAttemptId, RMAppAttemptEventType.CONTAINER_FINISHED);
this.container = container;
this.containerStatus = containerStatus;
}
public Container getContainer() {
return this.container;
public ContainerStatus getContainerStatus() {
return this.containerStatus;
}
}

View File

@ -26,8 +26,8 @@ public class RMContainerFinishedEvent extends RMContainerEvent {
private final ContainerStatus remoteContainerStatus;
public RMContainerFinishedEvent(ContainerId containerId,
ContainerStatus containerStatus) {
super(containerId, RMContainerEventType.FINISHED);
ContainerStatus containerStatus, RMContainerEventType event) {
super(containerId, event);
this.remoteContainerStatus = containerStatus;
}

View File

@ -92,7 +92,7 @@ public class RMContainerImpl implements RMContainer {
// Transitions from RUNNING state
.addTransition(RMContainerState.RUNNING, RMContainerState.COMPLETED,
RMContainerEventType.FINISHED, new ContainerCompletedTransition())
RMContainerEventType.FINISHED, new FinishedTransition())
.addTransition(RMContainerState.RUNNING, RMContainerState.KILLED,
RMContainerEventType.KILL, new KillTransition())
@ -273,10 +273,16 @@ public class RMContainerImpl implements RMContainer {
@Override
public void transition(RMContainerImpl container, RMContainerEvent event) {
RMContainerFinishedEvent finishedEvent = (RMContainerFinishedEvent) event;
// Update container-status for diagnostics. Today we completely
// replace it on finish. We may just need to update diagnostics.
container.container.setContainerStatus(finishedEvent
.getRemoteContainerStatus());
// Inform AppAttempt
container.eventHandler.handle(new RMAppAttemptContainerFinishedEvent(
container.appAttemptId, container.container));
container.appAttemptId, container.container.getContainerStatus()));
}
}
@ -312,22 +318,4 @@ public class RMContainerImpl implements RMContainer {
}
}
private static final class ContainerCompletedTransition extends
FinishedTransition {
@Override
public void transition(RMContainerImpl container, RMContainerEvent event) {
RMContainerFinishedEvent finishedEvent = (RMContainerFinishedEvent) event;
// Update container-status for diagnostics. Today we completely
// replace it on finish. We may just need to update diagnostics.
// ^TODO
container.container.setContainerStatus(finishedEvent
.getRemoteContainerStatus());
// Inform appAttempt
super.transition(container, event);
}
}
}

View File

@ -23,7 +23,6 @@ import java.util.EnumSet;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@ -36,9 +35,9 @@ import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.net.Node;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeHealthStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource;
@ -87,8 +86,8 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
.newRecordInstance(NodeHealthStatus.class);
/* set of containers that have just launched */
private final Map<ContainerId, Container> justLaunchedContainers =
new HashMap<ContainerId, Container>();
private final Map<ContainerId, ContainerStatus> justLaunchedContainers =
new HashMap<ContainerId, ContainerStatus>();
/* set of containers that need to be cleaned */
@ -355,43 +354,29 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
// Filter the map to only obtain just launched containers and finished
// containers.
Map<ApplicationId, List<Container>> remoteAppContainersMap = statusEvent
.getContainersCollection();
Map<ApplicationId, List<Container>> containersMapForScheduler = new HashMap<ApplicationId, List<Container>>(
remoteAppContainersMap.size());
for (Entry<ApplicationId, List<Container>> entrySet : remoteAppContainersMap
.entrySet()) {
ApplicationId appId = entrySet.getKey();
List<Container> remoteContainerList = entrySet.getValue();
if (!containersMapForScheduler.containsKey(appId)) {
containersMapForScheduler.put(appId, new ArrayList<Container>(
remoteContainerList.size()));
}
List<Container> entryForThisApp = containersMapForScheduler
.get(appId);
for (Container remoteContainer : remoteContainerList) {
// Process running containers
ContainerId containerId = remoteContainer.getId();
if (remoteContainer.getState() == ContainerState.RUNNING) {
if (!rmNode.justLaunchedContainers.containsKey(containerId)) {
// Just launched container. RM knows about it the first time.
rmNode.justLaunchedContainers.put(containerId, remoteContainer);
entryForThisApp.add(remoteContainer);
}
} else {
// A finished container
rmNode.justLaunchedContainers.remove(containerId);
entryForThisApp.add(remoteContainer);
List<ContainerStatus> newlyLaunchedContainers =
new ArrayList<ContainerStatus>();
List<ContainerStatus> completedContainers =
new ArrayList<ContainerStatus>();
for (ContainerStatus remoteContainer : statusEvent.getContainers()) {
// Process running containers
ContainerId containerId = remoteContainer.getContainerId();
if (remoteContainer.getState() == ContainerState.RUNNING) {
if (!rmNode.justLaunchedContainers.containsKey(containerId)) {
// Just launched container. RM knows about it the first time.
rmNode.justLaunchedContainers.put(containerId, remoteContainer);
newlyLaunchedContainers.add(remoteContainer);
}
} else {
// A finished container
rmNode.justLaunchedContainers.remove(containerId);
completedContainers.add(remoteContainer);
}
}
rmNode.context.getDispatcher().getEventHandler().handle(
new NodeUpdateSchedulerEvent(rmNode, containersMapForScheduler));
new NodeUpdateSchedulerEvent(rmNode, newlyLaunchedContainers,
completedContainers));
return RMNodeState.RUNNING;
}

View File

@ -19,10 +19,8 @@
package org.apache.hadoop.yarn.server.resourcemanager.rmnode;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeHealthStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.server.api.records.HeartbeatResponse;
@ -30,11 +28,11 @@ import org.apache.hadoop.yarn.server.api.records.HeartbeatResponse;
public class RMNodeStatusEvent extends RMNodeEvent {
private final NodeHealthStatus nodeHealthStatus;
private Map<ApplicationId, List<Container>> containersCollection;
private List<ContainerStatus> containersCollection;
private final HeartbeatResponse latestResponse;
public RMNodeStatusEvent(NodeId nodeId, NodeHealthStatus nodeHealthStatus,
Map<ApplicationId, List<Container>> collection,
List<ContainerStatus> collection,
HeartbeatResponse latestResponse) {
super(nodeId, RMNodeEventType.STATUS_UPDATE);
this.nodeHealthStatus = nodeHealthStatus;
@ -46,7 +44,7 @@ public class RMNodeStatusEvent extends RMNodeEvent {
return this.nodeHealthStatus;
}
public Map<ApplicationId, List<Container>> getContainersCollection() {
public List<ContainerStatus> getContainers() {
return this.containersCollection;
}

View File

@ -32,6 +32,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
@ -161,26 +162,21 @@ public class SchedulerApp {
RMContainerEventType.LAUNCHED));
}
public synchronized void killContainers(
SchedulerApp application) {
}
synchronized public void containerCompleted(RMContainer rmContainer,
RMContainerEventType event) {
ContainerStatus containerStatus, RMContainerEventType event) {
Container container = rmContainer.getContainer();
ContainerId containerId = container.getId();
// Inform the container
if (event.equals(RMContainerEventType.FINISHED)) {
// Have to send diagnostics for finished containers.
rmContainer.handle(new RMContainerFinishedEvent(containerId,
container.getContainerStatus()));
} else {
rmContainer.handle(new RMContainerEvent(containerId, event));
}
rmContainer.handle(
new RMContainerFinishedEvent(
containerId,
containerStatus,
event)
);
LOG.info("Completed container: " + rmContainer.getContainerId() +
" in state: " + rmContainer.getState());
" in state: " + rmContainer.getState() + " event:" + event);
// Remove from the list of containers
liveContainers.remove(rmContainer.getContainerId());

View File

@ -0,0 +1,74 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
/**
* Utilities shared by schedulers.
*/
@Private
@Unstable
public class SchedulerUtils {
private static final RecordFactory recordFactory =
RecordFactoryProvider.getRecordFactory(null);
public static final String RELEASED_CONTAINER =
"Container released by application";
public static final String LOST_CONTAINER =
"Container released on a *lost* node";
public static final String COMPLETED_APPLICATION =
"Container of a completed application";
public static final String EXPIRED_CONTAINER =
"Container expired since it unused";
public static final String UNRESERVED_CONTAINER =
"Container reservation no longer required.";
/**
* Utility to create a {@link ContainerStatus} during exceptional
* circumstances.
*
* @param containerId {@link ContainerId} of returned/released/lost container.
* @param diagnostics diagnostic message
* @return <code>ContainerStatus</code> for an returned/released/lost
* container
*/
public static ContainerStatus createAbnormalContainerStatus(
ContainerId containerId, String diagnostics) {
ContainerStatus containerStatus =
recordFactory.newRecordInstance(ContainerStatus.class);
containerStatus.setContainerId(containerId);
containerStatus.setDiagnostics(diagnostics);
containerStatus.setExitStatus("ABORTED");
containerStatus.setState(ContainerState.COMPLETE);
return containerStatus;
}
}

View File

@ -26,6 +26,7 @@ import org.apache.hadoop.classification.InterfaceStability.Stable;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.QueueACL;
import org.apache.hadoop.yarn.api.records.QueueState;
import org.apache.hadoop.yarn.api.records.Resource;
@ -165,11 +166,14 @@ extends org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue {
* @param node node on which the container completed
* @param container completed container,
* <code>null</code> if it was just a reservation
* @param containerStatus <code>ContainerStatus</code> for the completed
* container
* @param event event to be sent to the container
*/
public void completedContainer(Resource clusterResource,
SchedulerApp application, SchedulerNode node,
RMContainer container, RMContainerEventType event);
RMContainer container, ContainerStatus containerStatus,
RMContainerEventType event);
/**
* Get the number of applications in the queue.

View File

@ -36,10 +36,9 @@ import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.Lock;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.QueueInfo;
import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
@ -59,11 +58,11 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AppSchedulingInfo;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerExpiredSchedulerEvent;
@ -127,6 +126,8 @@ implements ResourceScheduler, CapacitySchedulerContext {
private boolean initialized = false;
public CapacityScheduler() {}
public CSQueue getRootQueue() {
return root;
}
@ -392,12 +393,20 @@ implements ResourceScheduler, CapacitySchedulerContext {
// Release all the running containers
for (RMContainer rmContainer : application.getLiveContainers()) {
completedContainer(rmContainer, RMContainerEventType.KILL);
completedContainer(rmContainer,
SchedulerUtils.createAbnormalContainerStatus(
rmContainer.getContainerId(),
SchedulerUtils.COMPLETED_APPLICATION),
RMContainerEventType.KILL);
}
// Release all reserved containers
for (RMContainer rmContainer : application.getAllReservedContainers()) {
completedContainer(rmContainer, RMContainerEventType.KILL);
completedContainer(rmContainer,
SchedulerUtils.createAbnormalContainerStatus(
rmContainer.getContainerId(),
"Application Complete"),
RMContainerEventType.KILL);
}
// Clean up pending requests, metrics etc.
@ -445,7 +454,11 @@ implements ResourceScheduler, CapacitySchedulerContext {
"Trying to release container not owned by app or with invalid id",
application.getApplicationId(), releasedContainerId);
}
completedContainer(rmContainer, RMContainerEventType.RELEASED);
completedContainer(rmContainer,
SchedulerUtils.createAbnormalContainerStatus(
releasedContainerId,
SchedulerUtils.RELEASED_CONTAINER),
RMContainerEventType.RELEASED);
}
synchronized (application) {
@ -521,22 +534,23 @@ implements ResourceScheduler, CapacitySchedulerContext {
}
private synchronized void nodeUpdate(RMNode nm,
Map<ApplicationId, List<Container>> containers ) {
List<ContainerStatus> newlyLaunchedContainers,
List<ContainerStatus> completedContainers) {
LOG.info("nodeUpdate: " + nm + " clusterResources: " + clusterResource);
SchedulerNode node = getNode(nm.getNodeID());
// Processing the current containers running/finished on node
for (List<Container> appContainers : containers.values()) {
for (Container container : appContainers) {
if (container.getState() == ContainerState.RUNNING) {
containerLaunchedOnNode(container, node);
} else { // has to be 'COMPLETE'
LOG.info("DEBUG --- Container FINISHED: " + container.getId());
completedContainer(getRMContainer(container.getId()),
RMContainerEventType.FINISHED);
}
}
// Processing the newly launched containers
for (ContainerStatus launchedContainer : newlyLaunchedContainers) {
containerLaunchedOnNode(launchedContainer.getContainerId(), node);
}
// Process completed containers
for (ContainerStatus completedContainer : completedContainers) {
ContainerId containerId = completedContainer.getContainerId();
LOG.info("DEBUG --- Container FINISHED: " + containerId);
completedContainer(getRMContainer(containerId),
completedContainer, RMContainerEventType.FINISHED);
}
// Now node data structures are upto date and ready for scheduling.
@ -571,18 +585,18 @@ implements ResourceScheduler, CapacitySchedulerContext {
}
private void containerLaunchedOnNode(Container container, SchedulerNode node) {
private void containerLaunchedOnNode(ContainerId containerId, SchedulerNode node) {
// Get the application for the finished container
ApplicationAttemptId applicationAttemptId = container.getId().getAppAttemptId();
ApplicationAttemptId applicationAttemptId = containerId.getAppAttemptId();
SchedulerApp application = getApplication(applicationAttemptId);
if (application == null) {
LOG.info("Unknown application: " + applicationAttemptId +
" launched container " + container.getId() +
" launched container " + containerId +
" on node: " + node);
return;
}
application.containerLaunchedOnNode(container.getId());
application.containerLaunchedOnNode(containerId);
}
@Override
@ -604,7 +618,8 @@ implements ResourceScheduler, CapacitySchedulerContext {
{
NodeUpdateSchedulerEvent nodeUpdatedEvent = (NodeUpdateSchedulerEvent)event;
nodeUpdate(nodeUpdatedEvent.getRMNode(),
nodeUpdatedEvent.getContainers());
nodeUpdatedEvent.getNewlyLaunchedContainers(),
nodeUpdatedEvent.getCompletedContainers());
}
break;
case APP_ADDED:
@ -625,7 +640,11 @@ implements ResourceScheduler, CapacitySchedulerContext {
{
ContainerExpiredSchedulerEvent containerExpiredEvent =
(ContainerExpiredSchedulerEvent) event;
completedContainer(getRMContainer(containerExpiredEvent.getContainerId()),
ContainerId containerId = containerExpiredEvent.getContainerId();
completedContainer(getRMContainer(containerId),
SchedulerUtils.createAbnormalContainerStatus(
containerId,
SchedulerUtils.EXPIRED_CONTAINER),
RMContainerEventType.EXPIRE);
}
break;
@ -652,13 +671,21 @@ implements ResourceScheduler, CapacitySchedulerContext {
// Remove running containers
List<RMContainer> runningContainers = node.getRunningContainers();
for (RMContainer container : runningContainers) {
completedContainer(container, RMContainerEventType.KILL);
completedContainer(container,
SchedulerUtils.createAbnormalContainerStatus(
container.getContainerId(),
SchedulerUtils.LOST_CONTAINER),
RMContainerEventType.KILL);
}
// Remove reservations, if any
RMContainer reservedContainer = node.getReservedContainer();
if (reservedContainer != null) {
completedContainer(reservedContainer, RMContainerEventType.KILL);
completedContainer(reservedContainer,
SchedulerUtils.createAbnormalContainerStatus(
reservedContainer.getContainerId(),
SchedulerUtils.LOST_CONTAINER),
RMContainerEventType.KILL);
}
this.nodes.remove(nodeInfo.getNodeID());
@ -668,7 +695,7 @@ implements ResourceScheduler, CapacitySchedulerContext {
@Lock(CapacityScheduler.class)
private synchronized void completedContainer(RMContainer rmContainer,
RMContainerEventType event) {
ContainerStatus containerStatus, RMContainerEventType event) {
if (rmContainer == null) {
LOG.info("Null container completed...");
return;
@ -692,7 +719,7 @@ implements ResourceScheduler, CapacitySchedulerContext {
// Inform the queue
LeafQueue queue = (LeafQueue)application.getQueue();
queue.completedContainer(clusterResource, application, node,
rmContainer, event);
rmContainer, containerStatus, event);
LOG.info("Application " + applicationAttemptId +
" released container " + container.getId() +

View File

@ -39,6 +39,7 @@ import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.AccessControlList;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.ContainerToken;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.QueueACL;
@ -59,6 +60,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
import org.apache.hadoop.yarn.server.security.ContainerTokenSecretManager;
import org.apache.hadoop.yarn.util.BuilderUtils;
@ -762,7 +764,11 @@ public class LeafQueue implements CSQueue {
// Release
Container container = rmContainer.getContainer();
completedContainer(clusterResource, application, node,
rmContainer, RMContainerEventType.RELEASED);
rmContainer,
SchedulerUtils.createAbnormalContainerStatus(
container.getId(),
SchedulerUtils.UNRESERVED_CONTAINER),
RMContainerEventType.RELEASED);
return container.getResource();
}
@ -1175,7 +1181,7 @@ public class LeafQueue implements CSQueue {
@Override
public void completedContainer(Resource clusterResource,
SchedulerApp application, SchedulerNode node, RMContainer rmContainer,
RMContainerEventType event) {
ContainerStatus containerStatus, RMContainerEventType event) {
if (application != null) {
// Careful! Locking order is important!
synchronized (this) {
@ -1190,7 +1196,7 @@ public class LeafQueue implements CSQueue {
application.unreserve(node, rmContainer.getReservedPriority());
node.unreserveResource(application);
} else {
application.containerCompleted(rmContainer, event);
application.containerCompleted(rmContainer, containerStatus, event);
node.releaseContainer(container);
}
@ -1210,7 +1216,7 @@ public class LeafQueue implements CSQueue {
// Inform the parent queue
parent.completedContainer(clusterResource, application,
node, rmContainer, event);
node, rmContainer, null, event);
}
}

View File

@ -37,6 +37,7 @@ import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.AccessControlList;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.QueueACL;
import org.apache.hadoop.yarn.api.records.QueueInfo;
import org.apache.hadoop.yarn.api.records.QueueState;
@ -608,7 +609,7 @@ public class ParentQueue implements CSQueue {
@Override
public void completedContainer(Resource clusterResource,
SchedulerApp application, SchedulerNode node,
RMContainer rmContainer, RMContainerEventType event) {
RMContainer rmContainer, ContainerStatus containerStatus, RMContainerEventType event) {
if (application != null) {
// Careful! Locking order is important!
// Book keeping
@ -626,7 +627,7 @@ public class ParentQueue implements CSQueue {
// Inform the parent
if (parent != null) {
parent.completedContainer(clusterResource, application,
node, rmContainer, event);
node, rmContainer, null, event);
}
}
}

View File

@ -23,26 +23,33 @@ import java.util.Map;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
public class NodeUpdateSchedulerEvent extends SchedulerEvent {
private final RMNode rmNode;
private final Map<ApplicationId, List<Container>> containers;
private final List<ContainerStatus> newlyLaunchedContainers;
private final List<ContainerStatus> completedContainersStatuses;
public NodeUpdateSchedulerEvent(RMNode rmNode,
Map<ApplicationId, List<Container>> containers) {
List<ContainerStatus> newlyLaunchedContainers,
List<ContainerStatus> completedContainers) {
super(SchedulerEventType.NODE_UPDATE);
this.rmNode = rmNode;
this.containers = containers;
this.newlyLaunchedContainers = newlyLaunchedContainers;
this.completedContainersStatuses = completedContainers;
}
public RMNode getRMNode() {
return rmNode;
}
public Map<ApplicationId, List<Container>> getContainers() {
return containers;
public List<ContainerStatus> getNewlyLaunchedContainers() {
return newlyLaunchedContainers;
}
public List<ContainerStatus> getCompletedContainers() {
return completedContainersStatuses;
}
}

View File

@ -39,10 +39,9 @@ import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.AccessControlList;
import org.apache.hadoop.yarn.Lock;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.ContainerToken;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
@ -73,6 +72,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerExpiredSchedulerEvent;
@ -90,7 +90,7 @@ public class FifoScheduler implements ResourceScheduler {
private static final Log LOG = LogFactory.getLog(FifoScheduler.class);
private final RecordFactory recordFactory =
private static final RecordFactory recordFactory =
RecordFactoryProvider.getRecordFactory(null);
Configuration conf;
@ -234,7 +234,11 @@ public class FifoScheduler implements ResourceScheduler {
"Trying to release container not owned by app or with invalid id",
application.getApplicationId(), releasedContainer);
}
containerCompleted(rmContainer, RMContainerEventType.RELEASED);
containerCompleted(rmContainer,
SchedulerUtils.createAbnormalContainerStatus(
releasedContainer,
SchedulerUtils.RELEASED_CONTAINER),
RMContainerEventType.RELEASED);
}
if (!ask.isEmpty()) {
@ -312,7 +316,11 @@ public class FifoScheduler implements ResourceScheduler {
// Kill all 'live' containers
for (RMContainer container : application.getLiveContainers()) {
containerCompleted(container, RMContainerEventType.KILL);
containerCompleted(container,
SchedulerUtils.createAbnormalContainerStatus(
container.getContainerId(),
SchedulerUtils.COMPLETED_APPLICATION),
RMContainerEventType.KILL);
}
// Clean up pending requests, metrics etc.
@ -543,24 +551,21 @@ public class FifoScheduler implements ResourceScheduler {
}
private synchronized void nodeUpdate(RMNode rmNode,
Map<ApplicationId, List<Container>> remoteContainers) {
List<ContainerStatus> newlyLaunchedContainers,
List<ContainerStatus> completedContainers) {
SchedulerNode node = getNode(rmNode.getNodeID());
for (List<Container> appContainers : remoteContainers.values()) {
for (Container container : appContainers) {
/* make sure the scheduler hasnt already removed the applications */
if (getApplication(container.getId().getAppAttemptId()) != null) {
if (container.getState() == ContainerState.RUNNING) {
containerLaunchedOnNode(container, node);
} else { // has to COMPLETE
containerCompleted(getRMContainer(container.getId()),
RMContainerEventType.FINISHED);
}
}
else {
LOG.warn("Scheduler not tracking application " + container.getId().getAppAttemptId());
}
}
// Processing the newly launched containers
for (ContainerStatus launchedContainer : newlyLaunchedContainers) {
containerLaunchedOnNode(launchedContainer.getContainerId(), node);
}
// Process completed containers
for (ContainerStatus completedContainer : completedContainers) {
ContainerId containerId = completedContainer.getContainerId();
LOG.info("DEBUG --- Container FINISHED: " + containerId);
containerCompleted(getRMContainer(containerId),
completedContainer, RMContainerEventType.FINISHED);
}
if (Resources.greaterThanOrEqual(node.getAvailableResource(),
@ -598,7 +603,8 @@ public class FifoScheduler implements ResourceScheduler {
NodeUpdateSchedulerEvent nodeUpdatedEvent =
(NodeUpdateSchedulerEvent)event;
nodeUpdate(nodeUpdatedEvent.getRMNode(),
nodeUpdatedEvent.getContainers());
nodeUpdatedEvent.getNewlyLaunchedContainers(),
nodeUpdatedEvent.getCompletedContainers());
}
break;
case APP_ADDED:
@ -624,7 +630,11 @@ public class FifoScheduler implements ResourceScheduler {
{
ContainerExpiredSchedulerEvent containerExpiredEvent =
(ContainerExpiredSchedulerEvent) event;
containerCompleted(getRMContainer(containerExpiredEvent.getContainerId()),
ContainerId containerid = containerExpiredEvent.getContainerId();
containerCompleted(getRMContainer(containerid),
SchedulerUtils.createAbnormalContainerStatus(
containerid,
SchedulerUtils.EXPIRED_CONTAINER),
RMContainerEventType.EXPIRE);
}
break;
@ -633,23 +643,23 @@ public class FifoScheduler implements ResourceScheduler {
}
}
private void containerLaunchedOnNode(Container container, SchedulerNode node) {
private void containerLaunchedOnNode(ContainerId containerId, SchedulerNode node) {
// Get the application for the finished container
ApplicationAttemptId applicationAttemptId = container.getId().getAppAttemptId();
ApplicationAttemptId applicationAttemptId = containerId.getAppAttemptId();
SchedulerApp application = getApplication(applicationAttemptId);
if (application == null) {
LOG.info("Unknown application: " + applicationAttemptId +
" launched container " + container.getId() +
" launched container " + containerId +
" on node: " + node);
return;
}
application.containerLaunchedOnNode(container.getId());
application.containerLaunchedOnNode(containerId);
}
@Lock(FifoScheduler.class)
private synchronized void containerCompleted(RMContainer rmContainer,
RMContainerEventType event) {
ContainerStatus containerStatus, RMContainerEventType event) {
if (rmContainer == null) {
LOG.info("Null container completed...");
return;
@ -672,7 +682,7 @@ public class FifoScheduler implements ResourceScheduler {
}
// Inform the application
application.containerCompleted(rmContainer, event);
application.containerCompleted(rmContainer, containerStatus, event);
// Inform the node
node.releaseContainer(container);
@ -691,7 +701,11 @@ public class FifoScheduler implements ResourceScheduler {
SchedulerNode node = getNode(nodeInfo.getNodeID());
// Kill running containers
for(RMContainer container : node.getRunningContainers()) {
containerCompleted(container, RMContainerEventType.KILL);
containerCompleted(container,
SchedulerUtils.createAbnormalContainerStatus(
container.getContainerId(),
SchedulerUtils.LOST_CONTAINER),
RMContainerEventType.KILL);
}
//Remove the node

View File

@ -25,6 +25,7 @@ import java.util.Map;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeHealthStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource;
@ -53,9 +54,10 @@ public class MockNM {
}
public void containerStatus(Container container) throws Exception {
Map<ApplicationId, List<Container>> conts = new HashMap<ApplicationId, List<Container>>();
conts.put(container.getId().getAppId(), Arrays
.asList(new Container[] { container }));
Map<ApplicationId, List<ContainerStatus>> conts =
new HashMap<ApplicationId, List<ContainerStatus>>();
conts.put(container.getId().getAppId(),
Arrays.asList(new ContainerStatus[] { container.getContainerStatus() }));
nodeHeartbeat(conts, true);
}
@ -76,16 +78,16 @@ public class MockNM {
}
public HeartbeatResponse nodeHeartbeat(boolean b) throws Exception {
return nodeHeartbeat(new HashMap<ApplicationId, List<Container>>(), b);
return nodeHeartbeat(new HashMap<ApplicationId, List<ContainerStatus>>(), b);
}
public HeartbeatResponse nodeHeartbeat(Map<ApplicationId,
List<Container>> conts, boolean isHealthy) throws Exception {
List<ContainerStatus>> conts, boolean isHealthy) throws Exception {
NodeHeartbeatRequest req = Records.newRecord(NodeHeartbeatRequest.class);
NodeStatus status = Records.newRecord(NodeStatus.class);
status.setNodeId(nodeId);
for (Map.Entry<ApplicationId, List<Container>> entry : conts.entrySet()) {
status.setContainers(entry.getKey(), entry.getValue());
for (Map.Entry<ApplicationId, List<ContainerStatus>> entry : conts.entrySet()) {
status.setContainersStatuses(entry.getValue());
}
NodeHealthStatus healthStatus = Records.newRecord(NodeHealthStatus.class);
healthStatus.setHealthReport("");

View File

@ -42,6 +42,7 @@ import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeHealthStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource;
@ -53,9 +54,7 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
import org.apache.hadoop.yarn.server.api.records.HeartbeatResponse;
import org.apache.hadoop.yarn.server.api.records.NodeStatus;
import org.apache.hadoop.yarn.server.api.records.RegistrationResponse;
import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
import org.apache.hadoop.yarn.util.BuilderUtils;
@ -133,10 +132,19 @@ public class NodeManager implements ContainerManager {
int responseID = 0;
private List<ContainerStatus> getContainerStatuses(Map<ApplicationId, List<Container>> containers) {
List<ContainerStatus> containerStatuses = new ArrayList<ContainerStatus>();
for (List<Container> appContainers : containers.values()) {
for (Container container : appContainers) {
containerStatuses.add(container.getContainerStatus());
}
}
return containerStatuses;
}
public void heartbeat() throws IOException {
NodeStatus nodeStatus =
org.apache.hadoop.yarn.server.resourcemanager.NodeManager.createNodeStatus(
nodeId, containers);
nodeId, getContainerStatuses(containers));
nodeStatus.setResponseId(responseID);
NodeHeartbeatRequest request = recordFactory
.newRecordInstance(NodeHeartbeatRequest.class);
@ -250,17 +258,29 @@ public class NodeManager implements ContainerManager {
@Override
synchronized public GetContainerStatusResponse getContainerStatus(GetContainerStatusRequest request) throws YarnRemoteException {
ContainerId containerID = request.getContainerId();
GetContainerStatusResponse response = recordFactory.newRecordInstance(GetContainerStatusResponse.class);
ContainerId containerId = request.getContainerId();
List<Container> appContainers = containers.get(containerId.getAppId());
Container container = null;
for (Container c : appContainers) {
if (c.getId().equals(containerId)) {
container = c;
}
}
GetContainerStatusResponse response =
recordFactory.newRecordInstance(GetContainerStatusResponse.class);
if (container != null && container.getContainerStatus() != null) {
response.setStatus(container.getContainerStatus());
}
return response;
}
public static org.apache.hadoop.yarn.server.api.records.NodeStatus createNodeStatus(
NodeId nodeId, Map<ApplicationId, List<Container>> containers) {
public static org.apache.hadoop.yarn.server.api.records.NodeStatus
createNodeStatus(NodeId nodeId, List<ContainerStatus> containers) {
RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
org.apache.hadoop.yarn.server.api.records.NodeStatus nodeStatus = recordFactory.newRecordInstance(org.apache.hadoop.yarn.server.api.records.NodeStatus.class);
org.apache.hadoop.yarn.server.api.records.NodeStatus nodeStatus =
recordFactory.newRecordInstance(org.apache.hadoop.yarn.server.api.records.NodeStatus.class);
nodeStatus.setNodeId(nodeId);
nodeStatus.addAllContainers(containers);
nodeStatus.setContainersStatuses(containers);
NodeHealthStatus nodeHealthStatus =
recordFactory.newRecordInstance(NodeHealthStatus.class);
nodeHealthStatus.setIsNodeHealthy(true);

View File

@ -66,11 +66,11 @@ public class TestApplicationCleanup {
//kick the scheduler
nm1.nodeHeartbeat(true);
List<Container> conts = am.allocate(new ArrayList<ResourceRequest>(),
new ArrayList<ContainerId>()).getNewContainerList();
new ArrayList<ContainerId>()).getAllocatedContainers();
int contReceived = conts.size();
while (contReceived < request) {
conts = am.allocate(new ArrayList<ResourceRequest>(),
new ArrayList<ContainerId>()).getNewContainerList();
new ArrayList<ContainerId>()).getAllocatedContainers();
contReceived += conts.size();
Log.info("Got " + contReceived + " containers. Waiting to get " + request);
Thread.sleep(2000);

View File

@ -92,12 +92,12 @@ public class TestFifoScheduler {
// kick the scheduler, 1 GB and 3 GB given to AM1 and AM2, remaining 0
nm1.nodeHeartbeat(true);
while (am1Response.getNewContainerCount() < 1) {
while (am1Response.getAllocatedContainers().size() < 1) {
LOG.info("Waiting for containers to be created for app 1...");
Thread.sleep(1000);
am1Response = am1.schedule();
}
while (am2Response.getNewContainerCount() < 1) {
while (am2Response.getAllocatedContainers().size() < 1) {
LOG.info("Waiting for containers to be created for app 2...");
Thread.sleep(1000);
am2Response = am2.schedule();
@ -105,12 +105,12 @@ public class TestFifoScheduler {
// kick the scheduler, nothing given remaining 2 GB.
nm2.nodeHeartbeat(true);
List<Container> allocated1 = am1Response.getNewContainerList();
List<Container> allocated1 = am1Response.getAllocatedContainers();
Assert.assertEquals(1, allocated1.size());
Assert.assertEquals(1 * GB, allocated1.get(0).getResource().getMemory());
Assert.assertEquals(nm1.getNodeId(), allocated1.get(0).getNodeId());
List<Container> allocated2 = am2Response.getNewContainerList();
List<Container> allocated2 = am2Response.getAllocatedContainers();
Assert.assertEquals(1, allocated2.size());
Assert.assertEquals(3 * GB, allocated2.get(0).getResource().getMemory());
Assert.assertEquals(nm1.getNodeId(), allocated2.get(0).getNodeId());
@ -137,7 +137,7 @@ public class TestFifoScheduler {
Thread.sleep(1000);
}
Assert.assertEquals(1, attempt1.getJustFinishedContainers().size());
Assert.assertEquals(1, am1.schedule().getFinishedContainerList().size());
Assert.assertEquals(1, am1.schedule().getCompletedContainersStatuses().size());
Assert.assertEquals(5 * GB, rm.getResourceScheduler().getUsedResource(
nm1.getNodeId()).getMemory());

View File

@ -86,11 +86,11 @@ public class TestRM {
//kick the scheduler
nm1.nodeHeartbeat(true);
List<Container> conts = am.allocate(new ArrayList<ResourceRequest>(),
new ArrayList<ContainerId>()).getNewContainerList();
new ArrayList<ContainerId>()).getAllocatedContainers();
int contReceived = conts.size();
while (contReceived < 3) {//only 3 containers are available on node1
conts.addAll(am.allocate(new ArrayList<ResourceRequest>(),
new ArrayList<ContainerId>()).getNewContainerList());
new ArrayList<ContainerId>()).getAllocatedContainers());
contReceived = conts.size();
LOG.info("Got " + contReceived + " containers. Waiting to get " + 3);
Thread.sleep(2000);
@ -100,11 +100,11 @@ public class TestRM {
//send node2 heartbeat
nm2.nodeHeartbeat(true);
conts = am.allocate(new ArrayList<ResourceRequest>(),
new ArrayList<ContainerId>()).getNewContainerList();
new ArrayList<ContainerId>()).getAllocatedContainers();
contReceived = conts.size();
while (contReceived < 10) {
conts.addAll(am.allocate(new ArrayList<ResourceRequest>(),
new ArrayList<ContainerId>()).getNewContainerList());
new ArrayList<ContainerId>()).getAllocatedContainers());
contReceived = conts.size();
LOG.info("Got " + contReceived + " containers. Waiting to get " + 10);
Thread.sleep(2000);

View File

@ -33,6 +33,7 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
@ -148,7 +149,8 @@ public class TestLeafQueue {
CSQueue parent = queue.getParent();
doNothing().when(parent).completedContainer(
any(Resource.class), any(SchedulerApp.class), any(SchedulerNode.class),
any(RMContainer.class), any(RMContainerEventType.class));
any(RMContainer.class), any(ContainerStatus.class),
any(RMContainerEventType.class));
return queue;
}
@ -238,7 +240,7 @@ public class TestLeafQueue {
// Release each container from app_0
for (RMContainer rmContainer : app_0.getLiveContainers()) {
a.completedContainer(clusterResource, app_0, node_0, rmContainer,
RMContainerEventType.KILL);
null, RMContainerEventType.KILL);
}
assertEquals(1*GB, a.getUsedResources().getMemory());
assertEquals(0*GB, app_0.getCurrentConsumption().getMemory());
@ -247,7 +249,7 @@ public class TestLeafQueue {
// Release each container from app_1
for (RMContainer rmContainer : app_1.getLiveContainers()) {
a.completedContainer(clusterResource, app_1, node_0, rmContainer,
RMContainerEventType.KILL);
null, RMContainerEventType.KILL);
}
assertEquals(0*GB, a.getUsedResources().getMemory());
assertEquals(0*GB, app_0.getCurrentConsumption().getMemory());
@ -392,7 +394,7 @@ public class TestLeafQueue {
// 8. Release each container from app_0
for (RMContainer rmContainer : app_0.getLiveContainers()) {
a.completedContainer(clusterResource, app_0, node_0, rmContainer,
RMContainerEventType.KILL);
null, RMContainerEventType.KILL);
}
assertEquals(5*GB, a.getUsedResources().getMemory());
assertEquals(0*GB, app_0.getCurrentConsumption().getMemory());
@ -403,7 +405,7 @@ public class TestLeafQueue {
// 9. Release each container from app_2
for (RMContainer rmContainer : app_2.getLiveContainers()) {
a.completedContainer(clusterResource, app_2, node_0, rmContainer,
RMContainerEventType.KILL);
null, RMContainerEventType.KILL);
}
assertEquals(2*GB, a.getUsedResources().getMemory());
assertEquals(0*GB, app_0.getCurrentConsumption().getMemory());
@ -414,7 +416,7 @@ public class TestLeafQueue {
// 10. Release each container from app_3
for (RMContainer rmContainer : app_3.getLiveContainers()) {
a.completedContainer(clusterResource, app_3, node_0, rmContainer,
RMContainerEventType.KILL);
null, RMContainerEventType.KILL);
}
assertEquals(0*GB, a.getUsedResources().getMemory());
assertEquals(0*GB, app_0.getCurrentConsumption().getMemory());
@ -489,7 +491,7 @@ public class TestLeafQueue {
// Now free 1 container from app_0 i.e. 1G
a.completedContainer(clusterResource, app_0, node_0,
app_0.getLiveContainers().iterator().next(), RMContainerEventType.KILL);
app_0.getLiveContainers().iterator().next(), null, RMContainerEventType.KILL);
a.assignContainers(clusterResource, node_0);
assertEquals(5*GB, a.getUsedResources().getMemory());
assertEquals(1*GB, app_0.getCurrentConsumption().getMemory());
@ -499,7 +501,7 @@ public class TestLeafQueue {
// Now finish another container from app_0 and fulfill the reservation
a.completedContainer(clusterResource, app_0, node_0,
app_0.getLiveContainers().iterator().next(), RMContainerEventType.KILL);
app_0.getLiveContainers().iterator().next(), null, RMContainerEventType.KILL);
a.assignContainers(clusterResource, node_0);
assertEquals(4*GB, a.getUsedResources().getMemory());
assertEquals(0*GB, app_0.getCurrentConsumption().getMemory());
@ -582,7 +584,7 @@ public class TestLeafQueue {
// Now free 1 container from app_0 i.e. 1G, and re-reserve it
a.completedContainer(clusterResource, app_0, node_0,
app_0.getLiveContainers().iterator().next(), RMContainerEventType.KILL);
app_0.getLiveContainers().iterator().next(), null, RMContainerEventType.KILL);
a.assignContainers(clusterResource, node_0);
assertEquals(5*GB, a.getUsedResources().getMemory());
assertEquals(1*GB, app_0.getCurrentConsumption().getMemory());
@ -613,7 +615,7 @@ public class TestLeafQueue {
// Now finish another container from app_0 and see the reservation cancelled
a.completedContainer(clusterResource, app_0, node_0,
app_0.getLiveContainers().iterator().next(), RMContainerEventType.KILL);
app_0.getLiveContainers().iterator().next(), null, RMContainerEventType.KILL);
a.assignContainers(clusterResource, node_0);
assertEquals(4*GB, a.getUsedResources().getMemory());
assertEquals(0*GB, app_0.getCurrentConsumption().getMemory());

View File

@ -241,7 +241,7 @@ public class TestContainerTokenSecretManager {
allocateRequest.addAllAsks(ask);
allocateRequest.addAllReleases(release);
List<Container> allocatedContainers = scheduler.allocate(allocateRequest)
.getAMResponse().getNewContainerList();
.getAMResponse().getAllocatedContainers();
waitCounter = 0;
while ((allocatedContainers == null || allocatedContainers.size() == 0)
@ -251,7 +251,7 @@ public class TestContainerTokenSecretManager {
allocateRequest.setResponseId(allocateRequest.getResponseId() + 1);
allocatedContainers =
scheduler.allocate(allocateRequest).getAMResponse()
.getNewContainerList();
.getAllocatedContainers();
}
Assert.assertNotNull("Container is not allocted!", allocatedContainers);