From b57e8bc3002a95d2f2f328554d792151cdc1120d Mon Sep 17 00:00:00 2001 From: Wangda Tan Date: Mon, 30 Oct 2017 16:54:02 -0700 Subject: [PATCH] YARN-6594. [API] Introduce SchedulingRequest object. (Konstantinos Karanasos via wangda) --- .../yarn/api/records/ResourceSizing.java | 64 +++++ .../yarn/api/records/SchedulingRequest.java | 205 ++++++++++++++ .../src/main/proto/yarn_protos.proto | 14 + .../records/impl/pb/ResourceSizingPBImpl.java | 117 ++++++++ .../impl/pb/SchedulingRequestPBImpl.java | 266 ++++++++++++++++++ 5 files changed, 666 insertions(+) create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceSizing.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/SchedulingRequest.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ResourceSizingPBImpl.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/SchedulingRequestPBImpl.java diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceSizing.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceSizing.java new file mode 100644 index 00000000000..d82be11017c --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceSizing.java @@ -0,0 +1,64 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.api.records; + +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.yarn.util.Records; + +/** + * {@code ResourceSizing} contains information for the size of a + * {@link SchedulingRequest}, such as the number of requested allocations and + * the resources for each allocation. + */ +@Public +@Unstable +public abstract class ResourceSizing { + + @Public + @Unstable + public static ResourceSizing newInstance(Resource resources) { + return ResourceSizing.newInstance(1, resources); + } + + @Public + @Unstable + public static ResourceSizing newInstance(int numAllocations, Resource resources) { + ResourceSizing resourceSizing = Records.newRecord(ResourceSizing.class); + resourceSizing.setNumAllocations(numAllocations); + resourceSizing.setResources(resources); + return resourceSizing; + } + + @Public + @Unstable + public abstract int getNumAllocations(); + + @Public + @Unstable + public abstract void setNumAllocations(int numAllocations); + + @Public + @Unstable + public abstract Resource getResources(); + + @Public + @Unstable + public abstract void setResources(Resource resources); +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/SchedulingRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/SchedulingRequest.java new file mode 100644 index 00000000000..47a0697f878 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/SchedulingRequest.java @@ -0,0 +1,205 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.api.records; + +import java.util.Set; + +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.yarn.api.resource.PlacementConstraint; +import org.apache.hadoop.yarn.util.Records; + +/** + * {@code SchedulingRequest} represents a request made by an application to the + * {@code ResourceManager} to obtain an allocation. It is similar to the + * {@link ResourceRequest}. However, it is more complete than the latter, as it + * allows applications to specify allocation tags (e.g., to express that an + * allocation belongs to {@code Spark} or is an {@code HBase-master}), as well + * as involved {@link PlacementConstraint}s (e.g., anti-affinity between Spark + * and HBase allocations). + * + * The size specification of the allocation is in {@code ResourceSizing}. + */ +@Public +@Unstable +public abstract class SchedulingRequest { + + @Public + @Unstable + public static SchedulingRequest newInstance(long allocationRequestId, + Priority priority, ExecutionTypeRequest executionType, + Set allocationTags, ResourceSizing resourceSizing, + PlacementConstraint placementConstraintExpression) { + return SchedulingRequest.newBuilder() + .allocationRequestId(allocationRequestId).priority(priority) + .executionType(executionType).allocationTags(allocationTags) + .placementConstraintExpression(placementConstraintExpression).build(); + } + + @Public + @Unstable + public static SchedulingRequestBuilder newBuilder() { + return new SchedulingRequestBuilder(); + } + + /** + * Class to construct instances of {@link SchedulingRequest} with specific + * options. + */ + @Public + @Unstable + public static final class SchedulingRequestBuilder { + private SchedulingRequest schedulingRequest = + Records.newRecord(SchedulingRequest.class); + + private SchedulingRequestBuilder() { + schedulingRequest.setAllocationRequestId(0); + schedulingRequest.setPriority(Priority.newInstance(0)); + schedulingRequest.setExecutionType(ExecutionTypeRequest.newInstance()); + } + + /** + * Set the allocationRequestId of the request. + * + * @see SchedulingRequest#setAllocationRequestId(long) + * @param allocationRequestId allocationRequestId of the + * request + * @return {@link SchedulingRequest.SchedulingRequestBuilder} + */ + @Public + @Unstable + public SchedulingRequestBuilder allocationRequestId( + long allocationRequestId) { + schedulingRequest.setAllocationRequestId(allocationRequestId); + return this; + } + + /** + * Set the priority of the request. + * + * @param priority priority of the request + * @return {@link SchedulingRequest.SchedulingRequestBuilder} + * @see SchedulingRequest#setPriority(Priority) + */ + @Public + @Unstable + public SchedulingRequestBuilder priority(Priority priority) { + schedulingRequest.setPriority(priority); + return this; + } + + /** + * Set the executionType of the request. + * + * @see SchedulingRequest#setExecutionType(ExecutionTypeRequest) + * @param executionType executionType of the request + * @return {@link SchedulingRequest.SchedulingRequestBuilder} + */ + @Public + @Unstable + public SchedulingRequestBuilder executionType( + ExecutionTypeRequest executionType) { + schedulingRequest.setExecutionType(executionType); + return this; + } + + /** + * Set the allocationTags of the request. + * + * @see SchedulingRequest#setAllocationTags(Set) + * @param allocationTags allocationsTags of the request + * @return {@link SchedulingRequest.SchedulingRequestBuilder} + */ + @Public + @Unstable + public SchedulingRequestBuilder allocationTags(Set allocationTags) { + schedulingRequest.setAllocationTags(allocationTags); + return this; + } + + /** + * Set the executionType of the request. + * + * @see SchedulingRequest#setResourceSizing(ResourceSizing) + * @param resourceSizing resourceSizing of the request + * @return {@link SchedulingRequest.SchedulingRequestBuilder} + */ + @Public + @Unstable + public SchedulingRequestBuilder resourceSizing( + ResourceSizing resourceSizing) { + schedulingRequest.setResourceSizing(resourceSizing); + return this; + } + + /** + * Set the placementConstraintExpression of the request. + * + * @see SchedulingRequest#setPlacementConstraint( + * PlacementConstraint) + * @param placementConstraintExpression placementConstraints of + * the request + * @return {@link SchedulingRequest.SchedulingRequestBuilder} + */ + @Public + @Unstable + public SchedulingRequestBuilder placementConstraintExpression( + PlacementConstraint placementConstraintExpression) { + schedulingRequest + .setPlacementConstraint(placementConstraintExpression); + return this; + } + + /** + * Return generated {@link SchedulingRequest} object. + * + * @return {@link SchedulingRequest} + */ + @Public + @Unstable + public SchedulingRequest build() { + return schedulingRequest; + } + } + + public abstract long getAllocationRequestId(); + + public abstract void setAllocationRequestId(long allocationRequestId); + + public abstract Priority getPriority(); + + public abstract void setPriority(Priority priority); + + public abstract ExecutionTypeRequest getExecutionType(); + + public abstract void setExecutionType(ExecutionTypeRequest executionType); + + public abstract Set getAllocationTags(); + + public abstract void setAllocationTags(Set allocationTags); + + public abstract ResourceSizing getResourceSizing(); + + public abstract void setResourceSizing(ResourceSizing resourceSizing); + + public abstract PlacementConstraint getPlacementConstraint(); + + public abstract void setPlacementConstraint( + PlacementConstraint placementConstraint); +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto index ff0d54bb91a..d24f8639ef9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto @@ -405,6 +405,20 @@ message ExecutionTypeRequestProto { optional bool enforce_execution_type = 2 [default = false]; } +message SchedulingRequestProto { + optional int64 allocationRequestId = 1 [default = 0]; + optional PriorityProto priority = 2; + optional ExecutionTypeRequestProto executionType = 3; + repeated string allocationTags = 4; + optional ResourceSizingProto resourceSizing = 5; + optional PlacementConstraintProto placementConstraint = 6; +} + +message ResourceSizingProto { + optional int32 numAllocations = 1; + optional ResourceProto resources = 2; +} + enum AMCommandProto { AM_RESYNC = 1; AM_SHUTDOWN = 2; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ResourceSizingPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ResourceSizingPBImpl.java new file mode 100644 index 00000000000..05bb3bd8551 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ResourceSizingPBImpl.java @@ -0,0 +1,117 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.api.records.impl.pb; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ResourceSizing; +import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProto; +import org.apache.hadoop.yarn.proto.YarnProtos.ResourceSizingProto; +import org.apache.hadoop.yarn.proto.YarnProtos.ResourceSizingProtoOrBuilder; + +@Private +@Unstable +public class ResourceSizingPBImpl extends ResourceSizing { + ResourceSizingProto proto = ResourceSizingProto.getDefaultInstance(); + ResourceSizingProto.Builder builder = null; + boolean viaProto = false; + + private Resource resources = null; + + public ResourceSizingPBImpl() { + builder = ResourceSizingProto.newBuilder(); + } + + public ResourceSizingPBImpl(ResourceSizingProto proto) { + this.proto = proto; + viaProto = true; + } + + public ResourceSizingProto getProto() { + mergeLocalToProto(); + proto = viaProto ? proto : builder.build(); + viaProto = true; + return proto; + } + + private void mergeLocalToBuilder() { + if (this.resources != null) { + builder.setResources(convertToProtoFormat(this.resources)); + } + } + + private void mergeLocalToProto() { + if (viaProto) { + maybeInitBuilder(); + } + mergeLocalToBuilder(); + proto = builder.build(); + viaProto = true; + } + + private void maybeInitBuilder() { + if (viaProto || builder == null) { + builder = ResourceSizingProto.newBuilder(proto); + } + viaProto = false; + } + + @Override + public int getNumAllocations() { + ResourceSizingProtoOrBuilder p = viaProto ? proto : builder; + return (p.getNumAllocations()); + } + + @Override + public void setNumAllocations(int numAllocations) { + maybeInitBuilder(); + builder.setNumAllocations(numAllocations); + } + + @Override + public Resource getResources() { + ResourceSizingProtoOrBuilder p = viaProto ? proto : builder; + if (this.resources != null) { + return this.resources; + } + if (!p.hasResources()) { + return null; + } + this.resources = convertFromProtoFormat(p.getResources()); + return this.resources; + } + + @Override + public void setResources(Resource resources) { + maybeInitBuilder(); + if (resources == null) { + builder.clearResources(); + } + this.resources = resources; + } + + private ResourcePBImpl convertFromProtoFormat(ResourceProto r) { + return new ResourcePBImpl(r); + } + + private ResourceProto convertToProtoFormat(Resource r) { + return ((ResourcePBImpl) r).getProto(); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/SchedulingRequestPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/SchedulingRequestPBImpl.java new file mode 100644 index 00000000000..7826b369273 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/SchedulingRequestPBImpl.java @@ -0,0 +1,266 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.api.records.impl.pb; + +import java.util.HashSet; +import java.util.Set; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.yarn.api.pb.PlacementConstraintFromProtoConverter; +import org.apache.hadoop.yarn.api.pb.PlacementConstraintToProtoConverter; +import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.ResourceSizing; +import org.apache.hadoop.yarn.api.records.SchedulingRequest; +import org.apache.hadoop.yarn.api.resource.PlacementConstraint; +import org.apache.hadoop.yarn.proto.YarnProtos.ExecutionTypeRequestProto; +import org.apache.hadoop.yarn.proto.YarnProtos.PlacementConstraintProto; +import org.apache.hadoop.yarn.proto.YarnProtos.PriorityProto; +import org.apache.hadoop.yarn.proto.YarnProtos.ResourceSizingProto; +import org.apache.hadoop.yarn.proto.YarnProtos.SchedulingRequestProto; +import org.apache.hadoop.yarn.proto.YarnProtos.SchedulingRequestProtoOrBuilder; + +@Private +@Unstable +public class SchedulingRequestPBImpl extends SchedulingRequest { + SchedulingRequestProto proto = SchedulingRequestProto.getDefaultInstance(); + SchedulingRequestProto.Builder builder = null; + boolean viaProto = false; + + private Priority priority = null; + private ExecutionTypeRequest executionType = null; + private Set allocationTags = null; + private ResourceSizing resourceSizing = null; + private PlacementConstraint placementConstraint = null; + + public SchedulingRequestPBImpl() { + builder = SchedulingRequestProto.newBuilder(); + } + + public SchedulingRequestPBImpl(SchedulingRequestProto proto) { + this.proto = proto; + viaProto = true; + } + + public SchedulingRequestProto getProto() { + mergeLocalToProto(); + proto = viaProto ? proto : builder.build(); + viaProto = true; + return proto; + } + + private void mergeLocalToBuilder() { + if (this.priority != null) { + builder.setPriority(convertToProtoFormat(this.priority)); + } + if (this.executionType != null) { + builder.setExecutionType(convertToProtoFormat(this.executionType)); + } + if (this.allocationTags != null) { + builder.clearAllocationTags(); + builder.addAllAllocationTags(this.allocationTags); + } + if (this.resourceSizing != null) { + builder.setResourceSizing(convertToProtoFormat(this.resourceSizing)); + } + if (this.placementConstraint != null) { + builder.setPlacementConstraint( + convertToProtoFormat(this.placementConstraint)); + } + } + + private void mergeLocalToProto() { + if (viaProto) { + maybeInitBuilder(); + } + mergeLocalToBuilder(); + proto = builder.build(); + viaProto = true; + } + + private void maybeInitBuilder() { + if (viaProto || builder == null) { + builder = SchedulingRequestProto.newBuilder(proto); + } + viaProto = false; + } + + @Override + public long getAllocationRequestId() { + SchedulingRequestProtoOrBuilder p = viaProto ? proto : builder; + return (p.getAllocationRequestId()); + } + + @Override + public void setAllocationRequestId(long allocationRequestId) { + maybeInitBuilder(); + builder.setAllocationRequestId(allocationRequestId); + } + + @Override + public Priority getPriority() { + SchedulingRequestProtoOrBuilder p = viaProto ? proto : builder; + if (this.priority != null) { + return this.priority; + } + if (!p.hasPriority()) { + return null; + } + this.priority = convertFromProtoFormat(p.getPriority()); + return this.priority; + } + + @Override + public void setPriority(Priority priority) { + maybeInitBuilder(); + if (priority == null) { + builder.clearPriority(); + } + this.priority = priority; + } + + @Override + public ExecutionTypeRequest getExecutionType() { + SchedulingRequestProtoOrBuilder p = viaProto ? proto : builder; + if (this.executionType != null) { + return this.executionType; + } + if (!p.hasExecutionType()) { + return null; + } + this.executionType = convertFromProtoFormat(p.getExecutionType()); + return this.executionType; + } + + @Override + public void setExecutionType(ExecutionTypeRequest executionType) { + maybeInitBuilder(); + if (executionType == null) { + builder.clearExecutionType(); + } + this.executionType = executionType; + } + + @Override + public Set getAllocationTags() { + initAllocationTags(); + return this.allocationTags; + } + + @Override + public void setAllocationTags(Set allocationTags) { + maybeInitBuilder(); + builder.clearAllocationTags(); + this.allocationTags = allocationTags; + } + + @Override + public ResourceSizing getResourceSizing() { + SchedulingRequestProtoOrBuilder p = viaProto ? proto : builder; + if (this.resourceSizing != null) { + return this.resourceSizing; + } + if (!p.hasResourceSizing()) { + return null; + } + this.resourceSizing = convertFromProtoFormat(p.getResourceSizing()); + return this.resourceSizing; + } + + @Override + public void setResourceSizing(ResourceSizing resourceSizing) { + maybeInitBuilder(); + if (resourceSizing == null) { + builder.clearResourceSizing(); + } + this.resourceSizing = resourceSizing; + } + + @Override + public PlacementConstraint getPlacementConstraint() { + SchedulingRequestProtoOrBuilder p = viaProto ? proto : builder; + if (this.placementConstraint != null) { + return this.placementConstraint; + } + if (!p.hasPlacementConstraint()) { + return null; + } + this.placementConstraint = + convertFromProtoFormat(p.getPlacementConstraint()); + return this.placementConstraint; + } + + @Override + public void setPlacementConstraint(PlacementConstraint placementConstraint) { + maybeInitBuilder(); + if (placementConstraint == null) { + builder.clearPlacementConstraint(); + } + this.placementConstraint = placementConstraint; + } + + private PriorityPBImpl convertFromProtoFormat(PriorityProto p) { + return new PriorityPBImpl(p); + } + + private PriorityProto convertToProtoFormat(Priority p) { + return ((PriorityPBImpl) p).getProto(); + } + + private ExecutionTypeRequestPBImpl convertFromProtoFormat( + ExecutionTypeRequestProto p) { + return new ExecutionTypeRequestPBImpl(p); + } + + private ExecutionTypeRequestProto convertToProtoFormat( + ExecutionTypeRequest p) { + return ((ExecutionTypeRequestPBImpl) p).getProto(); + } + + private ResourceSizingPBImpl convertFromProtoFormat(ResourceSizingProto p) { + return new ResourceSizingPBImpl(p); + } + + private ResourceSizingProto convertToProtoFormat(ResourceSizing p) { + return ((ResourceSizingPBImpl) p).getProto(); + } + + private PlacementConstraint convertFromProtoFormat( + PlacementConstraintProto c) { + PlacementConstraintFromProtoConverter fromProtoConverter = + new PlacementConstraintFromProtoConverter(c); + return fromProtoConverter.convert(); + } + + private PlacementConstraintProto convertToProtoFormat(PlacementConstraint c) { + PlacementConstraintToProtoConverter toProtoConverter = + new PlacementConstraintToProtoConverter(c); + return toProtoConverter.convert(); + } + + private void initAllocationTags() { + if (this.allocationTags != null) { + return; + } + SchedulingRequestProtoOrBuilder p = viaProto ? proto : builder; + this.allocationTags = new HashSet<>(); + this.allocationTags.addAll(p.getAllocationTagsList()); + } +}