ExecutionType
as well as flag that explicitly asks the
+ * configuredScheduler to return Containers of exactly the Execution Type
+ * requested.
+ */
+@Public
+@Evolving
+public abstract class ExecutionTypeRequest {
+
+ @Public
+ @Evolving
+ public static ExecutionTypeRequest newInstance() {
+ return newInstance(ExecutionType.GUARANTEED, false);
+ }
+
+ @Public
+ @Evolving
+ public static ExecutionTypeRequest newInstance(ExecutionType execType,
+ boolean ensureExecutionType) {
+ ExecutionTypeRequest executionTypeRequest =
+ Records.newRecord(ExecutionTypeRequest.class);
+ executionTypeRequest.setExecutionType(execType);
+ executionTypeRequest.setEnforceExecutionType(ensureExecutionType);
+ return executionTypeRequest;
+ }
+
+ /**
+ * Set the ExecutionType
of the requested container.
+ *
+ * @param execType
+ * ExecutionType of the requested container
+ */
+ @Public
+ public abstract void setExecutionType(ExecutionType execType);
+
+ /**
+ * Get ExecutionType
.
+ *
+ * @return ExecutionType
.
+ */
+ @Public
+ public abstract ExecutionType getExecutionType();
+
+ /**
+ * Set to true to explicitly ask that the Scheduling Authority return
+ * Containers of exactly the Execution Type requested.
+ * @param enforceExecutionType whether ExecutionType request should be
+ * strictly honored.
+ */
+ @Public
+ public abstract void setEnforceExecutionType(boolean enforceExecutionType);
+
+
+ /**
+ * Get whether Scheduling Authority should return Containers of exactly the
+ * Execution Type requested for this ResourceRequest
.
+ * Defaults to false.
+ * @return whether ExecutionType request should be strictly honored
+ */
+ @Public
+ public abstract boolean getEnforceExecutionType();
+
+ @Override
+ public int hashCode() {
+ final int prime = 2153;
+ int result = 2459;
+ ExecutionType executionType = getExecutionType();
+ boolean ensureExecutionType = getEnforceExecutionType();
+ result = prime * result + ((executionType == null) ? 0 :
+ executionType.hashCode());
+ result = prime * result + (ensureExecutionType ? 0 : 1);
+ return result;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ if (obj == null) {
+ return false;
+ }
+ if (getClass() != obj.getClass()) {
+ return false;
+ }
+ ExecutionTypeRequest other = (ExecutionTypeRequest) obj;
+ ExecutionType executionType = getExecutionType();
+ if (executionType == null) {
+ if (other.getExecutionType() != null) {
+ return false;
+ }
+ } else if (executionType != other.getExecutionType()) {
+ return false;
+ }
+ boolean enforceExecutionType = getEnforceExecutionType();
+ return enforceExecutionType == other.getEnforceExecutionType();
+ }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceRequest.java
index 8c1fd8d00db..fbe7e580dfa 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceRequest.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceRequest.java
@@ -80,14 +80,14 @@ public abstract class ResourceRequest implements ComparableExecutionType
of the requested container.
+ * Set the ExecutionTypeRequest
of the requested container.
*
- * @param execType
- * ExecutionType of the requested container
+ * @param execSpec
+ * ExecutionTypeRequest of the requested container
*/
@Public
- @Stable
- public abstract void setExecutionType(ExecutionType execType);
+ @Evolving
+ public void setExecutionTypeRequest(ExecutionTypeRequest execSpec) {
+ throw new UnsupportedOperationException();
+ }
/**
* Get whether locality relaxation is enabled with this
@@ -250,8 +252,10 @@ public abstract class ResourceRequest implements ComparableResourceRequest
.
*/
@Public
- @Stable
- public abstract ExecutionType getExecutionType();
+ @Evolving
+ public ExecutionTypeRequest getExecutionTypeRequest() {
+ throw new UnsupportedOperationException();
+ }
/**
* For a request at a network hierarchy level, set whether locality can be relaxed
@@ -353,12 +357,12 @@ public abstract class ResourceRequest implements ComparableExecutionTypeRequest
.
+ */
+public class ExecutionTypeRequestPBImpl extends ExecutionTypeRequest {
+ private ExecutionTypeRequestProto proto =
+ ExecutionTypeRequestProto.getDefaultInstance();
+ private ExecutionTypeRequestProto.Builder builder = null;
+ private boolean viaProto = false;
+
+ public ExecutionTypeRequestPBImpl() {
+ builder = ExecutionTypeRequestProto.newBuilder();
+ }
+
+ public ExecutionTypeRequestPBImpl(ExecutionTypeRequestProto proto) {
+ this.proto = proto;
+ viaProto = true;
+ }
+
+ private void maybeInitBuilder() {
+ if (viaProto || builder == null) {
+ builder = ExecutionTypeRequestProto.newBuilder(proto);
+ }
+ viaProto = false;
+ }
+
+ public ExecutionTypeRequestProto getProto() {
+ proto = viaProto ? proto : builder.build();
+ viaProto = true;
+ return proto;
+ }
+
+ @Override
+ public ExecutionType getExecutionType() {
+ ExecutionTypeRequestProtoOrBuilder p = viaProto ? proto : builder;
+ if (!p.hasExecutionType()) {
+ return null;
+ }
+ return ProtoUtils.convertFromProtoFormat(p.getExecutionType());
+ }
+
+ @Override
+ public void setExecutionType(ExecutionType execType) {
+ maybeInitBuilder();
+ if (execType == null) {
+ builder.clearExecutionType();
+ return;
+ }
+ builder.setExecutionType(ProtoUtils.convertToProtoFormat(execType));
+ }
+
+ @Override
+ public void setEnforceExecutionType(boolean enforceExecutionType) {
+ maybeInitBuilder();
+ builder.setEnforceExecutionType(enforceExecutionType);
+ }
+
+ @Override
+ public boolean getEnforceExecutionType() {
+ ExecutionTypeRequestProtoOrBuilder p = viaProto ? proto : builder;
+ return p.getEnforceExecutionType();
+ }
+
+ @Override
+ public String toString() {
+ return "{Execution Type: " + getExecutionType()
+ + ", Enforce Execution Type: " + getEnforceExecutionType() + "}";
+ }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ProtoUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ProtoUtils.java
index 46a1802a9e4..459a10e13d0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ProtoUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ProtoUtils.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
import org.apache.hadoop.yarn.api.records.ContainerRetryPolicy;
import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
@@ -60,6 +61,7 @@ import org.apache.hadoop.yarn.proto.YarnProtos.YarnApplicationStateProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerRetryPolicyProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerTypeProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ExecutionTypeProto;
+import org.apache.hadoop.yarn.proto.YarnProtos.ExecutionTypeRequestProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos;
import org.apache.hadoop.yarn.server.api.ContainerType;
@@ -324,4 +326,17 @@ public class ProtoUtils {
YarnProtos.ResourceProto resource) {
return new ResourcePBImpl(resource);
}
+
+ /*
+ * ExecutionTypeRequest
+ */
+ public static ExecutionTypeRequestProto convertToProtoFormat(
+ ExecutionTypeRequest e) {
+ return ((ExecutionTypeRequestPBImpl)e).getProto();
+ }
+
+ public static ExecutionTypeRequest convertFromProtoFormat(
+ ExecutionTypeRequestProto e) {
+ return new ExecutionTypeRequestPBImpl(e);
+ }
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ResourceRequestPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ResourceRequestPBImpl.java
index 53ae2cde78f..fd56f4f8f06 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ResourceRequestPBImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ResourceRequestPBImpl.java
@@ -21,7 +21,7 @@ package org.apache.hadoop.yarn.api.records.impl.pb;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
-import org.apache.hadoop.yarn.api.records.ExecutionType;
+import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
@@ -39,6 +39,7 @@ public class ResourceRequestPBImpl extends ResourceRequest {
private Priority priority = null;
private Resource capability = null;
+ private ExecutionTypeRequest executionTypeRequest = null;
public ResourceRequestPBImpl() {
@@ -64,6 +65,10 @@ public class ResourceRequestPBImpl extends ResourceRequest {
if (this.capability != null) {
builder.setCapability(convertToProtoFormat(this.capability));
}
+ if (this.executionTypeRequest != null) {
+ builder.setExecutionTypeRequest(
+ ProtoUtils.convertToProtoFormat(this.executionTypeRequest));
+ }
}
private void mergeLocalToProto() {
@@ -102,6 +107,29 @@ public class ResourceRequestPBImpl extends ResourceRequest {
builder.clearPriority();
this.priority = priority;
}
+
+
+ public ExecutionTypeRequest getExecutionTypeRequest() {
+ ResourceRequestProtoOrBuilder p = viaProto ? proto : builder;
+ if (this.executionTypeRequest != null) {
+ return this.executionTypeRequest;
+ }
+ if (!p.hasExecutionTypeRequest()) {
+ return null;
+ }
+ this.executionTypeRequest =
+ ProtoUtils.convertFromProtoFormat(p.getExecutionTypeRequest());
+ return this.executionTypeRequest;
+ }
+
+ public void setExecutionTypeRequest(ExecutionTypeRequest execSpec) {
+ maybeInitBuilder();
+ if (execSpec == null) {
+ builder.clearExecutionTypeRequest();
+ }
+ this.executionTypeRequest = execSpec;
+ }
+
@Override
public String getResourceName() {
ResourceRequestProtoOrBuilder p = viaProto ? proto : builder;
@@ -186,7 +214,7 @@ public class ResourceRequestPBImpl extends ResourceRequest {
+ ", # Containers: " + getNumContainers()
+ ", Location: " + getResourceName()
+ ", Relax Locality: " + getRelaxLocality()
- + ", Node Label Expression: " + getNodeLabelExpression() + "}";
+ + ", Execution Spec: " + getExecutionTypeRequest() + "}";
}
@Override
@@ -207,24 +235,4 @@ public class ResourceRequestPBImpl extends ResourceRequest {
}
builder.setNodeLabelExpression(nodeLabelExpression);
}
-
- @Override
- public ExecutionType getExecutionType() {
- ResourceRequestProtoOrBuilder p = viaProto ? proto : builder;
- if (!p.hasExecutionType()) {
- return null;
- }
- return ProtoUtils.convertFromProtoFormat(p.getExecutionType());
- }
-
- @Override
- public void setExecutionType(ExecutionType execType) {
- maybeInitBuilder();
- if (execType == null) {
- builder.clearExecutionType();
- return;
- }
- builder.setExecutionType(ProtoUtils.convertToProtoFormat(execType));
- }
-
}
\ No newline at end of file
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestPBImplRecords.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestPBImplRecords.java
index 14f61b736eb..91d65b14a89 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestPBImplRecords.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestPBImplRecords.java
@@ -123,6 +123,7 @@ import org.apache.hadoop.yarn.api.records.ContainerReport;
import org.apache.hadoop.yarn.api.records.ContainerResourceChangeRequest;
import org.apache.hadoop.yarn.api.records.ContainerRetryContext;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LogAggregationContext;
import org.apache.hadoop.yarn.api.records.NMToken;
@@ -463,6 +464,7 @@ public class TestPBImplRecords {
"http", "localhost", 8080, "file0"));
typeValueCache.put(SerializedException.class,
SerializedException.newInstance(new IOException("exception for test")));
+ generateByNewInstance(ExecutionTypeRequest.class);
generateByNewInstance(LogAggregationContext.class);
generateByNewInstance(ApplicationId.class);
generateByNewInstance(ApplicationAttemptId.class);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/LocalScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/LocalScheduler.java
index fca814b7d1d..8e2ceb014ca 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/LocalScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/LocalScheduler.java
@@ -214,7 +214,8 @@ public final class LocalScheduler extends AbstractRequestInterceptor {
PartitionedResourceRequests partitionedRequests =
new PartitionedResourceRequests();
for (ResourceRequest rr : askList) {
- if (rr.getExecutionType() == ExecutionType.OPPORTUNISTIC) {
+ if (rr.getExecutionTypeRequest().getExecutionType() ==
+ ExecutionType.OPPORTUNISTIC) {
partitionedRequests.getOpportunistic().add(rr);
} else {
partitionedRequests.getGuaranteed().add(rr);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/TestLocalScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/TestLocalScheduler.java
index e987e79e303..a1d39f76051 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/TestLocalScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/scheduler/TestLocalScheduler.java
@@ -19,8 +19,8 @@
package org.apache.hadoop.yarn.server.nodemanager.scheduler;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
-import org.apache.hadoop.yarn.server.api.DistributedSchedulerProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.DistSchedAllocateResponse;
@@ -138,13 +138,15 @@ public class TestLocalScheduler {
AllocateRequest allocateRequest = Records.newRecord(AllocateRequest.class);
ResourceRequest guaranteedReq = Records.newRecord(ResourceRequest.class);
- guaranteedReq.setExecutionType(ExecutionType.GUARANTEED);
+ guaranteedReq.setExecutionTypeRequest(
+ ExecutionTypeRequest.newInstance(ExecutionType.GUARANTEED, true));
guaranteedReq.setNumContainers(5);
guaranteedReq.setCapability(Resource.newInstance(2048, 2));
guaranteedReq.setRelaxLocality(true);
guaranteedReq.setResourceName("*");
ResourceRequest opportunisticReq = Records.newRecord(ResourceRequest.class);
- opportunisticReq.setExecutionType(ExecutionType.OPPORTUNISTIC);
+ opportunisticReq.setExecutionTypeRequest(
+ ExecutionTypeRequest.newInstance(ExecutionType.OPPORTUNISTIC, true));
opportunisticReq.setNumContainers(4);
opportunisticReq.setCapability(Resource.newInstance(1024, 4));
opportunisticReq.setPriority(Priority.newInstance(100));
@@ -167,7 +169,8 @@ public class TestLocalScheduler {
// New Allocate request
allocateRequest = Records.newRecord(AllocateRequest.class);
opportunisticReq = Records.newRecord(ResourceRequest.class);
- opportunisticReq.setExecutionType(ExecutionType.OPPORTUNISTIC);
+ opportunisticReq.setExecutionTypeRequest(
+ ExecutionTypeRequest.newInstance(ExecutionType.OPPORTUNISTIC, true));
opportunisticReq.setNumContainers(6);
opportunisticReq.setCapability(Resource.newInstance(512, 3));
opportunisticReq.setPriority(Priority.newInstance(100));
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestDistributedSchedulingService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestDistributedSchedulingService.java
index 5d5ab789f86..7d2ed33be34 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestDistributedSchedulingService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestDistributedSchedulingService.java
@@ -39,8 +39,11 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
import org.apache.hadoop.yarn.api.records.ExecutionType;
+import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.server.api.DistributedSchedulerProtocolPB;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
@@ -104,6 +107,13 @@ public class TestDistributedSchedulingService {
ContainerId.newContainerId(
ApplicationAttemptId.newInstance(
ApplicationId.newInstance(12345, 1), 2), 3));
+ AllocateRequest allReq =
+ (AllocateRequestPBImpl)factory.newRecordInstance(AllocateRequest.class);
+ allReq.setAskList(Arrays.asList(
+ ResourceRequest.newInstance(Priority.UNDEFINED, "a",
+ Resource.newInstance(1, 2), 1, true, "exp",
+ ExecutionTypeRequest.newInstance(
+ ExecutionType.OPPORTUNISTIC, true))));
DistributedSchedulingService service = createService(factory, rmContext, c);
Server server = service.getServer(rpc, conf, addr, null);
server.start();
@@ -168,8 +178,7 @@ public class TestDistributedSchedulingService {
DistSchedAllocateResponse dsAllocResp =
new DistSchedAllocateResponsePBImpl(
dsProxy.allocateForDistributedScheduling(null,
- ((AllocateRequestPBImpl)factory
- .newRecordInstance(AllocateRequest.class)).getProto()));
+ ((AllocateRequestPBImpl)allReq).getProto()));
Assert.assertEquals(
"h1", dsAllocResp.getNodesForScheduling().get(0).getHost());
@@ -235,6 +244,10 @@ public class TestDistributedSchedulingService {
@Override
public DistSchedAllocateResponse allocateForDistributedScheduling(
AllocateRequest request) throws YarnException, IOException {
+ List