YARN-5127. Expose ExecutionType in Container api record. (Hitesh Sharma via asuresh)

This commit is contained in:
Arun Suresh 2016-05-27 14:06:32 -07:00
parent 5ea6fd85c7
commit aa975bc781
6 changed files with 141 additions and 60 deletions

View File

@ -66,6 +66,15 @@ public abstract class Container implements Comparable<Container> {
public static Container newInstance(ContainerId containerId, NodeId nodeId,
String nodeHttpAddress, Resource resource, Priority priority,
Token containerToken) {
return newInstance(containerId, nodeId, nodeHttpAddress, resource, priority,
containerToken, ExecutionType.GUARANTEED);
}
@Private
@Unstable
public static Container newInstance(ContainerId containerId, NodeId nodeId,
String nodeHttpAddress, Resource resource, Priority priority,
Token containerToken, ExecutionType executionType) {
Container container = Records.newRecord(Container.class);
container.setId(containerId);
container.setNodeId(nodeId);
@ -73,6 +82,7 @@ public abstract class Container implements Comparable<Container> {
container.setResource(resource);
container.setPriority(priority);
container.setContainerToken(containerToken);
container.setExecutionType(executionType);
return container;
}
@ -163,4 +173,20 @@ public abstract class Container implements Comparable<Container> {
@Private
@Unstable
public abstract void setContainerToken(Token containerToken);
/**
* Get the <code>ExecutionType</code> for the container.
* @return <code>ExecutionType</code> for the container.
*/
@Private
@Unstable
public abstract ExecutionType getExecutionType();
/**
* Set the <code>ExecutionType</code> for the container.
* @param executionType ExecutionType
*/
@Private
@Unstable
public abstract void setExecutionType(ExecutionType executionType);
}

View File

@ -92,6 +92,7 @@ message ContainerProto {
optional ResourceProto resource = 4;
optional PriorityProto priority = 5;
optional hadoop.common.TokenProto container_token = 6;
optional ExecutionTypeProto execution_type = 7 [default = GUARANTEED];
}
message ContainerReportProto {

View File

@ -23,6 +23,7 @@ import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.security.proto.SecurityProtos.TokenProto;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
@ -33,6 +34,7 @@ import org.apache.hadoop.yarn.proto.YarnProtos.ContainerProtoOrBuilder;
import org.apache.hadoop.yarn.proto.YarnProtos.NodeIdProto;
import org.apache.hadoop.yarn.proto.YarnProtos.PriorityProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ExecutionTypeProto;
@Private
@Unstable
@ -248,6 +250,18 @@ public class ContainerPBImpl extends Container {
this.containerToken = containerToken;
}
@Override
public ExecutionType getExecutionType() {
ContainerProtoOrBuilder p = viaProto ? proto : builder;
return convertFromProtoFormat(p.getExecutionType());
}
@Override
public void setExecutionType(ExecutionType executionType) {
maybeInitBuilder();
builder.setExecutionType(convertToProtoFormat(executionType));
}
private ContainerIdPBImpl convertFromProtoFormat(ContainerIdProto p) {
return new ContainerIdPBImpl(p);
}
@ -288,6 +302,15 @@ public class ContainerPBImpl extends Container {
return ((TokenPBImpl)t).getProto();
}
private ExecutionType convertFromProtoFormat(
ExecutionTypeProto e) {
return ProtoUtils.convertFromProtoFormat(e);
}
private ExecutionTypeProto convertToProtoFormat(ExecutionType e) {
return ProtoUtils.convertToProtoFormat(e);
}
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append("Container: [");
@ -297,6 +320,7 @@ public class ContainerPBImpl extends Container {
sb.append("Resource: ").append(getResource()).append(", ");
sb.append("Priority: ").append(getPriority()).append(", ");
sb.append("Token: ").append(getContainerToken()).append(", ");
sb.append("ExecutionType: ").append(getExecutionType()).append(", ");
sb.append("]");
return sb.toString();
}

View File

@ -236,7 +236,7 @@ public class BuilderUtils {
public static Container newContainer(ContainerId containerId, NodeId nodeId,
String nodeHttpAddress, Resource resource, Priority priority,
Token containerToken) {
Token containerToken, ExecutionType executionType) {
Container container = recordFactory.newRecordInstance(Container.class);
container.setId(containerId);
container.setNodeId(nodeId);
@ -244,9 +244,17 @@ public class BuilderUtils {
container.setResource(resource);
container.setPriority(priority);
container.setContainerToken(containerToken);
container.setExecutionType(executionType);
return container;
}
public static Container newContainer(ContainerId containerId, NodeId nodeId,
String nodeHttpAddress, Resource resource, Priority priority,
Token containerToken) {
return newContainer(containerId, nodeId, nodeHttpAddress, resource,
priority, containerToken, ExecutionType.GUARANTEED);
}
public static <T extends Token> T newToken(Class<T> tokenClass,
byte[] identifier, String kind, byte[] password, String service) {
T token = recordFactory.newRecordInstance(tokenClass);

View File

@ -160,7 +160,8 @@ public class OpportunisticContainerAllocator {
containerTokenIdentifier);
Container container = BuilderUtils.newContainer(
cId, nodeId, nodeId.getHost() + ":" + webpagePort,
capability, rr.getPriority(), containerToken);
capability, rr.getPriority(), containerToken,
containerTokenIdentifier.getExecutionType());
return container;
}

View File

@ -35,6 +35,11 @@ import org.apache.hadoop.yarn.api.protocolrecords.impl.pb
.RegisterApplicationMasterRequestPBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb
.RegisterApplicationMasterResponsePBImpl;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.api.DistributedSchedulerProtocolPB;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
@ -66,6 +71,7 @@ import org.junit.Test;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Arrays;
import java.util.List;
public class TestDistributedSchedulingService {
@ -92,63 +98,13 @@ public class TestDistributedSchedulingService {
return new YarnConfiguration();
}
};
DistributedSchedulingService service =
new DistributedSchedulingService(rmContext, null) {
@Override
public RegisterApplicationMasterResponse registerApplicationMaster
(RegisterApplicationMasterRequest request) throws
YarnException, IOException {
RegisterApplicationMasterResponse resp = factory.newRecordInstance(
RegisterApplicationMasterResponse.class);
// Dummy Entry to Assert that we get this object back
resp.setQueue("dummyQueue");
return resp;
}
@Override
public FinishApplicationMasterResponse finishApplicationMaster
(FinishApplicationMasterRequest request) throws YarnException,
IOException {
FinishApplicationMasterResponse resp = factory.newRecordInstance(
FinishApplicationMasterResponse.class);
// Dummy Entry to Assert that we get this object back
resp.setIsUnregistered(false);
return resp;
}
@Override
public AllocateResponse allocate(AllocateRequest request) throws
YarnException, IOException {
AllocateResponse response = factory.newRecordInstance
(AllocateResponse.class);
response.setNumClusterNodes(12345);
return response;
}
@Override
public DistSchedRegisterResponse
registerApplicationMasterForDistributedScheduling
(RegisterApplicationMasterRequest request) throws
YarnException, IOException {
DistSchedRegisterResponse resp = factory.newRecordInstance(
DistSchedRegisterResponse.class);
resp.setContainerIdStart(54321l);
resp.setMaxAllocatableCapabilty(Resource.newInstance(4096, 4));
resp.setMinAllocatableCapabilty(Resource.newInstance(1024, 1));
resp.setIncrAllocatableCapabilty(Resource.newInstance(2048, 2));
return resp;
}
@Override
public DistSchedAllocateResponse allocateForDistributedScheduling
(AllocateRequest request) throws YarnException, IOException {
DistSchedAllocateResponse resp =
factory.newRecordInstance(DistSchedAllocateResponse.class);
resp.setNodesForScheduling(
Arrays.asList(NodeId.newInstance("h1", 1234)));
return resp;
}
};
Container c = factory.newRecordInstance(Container.class);
c.setExecutionType(ExecutionType.OPPORTUNISTIC);
c.setId(
ContainerId.newContainerId(
ApplicationAttemptId.newInstance(
ApplicationId.newInstance(12345, 1), 2), 3));
DistributedSchedulingService service = createService(factory, rmContext, c);
Server server = service.getServer(rpc, conf, addr, null);
server.start();
@ -180,6 +136,10 @@ public class TestDistributedSchedulingService {
((AllocateRequestPBImpl)factory
.newRecordInstance(AllocateRequest.class)).getProto())
);
List<Container> allocatedContainers = allocResp.getAllocatedContainers();
Assert.assertEquals(1, allocatedContainers.size());
Assert.assertEquals(ExecutionType.OPPORTUNISTIC,
allocatedContainers.get(0).getExecutionType());
Assert.assertEquals(12345, allocResp.getNumClusterNodes());
@ -222,4 +182,65 @@ public class TestDistributedSchedulingService {
Assert.assertEquals(
false, dsfinishResp.getIsUnregistered());
}
private DistributedSchedulingService createService(final RecordFactory
factory, final RMContext rmContext, final Container c) {
return new DistributedSchedulingService(rmContext, null) {
@Override
public RegisterApplicationMasterResponse registerApplicationMaster(
RegisterApplicationMasterRequest request) throws
YarnException, IOException {
RegisterApplicationMasterResponse resp = factory.newRecordInstance(
RegisterApplicationMasterResponse.class);
// Dummy Entry to Assert that we get this object back
resp.setQueue("dummyQueue");
return resp;
}
@Override
public FinishApplicationMasterResponse finishApplicationMaster(
FinishApplicationMasterRequest request) throws YarnException,
IOException {
FinishApplicationMasterResponse resp = factory.newRecordInstance(
FinishApplicationMasterResponse.class);
// Dummy Entry to Assert that we get this object back
resp.setIsUnregistered(false);
return resp;
}
@Override
public AllocateResponse allocate(AllocateRequest request) throws
YarnException, IOException {
AllocateResponse response = factory.newRecordInstance(
AllocateResponse.class);
response.setNumClusterNodes(12345);
response.setAllocatedContainers(Arrays.asList(c));
return response;
}
@Override
public DistSchedRegisterResponse
registerApplicationMasterForDistributedScheduling(
RegisterApplicationMasterRequest request) throws
YarnException, IOException {
DistSchedRegisterResponse resp = factory.newRecordInstance(
DistSchedRegisterResponse.class);
resp.setContainerIdStart(54321L);
resp.setMaxAllocatableCapabilty(Resource.newInstance(4096, 4));
resp.setMinAllocatableCapabilty(Resource.newInstance(1024, 1));
resp.setIncrAllocatableCapabilty(Resource.newInstance(2048, 2));
return resp;
}
@Override
public DistSchedAllocateResponse allocateForDistributedScheduling(
AllocateRequest request) throws YarnException, IOException {
DistSchedAllocateResponse resp =
factory.newRecordInstance(DistSchedAllocateResponse.class);
resp.setNodesForScheduling(
Arrays.asList(NodeId.newInstance("h1", 1234)));
return resp;
}
};
}
}