diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateRequest.java index ae0891e89af..d8d23478740 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateRequest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateRequest.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.api.protocolrecords; +import java.util.Collections; import java.util.List; import org.apache.hadoop.classification.InterfaceAudience.Public; @@ -28,6 +29,7 @@ import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest; import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.api.records.SchedulingRequest; import org.apache.hadoop.yarn.api.records.UpdateContainerRequest; import org.apache.hadoop.yarn.util.Records; @@ -212,6 +214,32 @@ public abstract class AllocateRequest { public abstract void setUpdateRequests( List updateRequests); + /** + * Get the list of Scheduling requests being sent by the + * ApplicationMaster. + * @return list of {@link SchedulingRequest} being sent by the + * ApplicationMaster. + */ + @Public + @Unstable + public List getSchedulingRequests() { + return Collections.EMPTY_LIST; + } + + /** + * Set the list of Scheduling requests to inform the + * ResourceManager about the application's resource requirements + * (potentially including allocation tags & placement constraints). + * @param schedulingRequests list of SchedulingRequest to update + * the ResourceManager about the application's resource + * requirements. + */ + @Public + @Unstable + public void setSchedulingRequests( + List schedulingRequests) { + } + @Public @Unstable public static AllocateRequestBuilder newBuilder() { @@ -313,6 +341,20 @@ public abstract class AllocateRequest { return this; } + /** + * Set the schedulingRequests of the request. + * @see AllocateRequest#setSchedulingRequests(List) + * @param schedulingRequests SchedulingRequest of the request + * @return {@link AllocateRequestBuilder} + */ + @Public + @Unstable + public AllocateRequestBuilder schedulingRequests( + List schedulingRequests) { + allocateRequest.setSchedulingRequests(schedulingRequests); + return this; + } + /** * Return generated {@link AllocateRequest} object. * @return {@link AllocateRequest} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceSizing.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceSizing.java index d82be11017c..8cdc63fdc7d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceSizing.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceSizing.java @@ -61,4 +61,31 @@ public abstract class ResourceSizing { @Public @Unstable public abstract void setResources(Resource resources); + + @Override + public int hashCode() { + int result = getResources().hashCode(); + result = 31 * result + getNumAllocations(); + return result; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if(obj == null || getClass() != obj.getClass()) { + return false; + } + + ResourceSizing that = (ResourceSizing) obj; + + if(getNumAllocations() != that.getNumAllocations()) { + return false; + } + if(!getResources().equals(that.getResources())) { + return false; + } + return true; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/SchedulingRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/SchedulingRequest.java index 47a0697f878..e32dd24624c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/SchedulingRequest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/SchedulingRequest.java @@ -49,6 +49,7 @@ public abstract class SchedulingRequest { return SchedulingRequest.newBuilder() .allocationRequestId(allocationRequestId).priority(priority) .executionType(executionType).allocationTags(allocationTags) + .resourceSizing(resourceSizing) .placementConstraintExpression(placementConstraintExpression).build(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto index 68e585d15bc..e49c4e34d1b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto @@ -91,6 +91,7 @@ message AllocateRequestProto { optional int32 response_id = 4; optional float progress = 5; repeated UpdateContainerRequestProto update_requests = 7; + repeated SchedulingRequestProto scheduling_requests = 10; } message NMTokenProto { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateRequestPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateRequestPBImpl.java index 0f0f5710332..b460044bb7f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateRequestPBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateRequestPBImpl.java @@ -29,14 +29,17 @@ import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest; import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.api.records.SchedulingRequest; import org.apache.hadoop.yarn.api.records.UpdateContainerRequest; import org.apache.hadoop.yarn.api.records.impl.pb.ContainerIdPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ResourceBlacklistRequestPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ResourceRequestPBImpl; +import org.apache.hadoop.yarn.api.records.impl.pb.SchedulingRequestPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.UpdateContainerRequestPBImpl; import org.apache.hadoop.yarn.proto.YarnProtos.ContainerIdProto; import org.apache.hadoop.yarn.proto.YarnProtos.ResourceBlacklistRequestProto; import org.apache.hadoop.yarn.proto.YarnProtos.ResourceRequestProto; +import org.apache.hadoop.yarn.proto.YarnProtos.SchedulingRequestProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.UpdateContainerRequestProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.AllocateRequestProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.AllocateRequestProtoOrBuilder; @@ -53,6 +56,7 @@ public class AllocateRequestPBImpl extends AllocateRequest { private List ask = null; private List release = null; private List updateRequests = null; + private List schedulingRequests = null; private ResourceBlacklistRequest blacklistRequest = null; public AllocateRequestPBImpl() { @@ -101,6 +105,9 @@ public class AllocateRequestPBImpl extends AllocateRequest { if (this.updateRequests != null) { addUpdateRequestsToProto(); } + if (this.schedulingRequests != null) { + addSchedulingRequestsToProto(); + } if (this.blacklistRequest != null) { builder.setBlacklistRequest(convertToProtoFormat(this.blacklistRequest)); } @@ -177,6 +184,23 @@ public class AllocateRequestPBImpl extends AllocateRequest { this.updateRequests.addAll(updateRequests); } + @Override + public List getSchedulingRequests() { + initSchedulingRequests(); + return this.schedulingRequests; + } + + @Override + public void setSchedulingRequests( + List schedulingRequests) { + if (schedulingRequests == null) { + return; + } + initSchedulingRequests(); + this.schedulingRequests.clear(); + this.schedulingRequests.addAll(schedulingRequests); + } + @Override public ResourceBlacklistRequest getResourceBlacklistRequest() { AllocateRequestProtoOrBuilder p = viaProto ? proto : builder; @@ -261,6 +285,20 @@ public class AllocateRequestPBImpl extends AllocateRequest { } } + private void initSchedulingRequests() { + if (this.schedulingRequests != null) { + return; + } + AllocateRequestProtoOrBuilder p = viaProto ? proto : builder; + List list = + p.getSchedulingRequestsList(); + this.schedulingRequests = new ArrayList<>(); + + for (SchedulingRequestProto c : list) { + this.schedulingRequests.add(convertFromProtoFormat(c)); + } + } + private void addUpdateRequestsToProto() { maybeInitBuilder(); builder.clearUpdateRequests(); @@ -297,6 +335,41 @@ public class AllocateRequestPBImpl extends AllocateRequest { builder.addAllUpdateRequests(iterable); } + private void addSchedulingRequestsToProto() { + maybeInitBuilder(); + builder.clearSchedulingRequests(); + if (schedulingRequests == null) { + return; + } + Iterable iterable = + new Iterable() { + @Override + public Iterator iterator() { + return new Iterator() { + + private Iterator iter = + schedulingRequests.iterator(); + + @Override + public boolean hasNext() { + return iter.hasNext(); + } + + @Override + public SchedulingRequestProto next() { + return convertToProtoFormat(iter.next()); + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + }; + + } + }; + builder.addAllSchedulingRequests(iterable); + } @Override public List getReleaseList() { initReleases(); @@ -377,6 +450,16 @@ public class AllocateRequestPBImpl extends AllocateRequest { return ((UpdateContainerRequestPBImpl) t).getProto(); } + private SchedulingRequestPBImpl convertFromProtoFormat( + SchedulingRequestProto p) { + return new SchedulingRequestPBImpl(p); + } + + private SchedulingRequestProto convertToProtoFormat( + SchedulingRequest t) { + return ((SchedulingRequestPBImpl) t).getProto(); + } + private ContainerIdPBImpl convertFromProtoFormat(ContainerIdProto p) { return new ContainerIdPBImpl(p); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ResourceSizingPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ResourceSizingPBImpl.java index 05bb3bd8551..f98e488031b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ResourceSizingPBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ResourceSizingPBImpl.java @@ -112,6 +112,6 @@ public class ResourceSizingPBImpl extends ResourceSizing { } private ResourceProto convertToProtoFormat(Resource r) { - return ((ResourcePBImpl) r).getProto(); + return ProtoUtils.convertToProtoFormat(r); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/SchedulingRequestPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/SchedulingRequestPBImpl.java index 7826b369273..305856a95d0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/SchedulingRequestPBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/SchedulingRequestPBImpl.java @@ -263,4 +263,20 @@ public class SchedulingRequestPBImpl extends SchedulingRequest { this.allocationTags = new HashSet<>(); this.allocationTags.addAll(p.getAllocationTagsList()); } + + @Override + public int hashCode() { + return getProto().hashCode(); + } + + @Override + public boolean equals(Object other) { + if (other == null) { + return false; + } + if (other.getClass().isAssignableFrom(this.getClass())) { + return this.getProto().equals(this.getClass().cast(other).getProto()); + } + return false; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestPBImplRecords.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestPBImplRecords.java index c5585c28b21..a0b907dae1f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestPBImplRecords.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestPBImplRecords.java @@ -149,8 +149,10 @@ import org.apache.hadoop.yarn.api.records.ResourceInformation; import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest; import org.apache.hadoop.yarn.api.records.ResourceOption; import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.api.records.ResourceSizing; import org.apache.hadoop.yarn.api.records.ResourceTypeInfo; import org.apache.hadoop.yarn.api.records.ResourceUtilization; +import org.apache.hadoop.yarn.api.records.SchedulingRequest; import org.apache.hadoop.yarn.api.records.SerializedException; import org.apache.hadoop.yarn.api.records.StrictPreemptionContract; import org.apache.hadoop.yarn.api.records.Token; @@ -189,7 +191,9 @@ import org.apache.hadoop.yarn.api.records.impl.pb.ResourceBlacklistRequestPBImpl import org.apache.hadoop.yarn.api.records.impl.pb.ResourceOptionPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ResourcePBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ResourceRequestPBImpl; +import org.apache.hadoop.yarn.api.records.impl.pb.ResourceSizingPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ResourceTypeInfoPBImpl; +import org.apache.hadoop.yarn.api.records.impl.pb.SchedulingRequestPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.SerializedExceptionPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.StrictPreemptionContractPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.TokenPBImpl; @@ -225,6 +229,8 @@ import org.apache.hadoop.yarn.proto.YarnProtos.ResourceBlacklistRequestProto; import org.apache.hadoop.yarn.proto.YarnProtos.ResourceOptionProto; import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProto; import org.apache.hadoop.yarn.proto.YarnProtos.ResourceRequestProto; +import org.apache.hadoop.yarn.proto.YarnProtos.ResourceSizingProto; +import org.apache.hadoop.yarn.proto.YarnProtos.SchedulingRequestProto; import org.apache.hadoop.yarn.proto.YarnProtos.SerializedExceptionProto; import org.apache.hadoop.yarn.proto.YarnProtos.StrictPreemptionContractProto; import org.apache.hadoop.yarn.proto.YarnProtos.URLProto; @@ -428,6 +434,8 @@ public class TestPBImplRecords extends BasePBImplRecordsTest { generateByNewInstance(QueueConfigurations.class); generateByNewInstance(CollectorInfo.class); generateByNewInstance(ResourceTypeInfo.class); + generateByNewInstance(ResourceSizing.class); + generateByNewInstance(SchedulingRequest.class); } @Test @@ -906,6 +914,17 @@ public class TestPBImplRecords extends BasePBImplRecordsTest { validatePBImplRecord(ResourceRequestPBImpl.class, ResourceRequestProto.class); } + @Test + public void testResourceSizingPBImpl() throws Exception { + validatePBImplRecord(ResourceSizingPBImpl.class, ResourceSizingProto.class); + } + + @Test + public void testSchedulingRequestPBImpl() throws Exception { + validatePBImplRecord(SchedulingRequestPBImpl.class, + SchedulingRequestProto.class); + } + @Test public void testSerializedExceptionPBImpl() throws Exception { validatePBImplRecord(SerializedExceptionPBImpl.class,