MAPREDUCE-2646. Fixed AMRMProtocol to return containers based on priority. Contributed by Sharad Agarwal and Arun C Murthy.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1175859 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Vinod Kumar Vavilapalli 2011-09-26 13:25:27 +00:00
parent c9a7d3dbf9
commit 1e6dfa7472
11 changed files with 102 additions and 60 deletions

View File

@ -1426,6 +1426,9 @@ Release 0.23.0 - Unreleased
MAPREDUCE-3090. Fix MR AM to use ApplicationAttemptId rather than
(ApplicationId, startCount) consistently. (acmurthy)
MAPREDUCE-2646. Fixed AMRMProtocol to return containers based on
priority. (Sharad Agarwal and Arun C Murthy via vinodkv)
Release 0.22.0 - Unreleased
INCOMPATIBLE CHANGES

View File

@ -586,37 +586,21 @@ public class RMContainerAllocator extends RMContainerRequestor
private ContainerRequest assign(Container allocated) {
ContainerRequest assigned = null;
if (mapResourceReqt != reduceResourceReqt) {
//assign based on size
LOG.info("Assigning based on container size");
if (allocated.getResource().getMemory() == mapResourceReqt) {
assigned = assignToFailedMap(allocated);
if (assigned == null) {
assigned = assignToMap(allocated);
}
} else if (allocated.getResource().getMemory() == reduceResourceReqt) {
assigned = assignToReduce(allocated);
}
return assigned;
}
//container can be given to either map or reduce
//assign based on priority
//try to assign to earlierFailedMaps if present
assigned = assignToFailedMap(allocated);
//Assign to reduces before assigning to maps ?
if (assigned == null) {
Priority priority = allocated.getPriority();
if (PRIORITY_FAST_FAIL_MAP.equals(priority)) {
LOG.info("Assigning container " + allocated + " to fast fail map");
assigned = assignToFailedMap(allocated);
} else if (PRIORITY_REDUCE.equals(priority)) {
LOG.info("Assigning container " + allocated + " to reduce");
assigned = assignToReduce(allocated);
}
//try to assign to maps if present
if (assigned == null) {
} else if (PRIORITY_MAP.equals(priority)) {
LOG.info("Assigning container " + allocated + " to map");
assigned = assignToMap(allocated);
} else {
LOG.warn("Container allocated at unwanted priority: " + priority +
". Returning to RM...");
}
return assigned;
}

View File

@ -43,6 +43,7 @@ import org.apache.hadoop.yarn.api.ContainerManager;
* </li>
* <li>HTTP uri of the node.</li>
* <li>{@link Resource} allocated to the container.</li>
* <li>{@link Priority} at which the container was allocated.</li>
* <li>{@link ContainerState} of the container.</li>
* <li>
* {@link ContainerToken} of the container, used to securely verify
@ -111,6 +112,18 @@ public interface Container extends Comparable<Container> {
@Private
@Unstable
void setResource(Resource resource);
/**
* Get the <code>Priority</code> at which the <code>Container</code> was
* allocated.
* @return <code>Priority</code> at which the <code>Container</code> was
* allocated
*/
Priority getPriority();
@Private
@Unstable
void setPriority(Priority priority);
/**
* Get the current <code>ContainerState</code> of the container.

View File

@ -25,6 +25,7 @@ import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.ContainerToken;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.ProtoBase;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerIdProto;
@ -34,6 +35,7 @@ import org.apache.hadoop.yarn.proto.YarnProtos.ContainerStateProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerStatusProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerTokenProto;
import org.apache.hadoop.yarn.proto.YarnProtos.NodeIdProto;
import org.apache.hadoop.yarn.proto.YarnProtos.PriorityProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProto;
import org.apache.hadoop.yarn.util.ProtoUtils;
@ -48,6 +50,7 @@ public class ContainerPBImpl extends ProtoBase<ContainerProto> implements Contai
private ContainerId containerId = null;
private NodeId nodeId = null;
private Resource resource = null;
private Priority priority = null;
private ContainerToken containerToken = null;
private ContainerStatus containerStatus = null;
@ -84,6 +87,11 @@ public class ContainerPBImpl extends ProtoBase<ContainerProto> implements Contai
builder.getResource())) {
builder.setResource(convertToProtoFormat(this.resource));
}
if (this.priority != null &&
!((PriorityPBImpl) this.priority).getProto().equals(
builder.getPriority())) {
builder.setPriority(convertToProtoFormat(this.priority));
}
if (this.containerToken != null
&& !((ContainerTokenPBImpl) this.containerToken).getProto().equals(
builder.getContainerToken())) {
@ -211,6 +219,29 @@ public class ContainerPBImpl extends ProtoBase<ContainerProto> implements Contai
builder.clearResource();
this.resource = resource;
}
@Override
public Priority getPriority() {
ContainerProtoOrBuilder p = viaProto ? proto : builder;
if (this.priority != null) {
return this.priority;
}
if (!p.hasPriority()) {
return null;
}
this.priority = convertFromProtoFormat(p.getPriority());
return this.priority;
}
@Override
public void setPriority(Priority priority) {
maybeInitBuilder();
if (priority == null) {
builder.clearPriority();
}
this.priority = priority;
}
@Override
public ContainerToken getContainerToken() {
ContainerProtoOrBuilder p = viaProto ? proto : builder;
@ -285,6 +316,14 @@ public class ContainerPBImpl extends ProtoBase<ContainerProto> implements Contai
return ((ResourcePBImpl)t).getProto();
}
private PriorityPBImpl convertFromProtoFormat(PriorityProto p) {
return new PriorityPBImpl(p);
}
private PriorityProto convertToProtoFormat(Priority p) {
return ((PriorityPBImpl)p).getProto();
}
private ContainerTokenPBImpl convertFromProtoFormat(ContainerTokenProto p) {
return new ContainerTokenPBImpl(p);
}

View File

@ -48,6 +48,10 @@ message ResourceProto {
optional int32 memory = 1;
}
message PriorityProto {
optional int32 priority = 1;
}
enum ContainerStateProto {
C_NEW = 1;
C_RUNNING = 2;
@ -66,9 +70,10 @@ message ContainerProto {
optional NodeIdProto nodeId = 2;
optional string node_http_address = 3;
optional ResourceProto resource = 4;
optional ContainerStateProto state = 5;
optional ContainerTokenProto container_token = 6;
optional ContainerStatusProto container_status = 7;
optional PriorityProto priority = 5;
optional ContainerStateProto state = 6;
optional ContainerTokenProto container_token = 7;
optional ContainerStatusProto container_status = 8;
}
enum ApplicationStateProto {
@ -253,10 +258,6 @@ message ContainerStatusProto {
////////////////////////////////////////////////////////////////////////
////// From common//////////////////////////////////////////////////////
////////////////////////////////////////////////////////////////////////
message PriorityProto {
optional int32 priority = 1;
}
message StringURLMapProto {
optional string key = 1;
optional URLProto value = 2;

View File

@ -184,32 +184,24 @@ public class BuilderUtils {
return id;
}
public static Container clone(Container c) {
Container container = recordFactory.newRecordInstance(Container.class);
container.setId(c.getId());
container.setContainerToken(c.getContainerToken());
container.setNodeId(c.getNodeId());
container.setNodeHttpAddress(c.getNodeHttpAddress());
container.setResource(c.getResource());
container.setState(c.getState());
return container;
}
public static Container newContainer(RecordFactory recordFactory,
ApplicationAttemptId appAttemptId, int containerId, NodeId nodeId,
String nodeHttpAddress, Resource resource) {
String nodeHttpAddress, Resource resource, Priority priority) {
ContainerId containerID =
newContainerId(recordFactory, appAttemptId, containerId);
return newContainer(containerID, nodeId, nodeHttpAddress, resource);
return newContainer(containerID, nodeId, nodeHttpAddress,
resource, priority);
}
public static Container newContainer(ContainerId containerId,
NodeId nodeId, String nodeHttpAddress, Resource resource) {
NodeId nodeId, String nodeHttpAddress,
Resource resource, Priority priority) {
Container container = recordFactory.newRecordInstance(Container.class);
container.setId(containerId);
container.setNodeId(nodeId);
container.setNodeHttpAddress(nodeHttpAddress);
container.setResource(resource);
container.setPriority(priority);
container.setState(ContainerState.NEW);
ContainerStatus containerStatus = Records.newRecord(ContainerStatus.class);
containerStatus.setContainerId(containerId);

View File

@ -1046,19 +1046,20 @@ public class LeafQueue implements CSQueue {
}
private Container getContainer(RMContainer rmContainer,
SchedulerApp application, SchedulerNode node, Resource capability) {
SchedulerApp application, SchedulerNode node,
Resource capability, Priority priority) {
return (rmContainer != null) ? rmContainer.getContainer() :
createContainer(application, node, capability);
createContainer(application, node, capability, priority);
}
public Container createContainer(SchedulerApp application, SchedulerNode node,
Resource capability) {
Resource capability, Priority priority) {
Container container =
BuilderUtils.newContainer(this.recordFactory,
application.getApplicationAttemptId(),
application.getNewContainerId(),
node.getNodeID(),
node.getHttpAddress(), capability);
node.getNodeID(), node.getHttpAddress(),
capability, priority);
// If security is enabled, send the container-tokens too.
if (UserGroupInformation.isSecurityEnabled()) {
@ -1099,7 +1100,7 @@ public class LeafQueue implements CSQueue {
// Create the container if necessary
Container container =
getContainer(rmContainer, application, node, capability);
getContainer(rmContainer, application, node, capability, priority);
// Can we allocate a container on this node?
int availableContainers =

View File

@ -528,7 +528,8 @@ public class FifoScheduler implements ResourceScheduler {
application.getApplicationAttemptId(),
application.getNewContainerId(),
node.getRMNode().getNodeID(),
node.getRMNode().getHttpAddress(), capability);
node.getRMNode().getHttpAddress(),
capability, priority);
// If security is enabled, send the container-tokens too.
if (UserGroupInformation.isSecurityEnabled()) {

View File

@ -45,6 +45,7 @@ import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeHealthStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
import org.apache.hadoop.yarn.factories.RecordFactory;
@ -184,7 +185,9 @@ public class NodeManager implements ContainerManager {
Container container =
BuilderUtils.newContainer(containerLaunchContext.getContainerId(),
this.nodeId, nodeHttpAddress,
containerLaunchContext.getResource());
containerLaunchContext.getResource(),
null // DKDC - Doesn't matter
);
applicationContainers.add(container);

View File

@ -135,7 +135,8 @@ public class TestLeafQueue {
Container container = TestUtils.getMockContainer(
containerId,
((SchedulerNode)(invocation.getArguments()[1])).getNodeID(),
(Resource)(invocation.getArguments()[2]));
(Resource)(invocation.getArguments()[2]),
((Priority)invocation.getArguments()[3]));
return container;
}
}
@ -143,7 +144,9 @@ public class TestLeafQueue {
when(queue).createContainer(
any(SchedulerApp.class),
any(SchedulerNode.class),
any(Resource.class));
any(Resource.class),
any(Priority.class)
);
// 2. Stub out LeafQueue.parent.completedContainer
CSQueue parent = queue.getParent();

View File

@ -161,11 +161,13 @@ public class TestUtils {
}
public static Container getMockContainer(
ContainerId containerId, NodeId nodeId, Resource resource) {
ContainerId containerId, NodeId nodeId,
Resource resource, Priority priority) {
Container container = mock(Container.class);
when(container.getId()).thenReturn(containerId);
when(container.getNodeId()).thenReturn(nodeId);
when(container.getResource()).thenReturn(resource);
when(container.getPriority()).thenReturn(priority);
return container;
}
}