YARN-5180. Allow ResourceRequest to specify an enforceExecutionType flag. (asuresh)

Arun Suresh 2016-06-02 05:18:01 -07:00
parent aadb77e412
commit dc26601d8f
12 changed files with 323 additions and 48 deletions

View File

@@ -41,6 +41,7 @@ import org.apache.hadoop.mapreduce.v2.app.client.ClientService;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
@@ -462,7 +463,8 @@ public abstract class RMContainerRequestor extends RMCommunicator {
remoteRequest.setCapability(capability);
remoteRequest.setNumContainers(0);
remoteRequest.setNodeLabelExpression(nodeLabelExpression);
remoteRequest.setExecutionType(executionType);
remoteRequest.setExecutionTypeRequest(
ExecutionTypeRequest.newInstance(executionType, true));
reqMap.put(capability, remoteRequest);
}
remoteRequest.setNumContainers(remoteRequest.getNumContainers() + 1);

View File

@@ -0,0 +1,124 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.api.records;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Evolving;
import org.apache.hadoop.yarn.util.Records;
/**
* An object of this class represents a specification of the execution
* guarantee of the Containers associated with a ResourceRequest. It consists
* of an <code>ExecutionType</code> as well as a flag that explicitly asks the
* configured scheduler to return Containers of exactly the Execution Type
* requested.
*/
@Public
@Evolving
public abstract class ExecutionTypeRequest {
@Public
@Evolving
public static ExecutionTypeRequest newInstance() {
return newInstance(ExecutionType.GUARANTEED, false);
}
@Public
@Evolving
public static ExecutionTypeRequest newInstance(ExecutionType execType,
boolean ensureExecutionType) {
ExecutionTypeRequest executionTypeRequest =
Records.newRecord(ExecutionTypeRequest.class);
executionTypeRequest.setExecutionType(execType);
executionTypeRequest.setEnforceExecutionType(ensureExecutionType);
return executionTypeRequest;
}
/**
* Set the <code>ExecutionType</code> of the requested container.
*
* @param execType
* ExecutionType of the requested container
*/
@Public
public abstract void setExecutionType(ExecutionType execType);
/**
* Get <code>ExecutionType</code>.
*
* @return <code>ExecutionType</code>.
*/
@Public
public abstract ExecutionType getExecutionType();
/**
* Set to true to explicitly ask that the Scheduling Authority return
* Containers of exactly the Execution Type requested.
* @param enforceExecutionType whether ExecutionType request should be
* strictly honored.
*/
@Public
public abstract void setEnforceExecutionType(boolean enforceExecutionType);
/**
* Get whether the Scheduling Authority should return Containers of exactly the
* Execution Type requested for this <code>ResourceRequest</code>.
* Defaults to false.
* @return whether ExecutionType request should be strictly honored
*/
@Public
public abstract boolean getEnforceExecutionType();
@Override
public int hashCode() {
final int prime = 2153;
int result = 2459;
ExecutionType executionType = getExecutionType();
boolean ensureExecutionType = getEnforceExecutionType();
result = prime * result + ((executionType == null) ? 0 :
executionType.hashCode());
result = prime * result + (ensureExecutionType ? 0 : 1);
return result;
}
@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (obj == null) {
return false;
}
if (getClass() != obj.getClass()) {
return false;
}
ExecutionTypeRequest other = (ExecutionTypeRequest) obj;
ExecutionType executionType = getExecutionType();
if (executionType == null) {
if (other.getExecutionType() != null) {
return false;
}
} else if (executionType != other.getExecutionType()) {
return false;
}
boolean enforceExecutionType = getEnforceExecutionType();
return enforceExecutionType == other.getEnforceExecutionType();
}
}
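For illustration, a minimal usage sketch of the new record via the ResourceRequest factory method updated below; the priority, resource size, and null node label are placeholder values:

// Ask for one OPPORTUNISTIC container and require that the scheduler
// return exactly that execution type instead of upgrading it.
ExecutionTypeRequest execTypeReq =
    ExecutionTypeRequest.newInstance(ExecutionType.OPPORTUNISTIC, true);
ResourceRequest ask = ResourceRequest.newInstance(
    Priority.newInstance(1),        // placeholder priority
    ResourceRequest.ANY,            // any host
    Resource.newInstance(1024, 1),  // placeholder: 1024 MB, 1 vcore
    1,                              // number of containers
    true,                           // relax locality
    null,                           // no node label expression
    execTypeReq);

The no-argument newInstance() overload defaults to GUARANTEED with enforceExecutionType set to false, so callers that never specify an execution type keep the old behavior.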

View File

@@ -80,14 +80,14 @@ public abstract class ResourceRequest implements Comparable<ResourceRequest> {
Resource capability, int numContainers, boolean relaxLocality,
String labelExpression) {
return newInstance(priority, hostName, capability, numContainers,
relaxLocality, labelExpression, ExecutionType.GUARANTEED);
relaxLocality, labelExpression, ExecutionTypeRequest.newInstance());
}
@Public
@Stable
@Evolving
public static ResourceRequest newInstance(Priority priority, String hostName,
Resource capability, int numContainers, boolean relaxLocality, String
labelExpression, ExecutionType execType) {
labelExpression, ExecutionTypeRequest executionTypeRequest) {
ResourceRequest request = Records.newRecord(ResourceRequest.class);
request.setPriority(priority);
request.setResourceName(hostName);
@@ -95,7 +95,7 @@ public abstract class ResourceRequest implements Comparable<ResourceRequest> {
request.setNumContainers(numContainers);
request.setRelaxLocality(relaxLocality);
request.setNodeLabelExpression(labelExpression);
request.setExecutionType(execType);
request.setExecutionTypeRequest(executionTypeRequest);
return request;
}
@@ -233,14 +233,16 @@ public abstract class ResourceRequest implements Comparable<ResourceRequest> {
public abstract boolean getRelaxLocality();
/**
* Set the <code>ExecutionType</code> of the requested container.
* Set the <code>ExecutionTypeRequest</code> of the requested container.
*
* @param execType
* ExecutionType of the requested container
* @param execSpec
* ExecutionTypeRequest of the requested container
*/
@Public
@Stable
public abstract void setExecutionType(ExecutionType execType);
@Evolving
public void setExecutionTypeRequest(ExecutionTypeRequest execSpec) {
throw new UnsupportedOperationException();
}
/**
* Get whether locality relaxation is enabled with this
@@ -250,8 +252,10 @@ public abstract class ResourceRequest implements Comparable<ResourceRequest> {
* <code>ResourceRequest</code>.
*/
@Public
@Stable
public abstract ExecutionType getExecutionType();
@Evolving
public ExecutionTypeRequest getExecutionTypeRequest() {
throw new UnsupportedOperationException();
}
/**
* <p>For a request at a network hierarchy level, set whether locality can be relaxed
@@ -353,12 +357,12 @@ public abstract class ResourceRequest implements Comparable<ResourceRequest> {
return false;
} else if (!priority.equals(other.getPriority()))
return false;
ExecutionType executionType = getExecutionType();
if (executionType == null) {
if (other.getExecutionType() != null) {
ExecutionTypeRequest execTypeRequest = getExecutionTypeRequest();
if (execTypeRequest == null) {
if (other.getExecutionTypeRequest() != null) {
return false;
}
} else if (executionType != other.getExecutionType()) {
} else if (!execTypeRequest.equals(other.getExecutionTypeRequest())) {
return false;
}
if (getNodeLabelExpression() == null) {

View File

@@ -305,7 +305,12 @@ message ResourceRequestProto {
optional int32 num_containers = 4;
optional bool relax_locality = 5 [default = true];
optional string node_label_expression = 6;
optional ExecutionTypeProto executionType = 7 [default = GUARANTEED];
optional ExecutionTypeRequestProto execution_type_request = 7;
}
message ExecutionTypeRequestProto {
optional ExecutionTypeProto execution_type = 1 [default = GUARANTEED];
optional bool enforce_execution_type = 2 [default = false];
}
enum AMCommandProto {
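Both new fields carry defaults (GUARANTEED, false), so an allocate request that never sets execution_type_request deserializes to the old behavior. As a hedged sketch, building the message directly through the generated Java bindings would look roughly like this (builder method names follow standard protobuf codegen and are assumed here):

ExecutionTypeRequestProto execProto = ExecutionTypeRequestProto.newBuilder()
    .setExecutionType(ExecutionTypeProto.OPPORTUNISTIC)
    .setEnforceExecutionType(true)
    .build();
ResourceRequestProto askProto = ResourceRequestProto.newBuilder()
    .setNumContainers(1)
    .setExecutionTypeRequest(execProto)
    .build();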

View File

@@ -31,6 +31,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterReque
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.NodeState;
@@ -129,7 +130,9 @@ public class TestDistributedScheduling extends TestAMRMProxy {
ResourceRequest newRR = ResourceRequest.newInstance(rr
.getPriority(), rr.getResourceName(),
rr.getCapability(), rr.getNumContainers(), rr.getRelaxLocality(),
rr.getNodeLabelExpression(), ExecutionType.OPPORTUNISTIC);
rr.getNodeLabelExpression(),
ExecutionTypeRequest.newInstance(
ExecutionType.OPPORTUNISTIC, true));
newAskList.add(newRR);
}
}
@@ -235,7 +238,9 @@ public class TestDistributedScheduling extends TestAMRMProxy {
ResourceRequest newRR = ResourceRequest.newInstance(rr
.getPriority(), rr.getResourceName(),
rr.getCapability(), rr.getNumContainers(), rr.getRelaxLocality(),
rr.getNodeLabelExpression(), ExecutionType.OPPORTUNISTIC);
rr.getNodeLabelExpression(),
ExecutionTypeRequest.newInstance(
ExecutionType.OPPORTUNISTIC, true));
newAskList.add(newRR);
}
}

View File

@@ -0,0 +1,93 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.api.records.impl.pb;
import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.proto.YarnProtos.ExecutionTypeRequestProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ExecutionTypeRequestProtoOrBuilder;
/**
* Implementation of <code>ExecutionTypeRequest</code>.
*/
public class ExecutionTypeRequestPBImpl extends ExecutionTypeRequest {
private ExecutionTypeRequestProto proto =
ExecutionTypeRequestProto.getDefaultInstance();
private ExecutionTypeRequestProto.Builder builder = null;
private boolean viaProto = false;
public ExecutionTypeRequestPBImpl() {
builder = ExecutionTypeRequestProto.newBuilder();
}
public ExecutionTypeRequestPBImpl(ExecutionTypeRequestProto proto) {
this.proto = proto;
viaProto = true;
}
private void maybeInitBuilder() {
if (viaProto || builder == null) {
builder = ExecutionTypeRequestProto.newBuilder(proto);
}
viaProto = false;
}
public ExecutionTypeRequestProto getProto() {
proto = viaProto ? proto : builder.build();
viaProto = true;
return proto;
}
@Override
public ExecutionType getExecutionType() {
ExecutionTypeRequestProtoOrBuilder p = viaProto ? proto : builder;
if (!p.hasExecutionType()) {
return null;
}
return ProtoUtils.convertFromProtoFormat(p.getExecutionType());
}
@Override
public void setExecutionType(ExecutionType execType) {
maybeInitBuilder();
if (execType == null) {
builder.clearExecutionType();
return;
}
builder.setExecutionType(ProtoUtils.convertToProtoFormat(execType));
}
@Override
public void setEnforceExecutionType(boolean enforceExecutionType) {
maybeInitBuilder();
builder.setEnforceExecutionType(enforceExecutionType);
}
@Override
public boolean getEnforceExecutionType() {
ExecutionTypeRequestProtoOrBuilder p = viaProto ? proto : builder;
return p.getEnforceExecutionType();
}
@Override
public String toString() {
return "{Execution Type: " + getExecutionType()
+ ", Enforce Execution Type: " + getEnforceExecutionType() + "}";
}
}

View File

@@ -28,6 +28,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
import org.apache.hadoop.yarn.api.records.ContainerRetryPolicy;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
@@ -60,6 +61,7 @@ import org.apache.hadoop.yarn.proto.YarnProtos.YarnApplicationStateProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerRetryPolicyProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerTypeProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ExecutionTypeProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ExecutionTypeRequestProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos;
import org.apache.hadoop.yarn.server.api.ContainerType;
@@ -324,4 +326,17 @@ public class ProtoUtils {
ContainerRetryPolicyProto e) {
return ContainerRetryPolicy.valueOf(e.name());
}
/*
* ExecutionTypeRequest
*/
public static ExecutionTypeRequestProto convertToProtoFormat(
ExecutionTypeRequest e) {
return ((ExecutionTypeRequestPBImpl)e).getProto();
}
public static ExecutionTypeRequest convertFromProtoFormat(
ExecutionTypeRequestProto e) {
return new ExecutionTypeRequestPBImpl(e);
}
}
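A short round-trip sketch of the two new helpers (illustrative; it assumes the record instance is the PB-backed ExecutionTypeRequestPBImpl shown earlier, which is what the cast in convertToProtoFormat relies on):

ExecutionTypeRequest record =
    ExecutionTypeRequest.newInstance(ExecutionType.OPPORTUNISTIC, true);
// Record -> proto; only valid for the PB-backed record implementation.
ExecutionTypeRequestProto proto = ProtoUtils.convertToProtoFormat(record);
// Proto -> record; both fields survive the round trip.
ExecutionTypeRequest copy = ProtoUtils.convertFromProtoFormat(proto);
assert copy.getExecutionType() == ExecutionType.OPPORTUNISTIC;
assert copy.getEnforceExecutionType();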

View File

@@ -21,7 +21,7 @@ package org.apache.hadoop.yarn.api.records.impl.pb;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
@@ -39,6 +39,7 @@ public class ResourceRequestPBImpl extends ResourceRequest {
private Priority priority = null;
private Resource capability = null;
private ExecutionTypeRequest executionTypeRequest = null;
public ResourceRequestPBImpl() {
@@ -64,6 +65,10 @@ public class ResourceRequestPBImpl extends ResourceRequest {
if (this.capability != null) {
builder.setCapability(convertToProtoFormat(this.capability));
}
if (this.executionTypeRequest != null) {
builder.setExecutionTypeRequest(
ProtoUtils.convertToProtoFormat(this.executionTypeRequest));
}
}
private void mergeLocalToProto() {
@@ -102,6 +107,29 @@ public class ResourceRequestPBImpl extends ResourceRequest {
builder.clearPriority();
this.priority = priority;
}
public ExecutionTypeRequest getExecutionTypeRequest() {
ResourceRequestProtoOrBuilder p = viaProto ? proto : builder;
if (this.executionTypeRequest != null) {
return this.executionTypeRequest;
}
if (!p.hasExecutionTypeRequest()) {
return null;
}
this.executionTypeRequest =
ProtoUtils.convertFromProtoFormat(p.getExecutionTypeRequest());
return this.executionTypeRequest;
}
public void setExecutionTypeRequest(ExecutionTypeRequest execSpec) {
maybeInitBuilder();
if (execSpec == null) {
builder.clearExecutionTypeRequest();
}
this.executionTypeRequest = execSpec;
}
@Override
public String getResourceName() {
ResourceRequestProtoOrBuilder p = viaProto ? proto : builder;
@@ -186,7 +214,7 @@ public class ResourceRequestPBImpl extends ResourceRequest {
+ ", # Containers: " + getNumContainers()
+ ", Location: " + getResourceName()
+ ", Relax Locality: " + getRelaxLocality()
+ ", Node Label Expression: " + getNodeLabelExpression() + "}";
+ ", Execution Spec: " + getExecutionTypeRequest() + "}";
}
@Override
@@ -207,24 +235,4 @@ public class ResourceRequestPBImpl extends ResourceRequest {
}
builder.setNodeLabelExpression(nodeLabelExpression);
}
@Override
public ExecutionType getExecutionType() {
ResourceRequestProtoOrBuilder p = viaProto ? proto : builder;
if (!p.hasExecutionType()) {
return null;
}
return ProtoUtils.convertFromProtoFormat(p.getExecutionType());
}
@Override
public void setExecutionType(ExecutionType execType) {
maybeInitBuilder();
if (execType == null) {
builder.clearExecutionType();
return;
}
builder.setExecutionType(ProtoUtils.convertToProtoFormat(execType));
}
}

View File

@@ -123,6 +123,7 @@ import org.apache.hadoop.yarn.api.records.ContainerReport;
import org.apache.hadoop.yarn.api.records.ContainerResourceChangeRequest;
import org.apache.hadoop.yarn.api.records.ContainerRetryContext;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LogAggregationContext;
import org.apache.hadoop.yarn.api.records.NMToken;
@@ -463,6 +464,7 @@ public class TestPBImplRecords {
"http", "localhost", 8080, "file0"));
typeValueCache.put(SerializedException.class,
SerializedException.newInstance(new IOException("exception for test")));
generateByNewInstance(ExecutionTypeRequest.class);
generateByNewInstance(LogAggregationContext.class);
generateByNewInstance(ApplicationId.class);
generateByNewInstance(ApplicationAttemptId.class);

View File

@@ -214,7 +214,8 @@ public final class LocalScheduler extends AbstractRequestInterceptor {
PartitionedResourceRequests partitionedRequests =
new PartitionedResourceRequests();
for (ResourceRequest rr : askList) {
if (rr.getExecutionType() == ExecutionType.OPPORTUNISTIC) {
if (rr.getExecutionTypeRequest().getExecutionType() ==
ExecutionType.OPPORTUNISTIC) {
partitionedRequests.getOpportunistic().add(rr);
} else {
partitionedRequests.getGuaranteed().add(rr);
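For reference, an equivalent self-contained partition over an ask list, with a defensive null check added as an assumption (execution_type_request is optional on the wire; the real LocalScheduler may rely on the field always being populated):

List<ResourceRequest> opportunistic = new ArrayList<>();
List<ResourceRequest> guaranteed = new ArrayList<>();
for (ResourceRequest rr : askList) {
  ExecutionTypeRequest etr = rr.getExecutionTypeRequest();
  if (etr != null && etr.getExecutionType() == ExecutionType.OPPORTUNISTIC) {
    opportunistic.add(rr);
  } else {
    guaranteed.add(rr);
  }
}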

View File

@@ -19,8 +19,8 @@
package org.apache.hadoop.yarn.server.nodemanager.scheduler;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
import org.apache.hadoop.yarn.server.api.DistributedSchedulerProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedAllocateResponse;
@@ -138,13 +138,15 @@ public class TestLocalScheduler {
AllocateRequest allocateRequest = Records.newRecord(AllocateRequest.class);
ResourceRequest guaranteedReq = Records.newRecord(ResourceRequest.class);
guaranteedReq.setExecutionType(ExecutionType.GUARANTEED);
guaranteedReq.setExecutionTypeRequest(
ExecutionTypeRequest.newInstance(ExecutionType.GUARANTEED, true));
guaranteedReq.setNumContainers(5);
guaranteedReq.setCapability(Resource.newInstance(2048, 2));
guaranteedReq.setRelaxLocality(true);
guaranteedReq.setResourceName("*");
ResourceRequest opportunisticReq = Records.newRecord(ResourceRequest.class);
opportunisticReq.setExecutionType(ExecutionType.OPPORTUNISTIC);
opportunisticReq.setExecutionTypeRequest(
ExecutionTypeRequest.newInstance(ExecutionType.OPPORTUNISTIC, true));
opportunisticReq.setNumContainers(4);
opportunisticReq.setCapability(Resource.newInstance(1024, 4));
opportunisticReq.setPriority(Priority.newInstance(100));
@@ -167,7 +169,8 @@ public class TestLocalScheduler {
// New Allocate request
allocateRequest = Records.newRecord(AllocateRequest.class);
opportunisticReq = Records.newRecord(ResourceRequest.class);
opportunisticReq.setExecutionType(ExecutionType.OPPORTUNISTIC);
opportunisticReq.setExecutionTypeRequest(
ExecutionTypeRequest.newInstance(ExecutionType.OPPORTUNISTIC, true));
opportunisticReq.setNumContainers(6);
opportunisticReq.setCapability(Resource.newInstance(512, 3));
opportunisticReq.setPriority(Priority.newInstance(100));

View File

@@ -39,8 +39,11 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.server.api.DistributedSchedulerProtocolPB;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
@@ -104,6 +107,13 @@ public class TestDistributedSchedulingService {
ContainerId.newContainerId(
ApplicationAttemptId.newInstance(
ApplicationId.newInstance(12345, 1), 2), 3));
AllocateRequest allReq =
(AllocateRequestPBImpl)factory.newRecordInstance(AllocateRequest.class);
allReq.setAskList(Arrays.asList(
ResourceRequest.newInstance(Priority.UNDEFINED, "a",
Resource.newInstance(1, 2), 1, true, "exp",
ExecutionTypeRequest.newInstance(
ExecutionType.OPPORTUNISTIC, true))));
DistributedSchedulingService service = createService(factory, rmContext, c);
Server server = service.getServer(rpc, conf, addr, null);
server.start();
@@ -168,8 +178,7 @@ public class TestDistributedSchedulingService {
DistSchedAllocateResponse dsAllocResp =
new DistSchedAllocateResponsePBImpl(
dsProxy.allocateForDistributedScheduling(null,
((AllocateRequestPBImpl)factory
.newRecordInstance(AllocateRequest.class)).getProto()));
((AllocateRequestPBImpl)allReq).getProto()));
Assert.assertEquals(
"h1", dsAllocResp.getNodesForScheduling().get(0).getHost());
@@ -235,6 +244,10 @@ public class TestDistributedSchedulingService {
@Override
public DistSchedAllocateResponse allocateForDistributedScheduling(
AllocateRequest request) throws YarnException, IOException {
List<ResourceRequest> askList = request.getAskList();
Assert.assertEquals(1, askList.size());
Assert.assertTrue(askList.get(0)
.getExecutionTypeRequest().getEnforceExecutionType());
DistSchedAllocateResponse resp =
factory.newRecordInstance(DistSchedAllocateResponse.class);
resp.setNodesForScheduling(