YARN-7448. [API] Add SchedulingRequest to the AllocateRequest. (Panagiotis Garefalakis via asuresh)

This commit is contained in:
Arun Suresh 2017-11-17 10:42:43 -08:00
parent db928556c8
commit 69de9a1ba9
8 changed files with 190 additions and 1 deletions

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.yarn.api.protocolrecords;
import java.util.Collections;
import java.util.List;
import org.apache.hadoop.classification.InterfaceAudience.Public;
@ -28,6 +29,7 @@ import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.SchedulingRequest;
import org.apache.hadoop.yarn.api.records.UpdateContainerRequest;
import org.apache.hadoop.yarn.util.Records;
@ -212,6 +214,32 @@ public abstract class AllocateRequest {
public abstract void setUpdateRequests(
List<UpdateContainerRequest> updateRequests);
/**
* Get the list of Scheduling requests being sent by the
* <code>ApplicationMaster</code>.
* @return list of {@link SchedulingRequest} being sent by the
* <code>ApplicationMaster</code>.
*/
@Public
@Unstable
public List<SchedulingRequest> getSchedulingRequests() {
return Collections.EMPTY_LIST;
}
/**
* Set the list of Scheduling requests to inform the
* <code>ResourceManager</code> about the application's resource requirements
* (potentially including allocation tags & placement constraints).
* @param schedulingRequests list of <code>SchedulingRequest</code> to update
* the <code>ResourceManager</code> about the application's resource
* requirements.
*/
@Public
@Unstable
public void setSchedulingRequests(
List<SchedulingRequest> schedulingRequests) {
}
@Public
@Unstable
public static AllocateRequestBuilder newBuilder() {
@ -313,6 +341,20 @@ public abstract class AllocateRequest {
return this;
}
/**
* Set the <code>schedulingRequests</code> of the request.
* @see AllocateRequest#setSchedulingRequests(List)
* @param schedulingRequests <code>SchedulingRequest</code> of the request
* @return {@link AllocateRequestBuilder}
*/
@Public
@Unstable
public AllocateRequestBuilder schedulingRequests(
List<SchedulingRequest> schedulingRequests) {
allocateRequest.setSchedulingRequests(schedulingRequests);
return this;
}
/**
* Return generated {@link AllocateRequest} object.
* @return {@link AllocateRequest}

View File

@ -61,4 +61,31 @@ public abstract class ResourceSizing {
@Public
@Unstable
public abstract void setResources(Resource resources);
@Override
public int hashCode() {
int result = getResources().hashCode();
result = 31 * result + getNumAllocations();
return result;
}
@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if(obj == null || getClass() != obj.getClass()) {
return false;
}
ResourceSizing that = (ResourceSizing) obj;
if(getNumAllocations() != that.getNumAllocations()) {
return false;
}
if(!getResources().equals(that.getResources())) {
return false;
}
return true;
}
}

View File

@ -49,6 +49,7 @@ public abstract class SchedulingRequest {
return SchedulingRequest.newBuilder()
.allocationRequestId(allocationRequestId).priority(priority)
.executionType(executionType).allocationTags(allocationTags)
.resourceSizing(resourceSizing)
.placementConstraintExpression(placementConstraintExpression).build();
}

View File

@ -91,6 +91,7 @@ message AllocateRequestProto {
optional int32 response_id = 4;
optional float progress = 5;
repeated UpdateContainerRequestProto update_requests = 7;
repeated SchedulingRequestProto scheduling_requests = 10;
}
message NMTokenProto {

View File

@ -29,14 +29,17 @@ import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.SchedulingRequest;
import org.apache.hadoop.yarn.api.records.UpdateContainerRequest;
import org.apache.hadoop.yarn.api.records.impl.pb.ContainerIdPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ResourceBlacklistRequestPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ResourceRequestPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.SchedulingRequestPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.UpdateContainerRequestPBImpl;
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerIdProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ResourceBlacklistRequestProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ResourceRequestProto;
import org.apache.hadoop.yarn.proto.YarnProtos.SchedulingRequestProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.UpdateContainerRequestProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.AllocateRequestProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.AllocateRequestProtoOrBuilder;
@ -53,6 +56,7 @@ public class AllocateRequestPBImpl extends AllocateRequest {
private List<ResourceRequest> ask = null;
private List<ContainerId> release = null;
private List<UpdateContainerRequest> updateRequests = null;
private List<SchedulingRequest> schedulingRequests = null;
private ResourceBlacklistRequest blacklistRequest = null;
public AllocateRequestPBImpl() {
@ -101,6 +105,9 @@ public class AllocateRequestPBImpl extends AllocateRequest {
if (this.updateRequests != null) {
addUpdateRequestsToProto();
}
if (this.schedulingRequests != null) {
addSchedulingRequestsToProto();
}
if (this.blacklistRequest != null) {
builder.setBlacklistRequest(convertToProtoFormat(this.blacklistRequest));
}
@ -177,6 +184,23 @@ public class AllocateRequestPBImpl extends AllocateRequest {
this.updateRequests.addAll(updateRequests);
}
@Override
public List<SchedulingRequest> getSchedulingRequests() {
initSchedulingRequests();
return this.schedulingRequests;
}
@Override
public void setSchedulingRequests(
List<SchedulingRequest> schedulingRequests) {
if (schedulingRequests == null) {
return;
}
initSchedulingRequests();
this.schedulingRequests.clear();
this.schedulingRequests.addAll(schedulingRequests);
}
@Override
public ResourceBlacklistRequest getResourceBlacklistRequest() {
AllocateRequestProtoOrBuilder p = viaProto ? proto : builder;
@ -261,6 +285,20 @@ public class AllocateRequestPBImpl extends AllocateRequest {
}
}
private void initSchedulingRequests() {
if (this.schedulingRequests != null) {
return;
}
AllocateRequestProtoOrBuilder p = viaProto ? proto : builder;
List<SchedulingRequestProto> list =
p.getSchedulingRequestsList();
this.schedulingRequests = new ArrayList<>();
for (SchedulingRequestProto c : list) {
this.schedulingRequests.add(convertFromProtoFormat(c));
}
}
private void addUpdateRequestsToProto() {
maybeInitBuilder();
builder.clearUpdateRequests();
@ -297,6 +335,41 @@ public class AllocateRequestPBImpl extends AllocateRequest {
builder.addAllUpdateRequests(iterable);
}
private void addSchedulingRequestsToProto() {
maybeInitBuilder();
builder.clearSchedulingRequests();
if (schedulingRequests == null) {
return;
}
Iterable<SchedulingRequestProto> iterable =
new Iterable<SchedulingRequestProto>() {
@Override
public Iterator<SchedulingRequestProto> iterator() {
return new Iterator<SchedulingRequestProto>() {
private Iterator<SchedulingRequest> iter =
schedulingRequests.iterator();
@Override
public boolean hasNext() {
return iter.hasNext();
}
@Override
public SchedulingRequestProto next() {
return convertToProtoFormat(iter.next());
}
@Override
public void remove() {
throw new UnsupportedOperationException();
}
};
}
};
builder.addAllSchedulingRequests(iterable);
}
@Override
public List<ContainerId> getReleaseList() {
initReleases();
@ -377,6 +450,16 @@ public class AllocateRequestPBImpl extends AllocateRequest {
return ((UpdateContainerRequestPBImpl) t).getProto();
}
private SchedulingRequestPBImpl convertFromProtoFormat(
SchedulingRequestProto p) {
return new SchedulingRequestPBImpl(p);
}
private SchedulingRequestProto convertToProtoFormat(
SchedulingRequest t) {
return ((SchedulingRequestPBImpl) t).getProto();
}
private ContainerIdPBImpl convertFromProtoFormat(ContainerIdProto p) {
return new ContainerIdPBImpl(p);
}

View File

@ -112,6 +112,6 @@ public class ResourceSizingPBImpl extends ResourceSizing {
}
private ResourceProto convertToProtoFormat(Resource r) {
return ((ResourcePBImpl) r).getProto();
return ProtoUtils.convertToProtoFormat(r);
}
}

View File

@ -263,4 +263,20 @@ public class SchedulingRequestPBImpl extends SchedulingRequest {
this.allocationTags = new HashSet<>();
this.allocationTags.addAll(p.getAllocationTagsList());
}
@Override
public int hashCode() {
return getProto().hashCode();
}
@Override
public boolean equals(Object other) {
if (other == null) {
return false;
}
if (other.getClass().isAssignableFrom(this.getClass())) {
return this.getProto().equals(this.getClass().cast(other).getProto());
}
return false;
}
}

View File

@ -149,8 +149,10 @@ import org.apache.hadoop.yarn.api.records.ResourceInformation;
import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest;
import org.apache.hadoop.yarn.api.records.ResourceOption;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.ResourceSizing;
import org.apache.hadoop.yarn.api.records.ResourceTypeInfo;
import org.apache.hadoop.yarn.api.records.ResourceUtilization;
import org.apache.hadoop.yarn.api.records.SchedulingRequest;
import org.apache.hadoop.yarn.api.records.SerializedException;
import org.apache.hadoop.yarn.api.records.StrictPreemptionContract;
import org.apache.hadoop.yarn.api.records.Token;
@ -189,7 +191,9 @@ import org.apache.hadoop.yarn.api.records.impl.pb.ResourceBlacklistRequestPBImpl
import org.apache.hadoop.yarn.api.records.impl.pb.ResourceOptionPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ResourcePBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ResourceRequestPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ResourceSizingPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ResourceTypeInfoPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.SchedulingRequestPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.SerializedExceptionPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.StrictPreemptionContractPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.TokenPBImpl;
@ -225,6 +229,8 @@ import org.apache.hadoop.yarn.proto.YarnProtos.ResourceBlacklistRequestProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ResourceOptionProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ResourceRequestProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ResourceSizingProto;
import org.apache.hadoop.yarn.proto.YarnProtos.SchedulingRequestProto;
import org.apache.hadoop.yarn.proto.YarnProtos.SerializedExceptionProto;
import org.apache.hadoop.yarn.proto.YarnProtos.StrictPreemptionContractProto;
import org.apache.hadoop.yarn.proto.YarnProtos.URLProto;
@ -428,6 +434,8 @@ public class TestPBImplRecords extends BasePBImplRecordsTest {
generateByNewInstance(QueueConfigurations.class);
generateByNewInstance(CollectorInfo.class);
generateByNewInstance(ResourceTypeInfo.class);
generateByNewInstance(ResourceSizing.class);
generateByNewInstance(SchedulingRequest.class);
}
@Test
@ -906,6 +914,17 @@ public class TestPBImplRecords extends BasePBImplRecordsTest {
validatePBImplRecord(ResourceRequestPBImpl.class, ResourceRequestProto.class);
}
@Test
public void testResourceSizingPBImpl() throws Exception {
validatePBImplRecord(ResourceSizingPBImpl.class, ResourceSizingProto.class);
}
@Test
public void testSchedulingRequestPBImpl() throws Exception {
validatePBImplRecord(SchedulingRequestPBImpl.class,
SchedulingRequestProto.class);
}
@Test
public void testSerializedExceptionPBImpl() throws Exception {
validatePBImplRecord(SerializedExceptionPBImpl.class,