From 384a84828f24d9c26177b6b5f6abf192687db657 Mon Sep 17 00:00:00 2001 From: Arun Suresh Date: Thu, 2 Jun 2016 05:18:01 -0700 Subject: [PATCH] YARN-5180. Allow ResourceRequest to specify an enforceExecutionType flag. (asuresh) (cherry picked from commit dc26601d8fe27a4223a50601bf7522cc42e8e2f3) --- .../v2/app/rm/RMContainerRequestor.java | 4 +- .../api/records/ExecutionTypeRequest.java | 124 ++++++++++++++++++ .../yarn/api/records/ResourceRequest.java | 34 ++--- .../src/main/proto/yarn_protos.proto | 7 +- .../api/impl/TestDistributedScheduling.java | 9 +- .../impl/pb/ExecutionTypeRequestPBImpl.java | 93 +++++++++++++ .../yarn/api/records/impl/pb/ProtoUtils.java | 15 +++ .../impl/pb/ResourceRequestPBImpl.java | 52 ++++---- .../hadoop/yarn/api/TestPBImplRecords.java | 2 + .../nodemanager/scheduler/LocalScheduler.java | 3 +- .../scheduler/TestLocalScheduler.java | 11 +- .../TestDistributedSchedulingService.java | 17 ++- 12 files changed, 323 insertions(+), 48 deletions(-) create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ExecutionTypeRequest.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ExecutionTypeRequestPBImpl.java diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java index 7030712d58a..f4579abd476 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java @@ -41,6 +41,7 @@ import org.apache.hadoop.mapreduce.v2.app.client.ClientService; import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest; import org.apache.hadoop.yarn.api.records.ExecutionType; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; @@ -462,7 +463,8 @@ public abstract class RMContainerRequestor extends RMCommunicator { remoteRequest.setCapability(capability); remoteRequest.setNumContainers(0); remoteRequest.setNodeLabelExpression(nodeLabelExpression); - remoteRequest.setExecutionType(executionType); + remoteRequest.setExecutionTypeRequest( + ExecutionTypeRequest.newInstance(executionType, true)); reqMap.put(capability, remoteRequest); } remoteRequest.setNumContainers(remoteRequest.getNumContainers() + 1); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ExecutionTypeRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ExecutionTypeRequest.java new file mode 100644 index 00000000000..f553a44b237 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ExecutionTypeRequest.java @@ -0,0 +1,124 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.api.records; + +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Evolving; +import org.apache.hadoop.yarn.util.Records; + +/** + * An object of this class represents a specification of the execution + * guarantee of the Containers associated with a ResourceRequest. It consists + * of an ExecutionType as well as flag that explicitly asks the + * configuredScheduler to return Containers of exactly the Execution Type + * requested. + */ +@Public +@Evolving +public abstract class ExecutionTypeRequest { + + @Public + @Evolving + public static ExecutionTypeRequest newInstance() { + return newInstance(ExecutionType.GUARANTEED, false); + } + + @Public + @Evolving + public static ExecutionTypeRequest newInstance(ExecutionType execType, + boolean ensureExecutionType) { + ExecutionTypeRequest executionTypeRequest = + Records.newRecord(ExecutionTypeRequest.class); + executionTypeRequest.setExecutionType(execType); + executionTypeRequest.setEnforceExecutionType(ensureExecutionType); + return executionTypeRequest; + } + + /** + * Set the ExecutionType of the requested container. + * + * @param execType + * ExecutionType of the requested container + */ + @Public + public abstract void setExecutionType(ExecutionType execType); + + /** + * Get ExecutionType. + * + * @return ExecutionType. + */ + @Public + public abstract ExecutionType getExecutionType(); + + /** + * Set to true to explicitly ask that the Scheduling Authority return + * Containers of exactly the Execution Type requested. + * @param enforceExecutionType whether ExecutionType request should be + * strictly honored. + */ + @Public + public abstract void setEnforceExecutionType(boolean enforceExecutionType); + + + /** + * Get whether Scheduling Authority should return Containers of exactly the + * Execution Type requested for this ResourceRequest. + * Defaults to false. + * @return whether ExecutionType request should be strictly honored + */ + @Public + public abstract boolean getEnforceExecutionType(); + + @Override + public int hashCode() { + final int prime = 2153; + int result = 2459; + ExecutionType executionType = getExecutionType(); + boolean ensureExecutionType = getEnforceExecutionType(); + result = prime * result + ((executionType == null) ? 0 : + executionType.hashCode()); + result = prime * result + (ensureExecutionType ? 0 : 1); + return result; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (obj == null) { + return false; + } + if (getClass() != obj.getClass()) { + return false; + } + ExecutionTypeRequest other = (ExecutionTypeRequest) obj; + ExecutionType executionType = getExecutionType(); + if (executionType == null) { + if (other.getExecutionType() != null) { + return false; + } + } else if (executionType != other.getExecutionType()) { + return false; + } + boolean enforceExecutionType = getEnforceExecutionType(); + return enforceExecutionType == other.getEnforceExecutionType(); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceRequest.java index 8c1fd8d00db..fbe7e580dfa 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceRequest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceRequest.java @@ -80,14 +80,14 @@ public abstract class ResourceRequest implements Comparable { Resource capability, int numContainers, boolean relaxLocality, String labelExpression) { return newInstance(priority, hostName, capability, numContainers, - relaxLocality, labelExpression, ExecutionType.GUARANTEED); + relaxLocality, labelExpression, ExecutionTypeRequest.newInstance()); } @Public - @Stable + @Evolving public static ResourceRequest newInstance(Priority priority, String hostName, Resource capability, int numContainers, boolean relaxLocality, String - labelExpression, ExecutionType execType) { + labelExpression, ExecutionTypeRequest executionTypeRequest) { ResourceRequest request = Records.newRecord(ResourceRequest.class); request.setPriority(priority); request.setResourceName(hostName); @@ -95,7 +95,7 @@ public abstract class ResourceRequest implements Comparable { request.setNumContainers(numContainers); request.setRelaxLocality(relaxLocality); request.setNodeLabelExpression(labelExpression); - request.setExecutionType(execType); + request.setExecutionTypeRequest(executionTypeRequest); return request; } @@ -233,14 +233,16 @@ public abstract class ResourceRequest implements Comparable { public abstract boolean getRelaxLocality(); /** - * Set the ExecutionType of the requested container. + * Set the ExecutionTypeRequest of the requested container. * - * @param execType - * ExecutionType of the requested container + * @param execSpec + * ExecutionTypeRequest of the requested container */ @Public - @Stable - public abstract void setExecutionType(ExecutionType execType); + @Evolving + public void setExecutionTypeRequest(ExecutionTypeRequest execSpec) { + throw new UnsupportedOperationException(); + } /** * Get whether locality relaxation is enabled with this @@ -250,8 +252,10 @@ public abstract class ResourceRequest implements Comparable { * ResourceRequest. */ @Public - @Stable - public abstract ExecutionType getExecutionType(); + @Evolving + public ExecutionTypeRequest getExecutionTypeRequest() { + throw new UnsupportedOperationException(); + } /** *

For a request at a network hierarchy level, set whether locality can be relaxed @@ -353,12 +357,12 @@ public abstract class ResourceRequest implements Comparable { return false; } else if (!priority.equals(other.getPriority())) return false; - ExecutionType executionType = getExecutionType(); - if (executionType == null) { - if (other.getExecutionType() != null) { + ExecutionTypeRequest execTypeRequest = getExecutionTypeRequest(); + if (execTypeRequest == null) { + if (other.getExecutionTypeRequest() != null) { return false; } - } else if (executionType != other.getExecutionType()) { + } else if (!execTypeRequest.equals(other.getExecutionTypeRequest())) { return false; } if (getNodeLabelExpression() == null) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto index 81fae5820b5..dc029e86bed 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto @@ -306,7 +306,12 @@ message ResourceRequestProto { optional int32 num_containers = 4; optional bool relax_locality = 5 [default = true]; optional string node_label_expression = 6; - optional ExecutionTypeProto executionType = 7 [default = GUARANTEED]; + optional ExecutionTypeRequestProto execution_type_request = 7; +} + +message ExecutionTypeRequestProto { + optional ExecutionTypeProto execution_type = 1 [default = GUARANTEED]; + optional bool enforce_execution_type = 2 [default = false]; } enum AMCommandProto { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestDistributedScheduling.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestDistributedScheduling.java index b4dcf66f3e3..6d93eb39591 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestDistributedScheduling.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestDistributedScheduling.java @@ -31,6 +31,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterReque import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest; import org.apache.hadoop.yarn.api.records.ExecutionType; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.NodeState; @@ -129,7 +130,9 @@ public class TestDistributedScheduling extends TestAMRMProxy { ResourceRequest newRR = ResourceRequest.newInstance(rr .getPriority(), rr.getResourceName(), rr.getCapability(), rr.getNumContainers(), rr.getRelaxLocality(), - rr.getNodeLabelExpression(), ExecutionType.OPPORTUNISTIC); + rr.getNodeLabelExpression(), + ExecutionTypeRequest.newInstance( + ExecutionType.OPPORTUNISTIC, true)); newAskList.add(newRR); } } @@ -235,7 +238,9 @@ public class TestDistributedScheduling extends TestAMRMProxy { ResourceRequest newRR = ResourceRequest.newInstance(rr .getPriority(), rr.getResourceName(), rr.getCapability(), rr.getNumContainers(), rr.getRelaxLocality(), - rr.getNodeLabelExpression(), ExecutionType.OPPORTUNISTIC); + rr.getNodeLabelExpression(), + ExecutionTypeRequest.newInstance( + ExecutionType.OPPORTUNISTIC, true)); newAskList.add(newRR); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ExecutionTypeRequestPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ExecutionTypeRequestPBImpl.java new file mode 100644 index 00000000000..0037dd3f6b7 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ExecutionTypeRequestPBImpl.java @@ -0,0 +1,93 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.api.records.impl.pb; + +import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest; +import org.apache.hadoop.yarn.api.records.ExecutionType; +import org.apache.hadoop.yarn.proto.YarnProtos.ExecutionTypeRequestProto; +import org.apache.hadoop.yarn.proto.YarnProtos.ExecutionTypeRequestProtoOrBuilder; + +/** + * Implementation of ExecutionTypeRequest. + */ +public class ExecutionTypeRequestPBImpl extends ExecutionTypeRequest { + private ExecutionTypeRequestProto proto = + ExecutionTypeRequestProto.getDefaultInstance(); + private ExecutionTypeRequestProto.Builder builder = null; + private boolean viaProto = false; + + public ExecutionTypeRequestPBImpl() { + builder = ExecutionTypeRequestProto.newBuilder(); + } + + public ExecutionTypeRequestPBImpl(ExecutionTypeRequestProto proto) { + this.proto = proto; + viaProto = true; + } + + private void maybeInitBuilder() { + if (viaProto || builder == null) { + builder = ExecutionTypeRequestProto.newBuilder(proto); + } + viaProto = false; + } + + public ExecutionTypeRequestProto getProto() { + proto = viaProto ? proto : builder.build(); + viaProto = true; + return proto; + } + + @Override + public ExecutionType getExecutionType() { + ExecutionTypeRequestProtoOrBuilder p = viaProto ? proto : builder; + if (!p.hasExecutionType()) { + return null; + } + return ProtoUtils.convertFromProtoFormat(p.getExecutionType()); + } + + @Override + public void setExecutionType(ExecutionType execType) { + maybeInitBuilder(); + if (execType == null) { + builder.clearExecutionType(); + return; + } + builder.setExecutionType(ProtoUtils.convertToProtoFormat(execType)); + } + + @Override + public void setEnforceExecutionType(boolean enforceExecutionType) { + maybeInitBuilder(); + builder.setEnforceExecutionType(enforceExecutionType); + } + + @Override + public boolean getEnforceExecutionType() { + ExecutionTypeRequestProtoOrBuilder p = viaProto ? proto : builder; + return p.getEnforceExecutionType(); + } + + @Override + public String toString() { + return "{Execution Type: " + getExecutionType() + + ", Enforce Execution Type: " + getEnforceExecutionType() + "}"; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ProtoUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ProtoUtils.java index 46a1802a9e4..459a10e13d0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ProtoUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ProtoUtils.java @@ -28,6 +28,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationAccessType; import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport; import org.apache.hadoop.yarn.api.records.ContainerRetryPolicy; import org.apache.hadoop.yarn.api.records.ContainerState; +import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest; import org.apache.hadoop.yarn.api.records.ExecutionType; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.LocalResourceType; @@ -60,6 +61,7 @@ import org.apache.hadoop.yarn.proto.YarnProtos.YarnApplicationStateProto; import org.apache.hadoop.yarn.proto.YarnProtos.ContainerRetryPolicyProto; import org.apache.hadoop.yarn.proto.YarnProtos.ContainerTypeProto; import org.apache.hadoop.yarn.proto.YarnProtos.ExecutionTypeProto; +import org.apache.hadoop.yarn.proto.YarnProtos.ExecutionTypeRequestProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos; import org.apache.hadoop.yarn.server.api.ContainerType; @@ -324,4 +326,17 @@ public class ProtoUtils { YarnProtos.ResourceProto resource) { return new ResourcePBImpl(resource); } + + /* + * ExecutionTypeRequest + */ + public static ExecutionTypeRequestProto convertToProtoFormat( + ExecutionTypeRequest e) { + return ((ExecutionTypeRequestPBImpl)e).getProto(); + } + + public static ExecutionTypeRequest convertFromProtoFormat( + ExecutionTypeRequestProto e) { + return new ExecutionTypeRequestPBImpl(e); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ResourceRequestPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ResourceRequestPBImpl.java index 53ae2cde78f..fd56f4f8f06 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ResourceRequestPBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ResourceRequestPBImpl.java @@ -21,7 +21,7 @@ package org.apache.hadoop.yarn.api.records.impl.pb; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; -import org.apache.hadoop.yarn.api.records.ExecutionType; +import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; @@ -39,6 +39,7 @@ public class ResourceRequestPBImpl extends ResourceRequest { private Priority priority = null; private Resource capability = null; + private ExecutionTypeRequest executionTypeRequest = null; public ResourceRequestPBImpl() { @@ -64,6 +65,10 @@ public class ResourceRequestPBImpl extends ResourceRequest { if (this.capability != null) { builder.setCapability(convertToProtoFormat(this.capability)); } + if (this.executionTypeRequest != null) { + builder.setExecutionTypeRequest( + ProtoUtils.convertToProtoFormat(this.executionTypeRequest)); + } } private void mergeLocalToProto() { @@ -102,6 +107,29 @@ public class ResourceRequestPBImpl extends ResourceRequest { builder.clearPriority(); this.priority = priority; } + + + public ExecutionTypeRequest getExecutionTypeRequest() { + ResourceRequestProtoOrBuilder p = viaProto ? proto : builder; + if (this.executionTypeRequest != null) { + return this.executionTypeRequest; + } + if (!p.hasExecutionTypeRequest()) { + return null; + } + this.executionTypeRequest = + ProtoUtils.convertFromProtoFormat(p.getExecutionTypeRequest()); + return this.executionTypeRequest; + } + + public void setExecutionTypeRequest(ExecutionTypeRequest execSpec) { + maybeInitBuilder(); + if (execSpec == null) { + builder.clearExecutionTypeRequest(); + } + this.executionTypeRequest = execSpec; + } + @Override public String getResourceName() { ResourceRequestProtoOrBuilder p = viaProto ? proto : builder; @@ -186,7 +214,7 @@ public class ResourceRequestPBImpl extends ResourceRequest { + ", # Containers: " + getNumContainers() + ", Location: " + getResourceName() + ", Relax Locality: " + getRelaxLocality() - + ", Node Label Expression: " + getNodeLabelExpression() + "}"; + + ", Execution Spec: " + getExecutionTypeRequest() + "}"; } @Override @@ -207,24 +235,4 @@ public class ResourceRequestPBImpl extends ResourceRequest { } builder.setNodeLabelExpression(nodeLabelExpression); } - - @Override - public ExecutionType getExecutionType() { - ResourceRequestProtoOrBuilder p = viaProto ? proto : builder; - if (!p.hasExecutionType()) { - return null; - } - return ProtoUtils.convertFromProtoFormat(p.getExecutionType()); - } - - @Override - public void setExecutionType(ExecutionType execType) { - maybeInitBuilder(); - if (execType == null) { - builder.clearExecutionType(); - return; - } - builder.setExecutionType(ProtoUtils.convertToProtoFormat(execType)); - } - } \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestPBImplRecords.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestPBImplRecords.java index 14f61b736eb..91d65b14a89 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestPBImplRecords.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestPBImplRecords.java @@ -123,6 +123,7 @@ import org.apache.hadoop.yarn.api.records.ContainerReport; import org.apache.hadoop.yarn.api.records.ContainerResourceChangeRequest; import org.apache.hadoop.yarn.api.records.ContainerRetryContext; import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest; import org.apache.hadoop.yarn.api.records.LocalResource; import org.apache.hadoop.yarn.api.records.LogAggregationContext; import org.apache.hadoop.yarn.api.records.NMToken; @@ -463,6 +464,7 @@ public class TestPBImplRecords { "http", "localhost", 8080, "file0")); typeValueCache.put(SerializedException.class, SerializedException.newInstance(new IOException("exception for test"))); + generateByNewInstance(ExecutionTypeRequest.class); generateByNewInstance(LogAggregationContext.class); generateByNewInstance(ApplicationId.class); generateByNewInstance(ApplicationAttemptId.class); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/LocalScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/LocalScheduler.java index fca814b7d1d..8e2ceb014ca 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/LocalScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/LocalScheduler.java @@ -214,7 +214,8 @@ public final class LocalScheduler extends AbstractRequestInterceptor { PartitionedResourceRequests partitionedRequests = new PartitionedResourceRequests(); for (ResourceRequest rr : askList) { - if (rr.getExecutionType() == ExecutionType.OPPORTUNISTIC) { + if (rr.getExecutionTypeRequest().getExecutionType() == + ExecutionType.OPPORTUNISTIC) { partitionedRequests.getOpportunistic().add(rr); } else { partitionedRequests.getGuaranteed().add(rr); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/TestLocalScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/TestLocalScheduler.java index e987e79e303..a1d39f76051 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/TestLocalScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/TestLocalScheduler.java @@ -19,8 +19,8 @@ package org.apache.hadoop.yarn.server.nodemanager.scheduler; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest; import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; -import org.apache.hadoop.yarn.server.api.DistributedSchedulerProtocol; import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedAllocateResponse; @@ -138,13 +138,15 @@ public class TestLocalScheduler { AllocateRequest allocateRequest = Records.newRecord(AllocateRequest.class); ResourceRequest guaranteedReq = Records.newRecord(ResourceRequest.class); - guaranteedReq.setExecutionType(ExecutionType.GUARANTEED); + guaranteedReq.setExecutionTypeRequest( + ExecutionTypeRequest.newInstance(ExecutionType.GUARANTEED, true)); guaranteedReq.setNumContainers(5); guaranteedReq.setCapability(Resource.newInstance(2048, 2)); guaranteedReq.setRelaxLocality(true); guaranteedReq.setResourceName("*"); ResourceRequest opportunisticReq = Records.newRecord(ResourceRequest.class); - opportunisticReq.setExecutionType(ExecutionType.OPPORTUNISTIC); + opportunisticReq.setExecutionTypeRequest( + ExecutionTypeRequest.newInstance(ExecutionType.OPPORTUNISTIC, true)); opportunisticReq.setNumContainers(4); opportunisticReq.setCapability(Resource.newInstance(1024, 4)); opportunisticReq.setPriority(Priority.newInstance(100)); @@ -167,7 +169,8 @@ public class TestLocalScheduler { // New Allocate request allocateRequest = Records.newRecord(AllocateRequest.class); opportunisticReq = Records.newRecord(ResourceRequest.class); - opportunisticReq.setExecutionType(ExecutionType.OPPORTUNISTIC); + opportunisticReq.setExecutionTypeRequest( + ExecutionTypeRequest.newInstance(ExecutionType.OPPORTUNISTIC, true)); opportunisticReq.setNumContainers(6); opportunisticReq.setCapability(Resource.newInstance(512, 3)); opportunisticReq.setPriority(Priority.newInstance(100)); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestDistributedSchedulingService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestDistributedSchedulingService.java index 5d5ab789f86..7d2ed33be34 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestDistributedSchedulingService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestDistributedSchedulingService.java @@ -39,8 +39,11 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest; import org.apache.hadoop.yarn.api.records.ExecutionType; +import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.server.api.DistributedSchedulerProtocolPB; import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; @@ -104,6 +107,13 @@ public class TestDistributedSchedulingService { ContainerId.newContainerId( ApplicationAttemptId.newInstance( ApplicationId.newInstance(12345, 1), 2), 3)); + AllocateRequest allReq = + (AllocateRequestPBImpl)factory.newRecordInstance(AllocateRequest.class); + allReq.setAskList(Arrays.asList( + ResourceRequest.newInstance(Priority.UNDEFINED, "a", + Resource.newInstance(1, 2), 1, true, "exp", + ExecutionTypeRequest.newInstance( + ExecutionType.OPPORTUNISTIC, true)))); DistributedSchedulingService service = createService(factory, rmContext, c); Server server = service.getServer(rpc, conf, addr, null); server.start(); @@ -168,8 +178,7 @@ public class TestDistributedSchedulingService { DistSchedAllocateResponse dsAllocResp = new DistSchedAllocateResponsePBImpl( dsProxy.allocateForDistributedScheduling(null, - ((AllocateRequestPBImpl)factory - .newRecordInstance(AllocateRequest.class)).getProto())); + ((AllocateRequestPBImpl)allReq).getProto())); Assert.assertEquals( "h1", dsAllocResp.getNodesForScheduling().get(0).getHost()); @@ -235,6 +244,10 @@ public class TestDistributedSchedulingService { @Override public DistSchedAllocateResponse allocateForDistributedScheduling( AllocateRequest request) throws YarnException, IOException { + List askList = request.getAskList(); + Assert.assertEquals(1, askList.size()); + Assert.assertTrue(askList.get(0) + .getExecutionTypeRequest().getEnforceExecutionType()); DistSchedAllocateResponse resp = factory.newRecordInstance(DistSchedAllocateResponse.class); resp.setNodesForScheduling(