diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/Container.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/Container.java index 38fa8b90e78..9a62935344d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/Container.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/Container.java @@ -66,6 +66,15 @@ public abstract class Container implements Comparable { public static Container newInstance(ContainerId containerId, NodeId nodeId, String nodeHttpAddress, Resource resource, Priority priority, Token containerToken) { + return newInstance(containerId, nodeId, nodeHttpAddress, resource, priority, + containerToken, ExecutionType.GUARANTEED); + } + + @Private + @Unstable + public static Container newInstance(ContainerId containerId, NodeId nodeId, + String nodeHttpAddress, Resource resource, Priority priority, + Token containerToken, ExecutionType executionType) { Container container = Records.newRecord(Container.class); container.setId(containerId); container.setNodeId(nodeId); @@ -73,6 +82,7 @@ public abstract class Container implements Comparable { container.setResource(resource); container.setPriority(priority); container.setContainerToken(containerToken); + container.setExecutionType(executionType); return container; } @@ -163,4 +173,20 @@ public abstract class Container implements Comparable { @Private @Unstable public abstract void setContainerToken(Token containerToken); + + /** + * Get the ExecutionType for the container. + * @return ExecutionType for the container. + */ + @Private + @Unstable + public abstract ExecutionType getExecutionType(); + + /** + * Set the ExecutionType for the container. + * @param executionType ExecutionType + */ + @Private + @Unstable + public abstract void setExecutionType(ExecutionType executionType); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto index 60cdfd155e8..814c5bb762e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto @@ -92,6 +92,7 @@ message ContainerProto { optional ResourceProto resource = 4; optional PriorityProto priority = 5; optional hadoop.common.TokenProto container_token = 6; + optional ExecutionTypeProto execution_type = 7 [default = GUARANTEED]; } message ContainerReportProto { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerPBImpl.java index 1700068fa8a..bd2d93794bb 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerPBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerPBImpl.java @@ -23,6 +23,7 @@ import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.security.proto.SecurityProtos.TokenProto; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ExecutionType; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; @@ -33,6 +34,7 @@ import org.apache.hadoop.yarn.proto.YarnProtos.ContainerProtoOrBuilder; import org.apache.hadoop.yarn.proto.YarnProtos.NodeIdProto; import org.apache.hadoop.yarn.proto.YarnProtos.PriorityProto; import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProto; +import org.apache.hadoop.yarn.proto.YarnProtos.ExecutionTypeProto; @Private @Unstable @@ -47,7 +49,7 @@ public class ContainerPBImpl extends Container { private Resource resource = null; private Priority priority = null; private Token containerToken = null; - + public ContainerPBImpl() { builder = ContainerProto.newBuilder(); } @@ -248,6 +250,18 @@ public class ContainerPBImpl extends Container { this.containerToken = containerToken; } + @Override + public ExecutionType getExecutionType() { + ContainerProtoOrBuilder p = viaProto ? proto : builder; + return convertFromProtoFormat(p.getExecutionType()); + } + + @Override + public void setExecutionType(ExecutionType executionType) { + maybeInitBuilder(); + builder.setExecutionType(convertToProtoFormat(executionType)); + } + private ContainerIdPBImpl convertFromProtoFormat(ContainerIdProto p) { return new ContainerIdPBImpl(p); } @@ -288,6 +302,15 @@ public class ContainerPBImpl extends Container { return ((TokenPBImpl)t).getProto(); } + private ExecutionType convertFromProtoFormat( + ExecutionTypeProto e) { + return ProtoUtils.convertFromProtoFormat(e); + } + + private ExecutionTypeProto convertToProtoFormat(ExecutionType e) { + return ProtoUtils.convertToProtoFormat(e); + } + public String toString() { StringBuilder sb = new StringBuilder(); sb.append("Container: ["); @@ -297,6 +320,7 @@ public class ContainerPBImpl extends Container { sb.append("Resource: ").append(getResource()).append(", "); sb.append("Priority: ").append(getPriority()).append(", "); sb.append("Token: ").append(getContainerToken()).append(", "); + sb.append("ExecutionType: ").append(getExecutionType()).append(", "); sb.append("]"); return sb.toString(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/BuilderUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/BuilderUtils.java index a70d143c476..b97f935aadb 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/BuilderUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/BuilderUtils.java @@ -236,7 +236,7 @@ public class BuilderUtils { public static Container newContainer(ContainerId containerId, NodeId nodeId, String nodeHttpAddress, Resource resource, Priority priority, - Token containerToken) { + Token containerToken, ExecutionType executionType) { Container container = recordFactory.newRecordInstance(Container.class); container.setId(containerId); container.setNodeId(nodeId); @@ -244,9 +244,17 @@ public class BuilderUtils { container.setResource(resource); container.setPriority(priority); container.setContainerToken(containerToken); + container.setExecutionType(executionType); return container; } + public static Container newContainer(ContainerId containerId, NodeId nodeId, + String nodeHttpAddress, Resource resource, Priority priority, + Token containerToken) { + return newContainer(containerId, nodeId, nodeHttpAddress, resource, + priority, containerToken, ExecutionType.GUARANTEED); + } + public static T newToken(Class tokenClass, byte[] identifier, String kind, byte[] password, String service) { T token = recordFactory.newRecordInstance(tokenClass); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/OpportunisticContainerAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/OpportunisticContainerAllocator.java index e33c389b732..22a6a2425a5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/OpportunisticContainerAllocator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/OpportunisticContainerAllocator.java @@ -160,7 +160,8 @@ public class OpportunisticContainerAllocator { containerTokenIdentifier); Container container = BuilderUtils.newContainer( cId, nodeId, nodeId.getHost() + ":" + webpagePort, - capability, rr.getPriority(), containerToken); + capability, rr.getPriority(), containerToken, + containerTokenIdentifier.getExecutionType()); return container; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestDistributedSchedulingService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestDistributedSchedulingService.java index 47563d51a3c..1982776d187 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestDistributedSchedulingService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestDistributedSchedulingService.java @@ -35,6 +35,11 @@ import org.apache.hadoop.yarn.api.protocolrecords.impl.pb .RegisterApplicationMasterRequestPBImpl; import org.apache.hadoop.yarn.api.protocolrecords.impl.pb .RegisterApplicationMasterResponsePBImpl; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ExecutionType; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.server.api.DistributedSchedulerProtocolPB; import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; @@ -66,6 +71,7 @@ import org.junit.Test; import java.io.IOException; import java.net.InetSocketAddress; import java.util.Arrays; +import java.util.List; public class TestDistributedSchedulingService { @@ -92,63 +98,13 @@ public class TestDistributedSchedulingService { return new YarnConfiguration(); } }; - DistributedSchedulingService service = - new DistributedSchedulingService(rmContext, null) { - @Override - public RegisterApplicationMasterResponse registerApplicationMaster - (RegisterApplicationMasterRequest request) throws - YarnException, IOException { - RegisterApplicationMasterResponse resp = factory.newRecordInstance( - RegisterApplicationMasterResponse.class); - // Dummy Entry to Assert that we get this object back - resp.setQueue("dummyQueue"); - return resp; - } - - @Override - public FinishApplicationMasterResponse finishApplicationMaster - (FinishApplicationMasterRequest request) throws YarnException, - IOException { - FinishApplicationMasterResponse resp = factory.newRecordInstance( - FinishApplicationMasterResponse.class); - // Dummy Entry to Assert that we get this object back - resp.setIsUnregistered(false); - return resp; - } - - @Override - public AllocateResponse allocate(AllocateRequest request) throws - YarnException, IOException { - AllocateResponse response = factory.newRecordInstance - (AllocateResponse.class); - response.setNumClusterNodes(12345); - return response; - } - - @Override - public DistSchedRegisterResponse - registerApplicationMasterForDistributedScheduling - (RegisterApplicationMasterRequest request) throws - YarnException, IOException { - DistSchedRegisterResponse resp = factory.newRecordInstance( - DistSchedRegisterResponse.class); - resp.setContainerIdStart(54321l); - resp.setMaxAllocatableCapabilty(Resource.newInstance(4096, 4)); - resp.setMinAllocatableCapabilty(Resource.newInstance(1024, 1)); - resp.setIncrAllocatableCapabilty(Resource.newInstance(2048, 2)); - return resp; - } - - @Override - public DistSchedAllocateResponse allocateForDistributedScheduling - (AllocateRequest request) throws YarnException, IOException { - DistSchedAllocateResponse resp = - factory.newRecordInstance(DistSchedAllocateResponse.class); - resp.setNodesForScheduling( - Arrays.asList(NodeId.newInstance("h1", 1234))); - return resp; - } - }; + Container c = factory.newRecordInstance(Container.class); + c.setExecutionType(ExecutionType.OPPORTUNISTIC); + c.setId( + ContainerId.newContainerId( + ApplicationAttemptId.newInstance( + ApplicationId.newInstance(12345, 1), 2), 3)); + DistributedSchedulingService service = createService(factory, rmContext, c); Server server = service.getServer(rpc, conf, addr, null); server.start(); @@ -180,6 +136,10 @@ public class TestDistributedSchedulingService { ((AllocateRequestPBImpl)factory .newRecordInstance(AllocateRequest.class)).getProto()) ); + List allocatedContainers = allocResp.getAllocatedContainers(); + Assert.assertEquals(1, allocatedContainers.size()); + Assert.assertEquals(ExecutionType.OPPORTUNISTIC, + allocatedContainers.get(0).getExecutionType()); Assert.assertEquals(12345, allocResp.getNumClusterNodes()); @@ -222,4 +182,65 @@ public class TestDistributedSchedulingService { Assert.assertEquals( false, dsfinishResp.getIsUnregistered()); } + + private DistributedSchedulingService createService(final RecordFactory + factory, final RMContext rmContext, final Container c) { + return new DistributedSchedulingService(rmContext, null) { + @Override + public RegisterApplicationMasterResponse registerApplicationMaster( + RegisterApplicationMasterRequest request) throws + YarnException, IOException { + RegisterApplicationMasterResponse resp = factory.newRecordInstance( + RegisterApplicationMasterResponse.class); + // Dummy Entry to Assert that we get this object back + resp.setQueue("dummyQueue"); + return resp; + } + + @Override + public FinishApplicationMasterResponse finishApplicationMaster( + FinishApplicationMasterRequest request) throws YarnException, + IOException { + FinishApplicationMasterResponse resp = factory.newRecordInstance( + FinishApplicationMasterResponse.class); + // Dummy Entry to Assert that we get this object back + resp.setIsUnregistered(false); + return resp; + } + + @Override + public AllocateResponse allocate(AllocateRequest request) throws + YarnException, IOException { + AllocateResponse response = factory.newRecordInstance( + AllocateResponse.class); + response.setNumClusterNodes(12345); + response.setAllocatedContainers(Arrays.asList(c)); + return response; + } + + @Override + public DistSchedRegisterResponse + registerApplicationMasterForDistributedScheduling( + RegisterApplicationMasterRequest request) throws + YarnException, IOException { + DistSchedRegisterResponse resp = factory.newRecordInstance( + DistSchedRegisterResponse.class); + resp.setContainerIdStart(54321L); + resp.setMaxAllocatableCapabilty(Resource.newInstance(4096, 4)); + resp.setMinAllocatableCapabilty(Resource.newInstance(1024, 1)); + resp.setIncrAllocatableCapabilty(Resource.newInstance(2048, 2)); + return resp; + } + + @Override + public DistSchedAllocateResponse allocateForDistributedScheduling( + AllocateRequest request) throws YarnException, IOException { + DistSchedAllocateResponse resp = + factory.newRecordInstance(DistSchedAllocateResponse.class); + resp.setNodesForScheduling( + Arrays.asList(NodeId.newInstance("h1", 1234))); + return resp; + } + }; + } }