YARN-6594. [API] Introduce SchedulingRequest object. (Konstantinos Karanasos via wangda)

This commit is contained in:
Wangda Tan 2017-10-30 16:54:02 -07:00 committed by Arun Suresh
parent 33a796d9b7
commit b57e8bc300
5 changed files with 666 additions and 0 deletions

View File

@ -0,0 +1,64 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.api.records;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.util.Records;
/**
* {@code ResourceSizing} contains information for the size of a
* {@link SchedulingRequest}, such as the number of requested allocations and
* the resources for each allocation.
*/
@Public
@Unstable
public abstract class ResourceSizing {
@Public
@Unstable
public static ResourceSizing newInstance(Resource resources) {
return ResourceSizing.newInstance(1, resources);
}
@Public
@Unstable
public static ResourceSizing newInstance(int numAllocations, Resource resources) {
ResourceSizing resourceSizing = Records.newRecord(ResourceSizing.class);
resourceSizing.setNumAllocations(numAllocations);
resourceSizing.setResources(resources);
return resourceSizing;
}
@Public
@Unstable
public abstract int getNumAllocations();
@Public
@Unstable
public abstract void setNumAllocations(int numAllocations);
@Public
@Unstable
public abstract Resource getResources();
@Public
@Unstable
public abstract void setResources(Resource resources);
}

View File

@ -0,0 +1,205 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.api.records;
import java.util.Set;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.resource.PlacementConstraint;
import org.apache.hadoop.yarn.util.Records;
/**
* {@code SchedulingRequest} represents a request made by an application to the
* {@code ResourceManager} to obtain an allocation. It is similar to the
* {@link ResourceRequest}. However, it is more complete than the latter, as it
* allows applications to specify allocation tags (e.g., to express that an
* allocation belongs to {@code Spark} or is an {@code HBase-master}), as well
* as involved {@link PlacementConstraint}s (e.g., anti-affinity between Spark
* and HBase allocations).
*
* The size specification of the allocation is in {@code ResourceSizing}.
*/
@Public
@Unstable
public abstract class SchedulingRequest {
@Public
@Unstable
public static SchedulingRequest newInstance(long allocationRequestId,
Priority priority, ExecutionTypeRequest executionType,
Set<String> allocationTags, ResourceSizing resourceSizing,
PlacementConstraint placementConstraintExpression) {
return SchedulingRequest.newBuilder()
.allocationRequestId(allocationRequestId).priority(priority)
.executionType(executionType).allocationTags(allocationTags)
.placementConstraintExpression(placementConstraintExpression).build();
}
@Public
@Unstable
public static SchedulingRequestBuilder newBuilder() {
return new SchedulingRequestBuilder();
}
/**
* Class to construct instances of {@link SchedulingRequest} with specific
* options.
*/
@Public
@Unstable
public static final class SchedulingRequestBuilder {
private SchedulingRequest schedulingRequest =
Records.newRecord(SchedulingRequest.class);
private SchedulingRequestBuilder() {
schedulingRequest.setAllocationRequestId(0);
schedulingRequest.setPriority(Priority.newInstance(0));
schedulingRequest.setExecutionType(ExecutionTypeRequest.newInstance());
}
/**
* Set the <code>allocationRequestId</code> of the request.
*
* @see SchedulingRequest#setAllocationRequestId(long)
* @param allocationRequestId <code>allocationRequestId</code> of the
* request
* @return {@link SchedulingRequest.SchedulingRequestBuilder}
*/
@Public
@Unstable
public SchedulingRequestBuilder allocationRequestId(
long allocationRequestId) {
schedulingRequest.setAllocationRequestId(allocationRequestId);
return this;
}
/**
* Set the <code>priority</code> of the request.
*
* @param priority <code>priority</code> of the request
* @return {@link SchedulingRequest.SchedulingRequestBuilder}
* @see SchedulingRequest#setPriority(Priority)
*/
@Public
@Unstable
public SchedulingRequestBuilder priority(Priority priority) {
schedulingRequest.setPriority(priority);
return this;
}
/**
* Set the <code>executionType</code> of the request.
*
* @see SchedulingRequest#setExecutionType(ExecutionTypeRequest)
* @param executionType <code>executionType</code> of the request
* @return {@link SchedulingRequest.SchedulingRequestBuilder}
*/
@Public
@Unstable
public SchedulingRequestBuilder executionType(
ExecutionTypeRequest executionType) {
schedulingRequest.setExecutionType(executionType);
return this;
}
/**
* Set the <code>allocationTags</code> of the request.
*
* @see SchedulingRequest#setAllocationTags(Set)
* @param allocationTags <code>allocationsTags</code> of the request
* @return {@link SchedulingRequest.SchedulingRequestBuilder}
*/
@Public
@Unstable
public SchedulingRequestBuilder allocationTags(Set<String> allocationTags) {
schedulingRequest.setAllocationTags(allocationTags);
return this;
}
/**
* Set the <code>executionType</code> of the request.
*
* @see SchedulingRequest#setResourceSizing(ResourceSizing)
* @param resourceSizing <code>resourceSizing</code> of the request
* @return {@link SchedulingRequest.SchedulingRequestBuilder}
*/
@Public
@Unstable
public SchedulingRequestBuilder resourceSizing(
ResourceSizing resourceSizing) {
schedulingRequest.setResourceSizing(resourceSizing);
return this;
}
/**
* Set the <code>placementConstraintExpression</code> of the request.
*
* @see SchedulingRequest#setPlacementConstraint(
* PlacementConstraint)
* @param placementConstraintExpression <code>placementConstraints</code> of
* the request
* @return {@link SchedulingRequest.SchedulingRequestBuilder}
*/
@Public
@Unstable
public SchedulingRequestBuilder placementConstraintExpression(
PlacementConstraint placementConstraintExpression) {
schedulingRequest
.setPlacementConstraint(placementConstraintExpression);
return this;
}
/**
* Return generated {@link SchedulingRequest} object.
*
* @return {@link SchedulingRequest}
*/
@Public
@Unstable
public SchedulingRequest build() {
return schedulingRequest;
}
}
public abstract long getAllocationRequestId();
public abstract void setAllocationRequestId(long allocationRequestId);
public abstract Priority getPriority();
public abstract void setPriority(Priority priority);
public abstract ExecutionTypeRequest getExecutionType();
public abstract void setExecutionType(ExecutionTypeRequest executionType);
public abstract Set<String> getAllocationTags();
public abstract void setAllocationTags(Set<String> allocationTags);
public abstract ResourceSizing getResourceSizing();
public abstract void setResourceSizing(ResourceSizing resourceSizing);
public abstract PlacementConstraint getPlacementConstraint();
public abstract void setPlacementConstraint(
PlacementConstraint placementConstraint);
}

View File

@ -405,6 +405,20 @@ message ExecutionTypeRequestProto {
optional bool enforce_execution_type = 2 [default = false];
}
message SchedulingRequestProto {
optional int64 allocationRequestId = 1 [default = 0];
optional PriorityProto priority = 2;
optional ExecutionTypeRequestProto executionType = 3;
repeated string allocationTags = 4;
optional ResourceSizingProto resourceSizing = 5;
optional PlacementConstraintProto placementConstraint = 6;
}
message ResourceSizingProto {
optional int32 numAllocations = 1;
optional ResourceProto resources = 2;
}
enum AMCommandProto {
AM_RESYNC = 1;
AM_SHUTDOWN = 2;

View File

@ -0,0 +1,117 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.api.records.impl.pb;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceSizing;
import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ResourceSizingProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ResourceSizingProtoOrBuilder;
@Private
@Unstable
public class ResourceSizingPBImpl extends ResourceSizing {
ResourceSizingProto proto = ResourceSizingProto.getDefaultInstance();
ResourceSizingProto.Builder builder = null;
boolean viaProto = false;
private Resource resources = null;
public ResourceSizingPBImpl() {
builder = ResourceSizingProto.newBuilder();
}
public ResourceSizingPBImpl(ResourceSizingProto proto) {
this.proto = proto;
viaProto = true;
}
public ResourceSizingProto getProto() {
mergeLocalToProto();
proto = viaProto ? proto : builder.build();
viaProto = true;
return proto;
}
private void mergeLocalToBuilder() {
if (this.resources != null) {
builder.setResources(convertToProtoFormat(this.resources));
}
}
private void mergeLocalToProto() {
if (viaProto) {
maybeInitBuilder();
}
mergeLocalToBuilder();
proto = builder.build();
viaProto = true;
}
private void maybeInitBuilder() {
if (viaProto || builder == null) {
builder = ResourceSizingProto.newBuilder(proto);
}
viaProto = false;
}
@Override
public int getNumAllocations() {
ResourceSizingProtoOrBuilder p = viaProto ? proto : builder;
return (p.getNumAllocations());
}
@Override
public void setNumAllocations(int numAllocations) {
maybeInitBuilder();
builder.setNumAllocations(numAllocations);
}
@Override
public Resource getResources() {
ResourceSizingProtoOrBuilder p = viaProto ? proto : builder;
if (this.resources != null) {
return this.resources;
}
if (!p.hasResources()) {
return null;
}
this.resources = convertFromProtoFormat(p.getResources());
return this.resources;
}
@Override
public void setResources(Resource resources) {
maybeInitBuilder();
if (resources == null) {
builder.clearResources();
}
this.resources = resources;
}
private ResourcePBImpl convertFromProtoFormat(ResourceProto r) {
return new ResourcePBImpl(r);
}
private ResourceProto convertToProtoFormat(Resource r) {
return ((ResourcePBImpl) r).getProto();
}
}

View File

@ -0,0 +1,266 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.api.records.impl.pb;
import java.util.HashSet;
import java.util.Set;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.pb.PlacementConstraintFromProtoConverter;
import org.apache.hadoop.yarn.api.pb.PlacementConstraintToProtoConverter;
import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.ResourceSizing;
import org.apache.hadoop.yarn.api.records.SchedulingRequest;
import org.apache.hadoop.yarn.api.resource.PlacementConstraint;
import org.apache.hadoop.yarn.proto.YarnProtos.ExecutionTypeRequestProto;
import org.apache.hadoop.yarn.proto.YarnProtos.PlacementConstraintProto;
import org.apache.hadoop.yarn.proto.YarnProtos.PriorityProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ResourceSizingProto;
import org.apache.hadoop.yarn.proto.YarnProtos.SchedulingRequestProto;
import org.apache.hadoop.yarn.proto.YarnProtos.SchedulingRequestProtoOrBuilder;
@Private
@Unstable
public class SchedulingRequestPBImpl extends SchedulingRequest {
SchedulingRequestProto proto = SchedulingRequestProto.getDefaultInstance();
SchedulingRequestProto.Builder builder = null;
boolean viaProto = false;
private Priority priority = null;
private ExecutionTypeRequest executionType = null;
private Set<String> allocationTags = null;
private ResourceSizing resourceSizing = null;
private PlacementConstraint placementConstraint = null;
public SchedulingRequestPBImpl() {
builder = SchedulingRequestProto.newBuilder();
}
public SchedulingRequestPBImpl(SchedulingRequestProto proto) {
this.proto = proto;
viaProto = true;
}
public SchedulingRequestProto getProto() {
mergeLocalToProto();
proto = viaProto ? proto : builder.build();
viaProto = true;
return proto;
}
private void mergeLocalToBuilder() {
if (this.priority != null) {
builder.setPriority(convertToProtoFormat(this.priority));
}
if (this.executionType != null) {
builder.setExecutionType(convertToProtoFormat(this.executionType));
}
if (this.allocationTags != null) {
builder.clearAllocationTags();
builder.addAllAllocationTags(this.allocationTags);
}
if (this.resourceSizing != null) {
builder.setResourceSizing(convertToProtoFormat(this.resourceSizing));
}
if (this.placementConstraint != null) {
builder.setPlacementConstraint(
convertToProtoFormat(this.placementConstraint));
}
}
private void mergeLocalToProto() {
if (viaProto) {
maybeInitBuilder();
}
mergeLocalToBuilder();
proto = builder.build();
viaProto = true;
}
private void maybeInitBuilder() {
if (viaProto || builder == null) {
builder = SchedulingRequestProto.newBuilder(proto);
}
viaProto = false;
}
@Override
public long getAllocationRequestId() {
SchedulingRequestProtoOrBuilder p = viaProto ? proto : builder;
return (p.getAllocationRequestId());
}
@Override
public void setAllocationRequestId(long allocationRequestId) {
maybeInitBuilder();
builder.setAllocationRequestId(allocationRequestId);
}
@Override
public Priority getPriority() {
SchedulingRequestProtoOrBuilder p = viaProto ? proto : builder;
if (this.priority != null) {
return this.priority;
}
if (!p.hasPriority()) {
return null;
}
this.priority = convertFromProtoFormat(p.getPriority());
return this.priority;
}
@Override
public void setPriority(Priority priority) {
maybeInitBuilder();
if (priority == null) {
builder.clearPriority();
}
this.priority = priority;
}
@Override
public ExecutionTypeRequest getExecutionType() {
SchedulingRequestProtoOrBuilder p = viaProto ? proto : builder;
if (this.executionType != null) {
return this.executionType;
}
if (!p.hasExecutionType()) {
return null;
}
this.executionType = convertFromProtoFormat(p.getExecutionType());
return this.executionType;
}
@Override
public void setExecutionType(ExecutionTypeRequest executionType) {
maybeInitBuilder();
if (executionType == null) {
builder.clearExecutionType();
}
this.executionType = executionType;
}
@Override
public Set<String> getAllocationTags() {
initAllocationTags();
return this.allocationTags;
}
@Override
public void setAllocationTags(Set<String> allocationTags) {
maybeInitBuilder();
builder.clearAllocationTags();
this.allocationTags = allocationTags;
}
@Override
public ResourceSizing getResourceSizing() {
SchedulingRequestProtoOrBuilder p = viaProto ? proto : builder;
if (this.resourceSizing != null) {
return this.resourceSizing;
}
if (!p.hasResourceSizing()) {
return null;
}
this.resourceSizing = convertFromProtoFormat(p.getResourceSizing());
return this.resourceSizing;
}
@Override
public void setResourceSizing(ResourceSizing resourceSizing) {
maybeInitBuilder();
if (resourceSizing == null) {
builder.clearResourceSizing();
}
this.resourceSizing = resourceSizing;
}
@Override
public PlacementConstraint getPlacementConstraint() {
SchedulingRequestProtoOrBuilder p = viaProto ? proto : builder;
if (this.placementConstraint != null) {
return this.placementConstraint;
}
if (!p.hasPlacementConstraint()) {
return null;
}
this.placementConstraint =
convertFromProtoFormat(p.getPlacementConstraint());
return this.placementConstraint;
}
@Override
public void setPlacementConstraint(PlacementConstraint placementConstraint) {
maybeInitBuilder();
if (placementConstraint == null) {
builder.clearPlacementConstraint();
}
this.placementConstraint = placementConstraint;
}
private PriorityPBImpl convertFromProtoFormat(PriorityProto p) {
return new PriorityPBImpl(p);
}
private PriorityProto convertToProtoFormat(Priority p) {
return ((PriorityPBImpl) p).getProto();
}
private ExecutionTypeRequestPBImpl convertFromProtoFormat(
ExecutionTypeRequestProto p) {
return new ExecutionTypeRequestPBImpl(p);
}
private ExecutionTypeRequestProto convertToProtoFormat(
ExecutionTypeRequest p) {
return ((ExecutionTypeRequestPBImpl) p).getProto();
}
private ResourceSizingPBImpl convertFromProtoFormat(ResourceSizingProto p) {
return new ResourceSizingPBImpl(p);
}
private ResourceSizingProto convertToProtoFormat(ResourceSizing p) {
return ((ResourceSizingPBImpl) p).getProto();
}
private PlacementConstraint convertFromProtoFormat(
PlacementConstraintProto c) {
PlacementConstraintFromProtoConverter fromProtoConverter =
new PlacementConstraintFromProtoConverter(c);
return fromProtoConverter.convert();
}
private PlacementConstraintProto convertToProtoFormat(PlacementConstraint c) {
PlacementConstraintToProtoConverter toProtoConverter =
new PlacementConstraintToProtoConverter(c);
return toProtoConverter.convert();
}
private void initAllocationTags() {
if (this.allocationTags != null) {
return;
}
SchedulingRequestProtoOrBuilder p = viaProto ? proto : builder;
this.allocationTags = new HashSet<>();
this.allocationTags.addAll(p.getAllocationTagsList());
}
}