From e8585afa032d2aee0593b238c46799cbb884732d Mon Sep 17 00:00:00 2001
From: Christopher Douglas
Date: Tue, 7 May 2013 06:18:25 +0000
Subject: [PATCH] YARN-45. Add protocol for schedulers to request containers
back from ApplicationMasters. Contributed by Carlo Curino and Chris Douglas.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1479773 13f79535-47bb-0310-9956-ffa450edef68
---
hadoop-yarn-project/CHANGES.txt | 3 +
.../api/protocolrecords/AllocateResponse.java | 25 ++
.../protocolrecords/PreemptionContainer.java | 44 ++++
.../protocolrecords/PreemptionContract.java | 73 ++++++
.../protocolrecords/PreemptionMessage.java | 84 +++++++
.../PreemptionResourceRequest.java | 45 ++++
.../StrictPreemptionContract.java | 54 +++++
.../impl/pb/AllocateResponsePBImpl.java | 36 ++-
.../impl/pb/PreemptionContainerPBImpl.java | 103 ++++++++
.../impl/pb/PreemptionContractPBImpl.java | 228 ++++++++++++++++++
.../impl/pb/PreemptionMessagePBImpl.java | 141 +++++++++++
.../pb/PreemptionResourceRequestPBImpl.java | 103 ++++++++
.../pb/StrictPreemptionContractPBImpl.java | 148 ++++++++++++
.../src/main/proto/yarn_service_protos.proto | 21 ++
.../yarn/client/TestAMRMClientAsync.java | 2 +-
.../apache/hadoop/yarn/util/BuilderUtils.java | 5 +-
16 files changed, 1112 insertions(+), 3 deletions(-)
create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/PreemptionContainer.java
create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/PreemptionContract.java
create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/PreemptionMessage.java
create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/PreemptionResourceRequest.java
create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/StrictPreemptionContract.java
create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/PreemptionContainerPBImpl.java
create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/PreemptionContractPBImpl.java
create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/PreemptionMessagePBImpl.java
create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/PreemptionResourceRequestPBImpl.java
create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/StrictPreemptionContractPBImpl.java
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index a83cc29faaa..5bd6f8e65b8 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -45,6 +45,9 @@ Release 2.0.5-beta - UNRELEASED
YARN-482. FS: Extend SchedulingMode to intermediate queues.
(kkambatl via tucu)
+ YARN-45. Add protocol for schedulers to request containers back from
+ ApplicationMasters. (Carlo Curino, cdouglas)
+
IMPROVEMENTS
YARN-365. Change NM heartbeat handling to not generate a scheduler event
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateResponse.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateResponse.java
index 0426ee359a6..8da0d95bb2c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateResponse.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateResponse.java
@@ -22,6 +22,7 @@ import java.util.List;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Evolving;
import org.apache.hadoop.classification.InterfaceStability.Stable;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.AMRMProtocol;
@@ -48,6 +49,7 @@ import org.apache.hadoop.yarn.api.records.Resource;
*
* A list of nodes whose status has been updated.
* The number of available nodes in a cluster.
+ * A description of resources requested back by the cluster
*
*
*
@@ -152,4 +154,27 @@ public interface AllocateResponse {
@Private
@Unstable
public void setNumClusterNodes(int numNodes);
+
+ /**
+ * Get the description of containers owned by the AM, but requested back by
+ * the cluster. Note that the RM may have an inconsistent view of the
+ * resources owned by the AM. These messages are advisory, and the AM may
+ * elect to ignore them.
+ *
+ * The message is a snapshot of the resources the RM wants back from the AM.
+ * While demand persists, the RM will repeat its request; applications should
+ * not interpret each message as a request for additional
+ * resources on top of previous messages. Resources requested consistently
+ * over some duration may be forcibly killed by the RM.
+ *
+ * @return A specification of the resources to reclaim from this AM.
+ */
+ @Public
+ @Evolving
+ public PreemptionMessage getPreemptionMessage();
+
+ @Private
+ @Unstable
+ public void setPreemptionMessage(PreemptionMessage request);
+
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/PreemptionContainer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/PreemptionContainer.java
new file mode 100644
index 00000000000..d51d696854b
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/PreemptionContainer.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.api.protocolrecords;
+
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Evolving;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+
+/**
+ * Specific container requested back by the ResourceManager
.
+ * @see PreemptionContract
+ * @see StrictPreemptionContract
+ */
+public interface PreemptionContainer {
+
+ /**
+ * @return Container referenced by this handle.
+ */
+ @Public
+ @Evolving
+ public ContainerId getId();
+
+ @Private
+ @Unstable
+ public void setId(ContainerId id);
+
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/PreemptionContract.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/PreemptionContract.java
new file mode 100644
index 00000000000..8fc64e5085e
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/PreemptionContract.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.api.protocolrecords;
+
+import java.util.List;
+import java.util.Set;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Evolving;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+
+/**
+ * Description of resources requested back by the ResourceManager
.
+ * The ApplicationMaster
(AM) can satisfy this request according
+ * to its own priorities to prevent containers from being forcibly killed by
+ * the platform.
+ * @see PreemptionMessage
+ */
+public interface PreemptionContract {
+
+ /**
+ * If the AM releases resources matching these requests, then the {@link
+ * PreemptionContainer}s enumerated in {@link #getContainers()} should not be
+ * evicted from the cluster. Due to delays in propagating cluster state and
+ * sending these messages, there are conditions where satisfied contracts may
+ * not prevent the platform from killing containers.
+ * @return List of {@link PreemptionResourceRequest} to update the
+ * ApplicationMaster
about resources requested back by the
+ * ResourceManager
.
+ * @see AllocateRequest#setAskList(List)
+ */
+ @Public
+ @Evolving
+ public List getResourceRequest();
+
+ @Private
+ @Unstable
+ public void setResourceRequest(List req);
+
+ /**
+ * Assign the set of {@link PreemptionContainer} specifying which containers
+ * owned by the ApplicationMaster
that may be reclaimed by the
+ * ResourceManager
. If the AM prefers a different set of
+ * containers, then it may checkpoint or kill containers matching the
+ * description in {@link #getResourceRequest}.
+ * @return Set of containers at risk if the contract is not met.
+ */
+ @Public
+ @Evolving
+ public Set getContainers();
+
+
+ @Private
+ @Unstable
+ public void setContainers(Set containers);
+
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/PreemptionMessage.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/PreemptionMessage.java
new file mode 100644
index 00000000000..a7961fead61
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/PreemptionMessage.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.api.protocolrecords;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Evolving;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+
+/**
+ * A {@link PreemptionMessage} is part of the RM-AM protocol, and it is used by
+ * the RM to specify resources that the RM wants to reclaim from this
+ * ApplicationMaster
(AM). The AM receives a {@link
+ * StrictPreemptionContract} message encoding which containers the platform may
+ * forcibly kill, granting it an opportunity to checkpoint state or adjust its
+ * execution plan. The message may also include a {@link PreemptionContract}
+ * granting the AM more latitude in selecting which resources to return to the
+ * cluster.
+ *
+ * The AM should decode both parts of the message. The {@link
+ * StrictPreemptionContract} specifies particular allocations that the RM
+ * requires back. The AM can checkpoint containers' state, adjust its execution
+ * plan to move the computation, or take no action and hope that conditions that
+ * caused the RM to ask for the container will change.
+ *
+ * In contrast, the {@link PreemptionContract} also includes a description of
+ * resources with a set of containers. If the AM releases containers matching
+ * that profile, then the containers enumerated in {@link
+ * PreemptionContract#getContainers()} may not be killed.
+ *
+ * Each preemption message reflects the RM's current understanding of the
+ * cluster state, so a request to return N containers may not
+ * reflect containers the AM is releasing, recently exited containers the RM has
+ * yet to learn about, or new containers allocated before the message was
+ * generated. Conversely, an RM may request a different profile of containers in
+ * subsequent requests.
+ *
+ * The policy enforced by the RM is part of the scheduler. Generally, only
+ * containers that have been requested consistently should be killed, but the
+ * details are not specified.
+ */
+@Public
+@Evolving
+public interface PreemptionMessage {
+
+ /**
+ * @return Specific resources that may be killed by the
+ * ResourceManager
+ */
+ @Public
+ @Evolving
+ public StrictPreemptionContract getStrictContract();
+
+ @Private
+ @Unstable
+ public void setStrictContract(StrictPreemptionContract set);
+
+ /**
+ * @return Contract describing resources to return to the cluster.
+ */
+ @Public
+ @Evolving
+ public PreemptionContract getContract();
+
+ @Private
+ @Unstable
+ public void setContract(PreemptionContract contract);
+
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/PreemptionResourceRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/PreemptionResourceRequest.java
new file mode 100644
index 00000000000..1187fd8d25f
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/PreemptionResourceRequest.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.api.protocolrecords;
+
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Evolving;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+
+/**
+ * Description of resources requested back by the cluster.
+ * @see PreemptionContract
+ * @see AllocateRequest#setAskList(java.util.List)
+ */
+public interface PreemptionResourceRequest {
+
+ /**
+ * @return Resource described in this request, to be matched against running
+ * containers.
+ */
+ @Public
+ @Evolving
+ public ResourceRequest getResourceRequest();
+
+ @Private
+ @Unstable
+ public void setResourceRequest(ResourceRequest req);
+
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/StrictPreemptionContract.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/StrictPreemptionContract.java
new file mode 100644
index 00000000000..11d7bb9f68b
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/StrictPreemptionContract.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.api.protocolrecords;
+
+import java.util.Set;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Evolving;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+
+/**
+ * Enumeration of particular allocations to be reclaimed. The platform will
+ * reclaim exactly these resources, so the ApplicationMaster
(AM)
+ * may attempt to checkpoint work or adjust its execution plan to accommodate
+ * it. In contrast to {@link PreemptionContract}, the AM has no flexibility in
+ * selecting which resources to return to the cluster.
+ * @see PreemptionMessage
+ */
+@Public
+@Evolving
+public interface StrictPreemptionContract {
+
+ /**
+ * Get the set of {@link PreemptionContainer} specifying containers owned by
+ * the ApplicationMaster
that may be reclaimed by the
+ * ResourceManager
.
+ * @return the set of {@link ContainerId} to be preempted.
+ */
+ @Public
+ @Evolving
+ public Set getContainers();
+
+ @Private
+ @Unstable
+ public void setContainers(Set containers);
+
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java
index 4643e4ed02e..dac8c73580d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java
@@ -24,6 +24,7 @@ import java.util.Iterator;
import java.util.List;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.PreemptionMessage;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeReport;
@@ -39,7 +40,7 @@ import org.apache.hadoop.yarn.proto.YarnProtos.NodeReportProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.AllocateResponseProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.AllocateResponseProtoOrBuilder;
-
+import org.apache.hadoop.yarn.proto.YarnServiceProtos.PreemptionMessageProto;
public class AllocateResponsePBImpl extends ProtoBase
@@ -54,6 +55,7 @@ public class AllocateResponsePBImpl extends ProtoBase
private List completedContainersStatuses = null;
private List updatedNodes = null;
+ private PreemptionMessage preempt;
public AllocateResponsePBImpl() {
@@ -94,6 +96,9 @@ public class AllocateResponsePBImpl extends ProtoBase
if (this.limit != null) {
builder.setLimit(convertToProtoFormat(this.limit));
}
+ if (this.preempt != null) {
+ builder.setPreempt(convertToProtoFormat(this.preempt));
+ }
}
private synchronized void mergeLocalToProto() {
@@ -217,6 +222,28 @@ public class AllocateResponsePBImpl extends ProtoBase
builder.setNumClusterNodes(numNodes);
}
+ @Override
+ public synchronized PreemptionMessage getPreemptionMessage() {
+ AllocateResponseProtoOrBuilder p = viaProto ? proto : builder;
+ if (this.preempt != null) {
+ return this.preempt;
+ }
+ if (!p.hasPreempt()) {
+ return null;
+ }
+ this.preempt = convertFromProtoFormat(p.getPreempt());
+ return this.preempt;
+ }
+
+ @Override
+ public synchronized void setPreemptionMessage(PreemptionMessage preempt) {
+ maybeInitBuilder();
+ if (null == preempt) {
+ builder.clearPreempt();
+ }
+ this.preempt = preempt;
+ }
+
// Once this is called. updatedNodes will never be null - until a getProto is
// called.
private synchronized void initLocalNewNodeReportList() {
@@ -393,4 +420,11 @@ public class AllocateResponsePBImpl extends ProtoBase
return ((ResourcePBImpl) r).getProto();
}
+ private synchronized PreemptionMessagePBImpl convertFromProtoFormat(PreemptionMessageProto p) {
+ return new PreemptionMessagePBImpl(p);
+ }
+
+ private synchronized PreemptionMessageProto convertToProtoFormat(PreemptionMessage r) {
+ return ((PreemptionMessagePBImpl)r).getProto();
+ }
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/PreemptionContainerPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/PreemptionContainerPBImpl.java
new file mode 100644
index 00000000000..624d1270f4b
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/PreemptionContainerPBImpl.java
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.api.protocolrecords.impl.pb;
+
+import org.apache.hadoop.yarn.api.protocolrecords.PreemptionContainer;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.impl.pb.ContainerIdPBImpl;
+import org.apache.hadoop.yarn.proto.YarnProtos.ContainerIdProto;
+import org.apache.hadoop.yarn.proto.YarnServiceProtos.PreemptionContainerProto;
+import org.apache.hadoop.yarn.proto.YarnServiceProtos.PreemptionContainerProtoOrBuilder;
+
+public class PreemptionContainerPBImpl implements PreemptionContainer {
+
+ PreemptionContainerProto proto =
+ PreemptionContainerProto.getDefaultInstance();
+ PreemptionContainerProto.Builder builder = null;
+
+ boolean viaProto = false;
+ private ContainerId id;
+
+ public PreemptionContainerPBImpl() {
+ builder = PreemptionContainerProto.newBuilder();
+ }
+
+ public PreemptionContainerPBImpl(PreemptionContainerProto proto) {
+ this.proto = proto;
+ viaProto = true;
+ }
+
+ public synchronized PreemptionContainerProto getProto() {
+ mergeLocalToProto();
+ proto = viaProto ? proto : builder.build();
+ viaProto = true;
+ return proto;
+ }
+
+ private void mergeLocalToProto() {
+ if (viaProto)
+ maybeInitBuilder();
+ mergeLocalToBuilder();
+ proto = builder.build();
+ viaProto = true;
+ }
+
+ private void mergeLocalToBuilder() {
+ if (id != null) {
+ builder.setId(convertToProtoFormat(id));
+ }
+ }
+
+ private void maybeInitBuilder() {
+ if (viaProto || builder == null) {
+ builder = PreemptionContainerProto.newBuilder(proto);
+ }
+ viaProto = false;
+ }
+
+ @Override
+ public synchronized ContainerId getId() {
+ PreemptionContainerProtoOrBuilder p = viaProto ? proto : builder;
+ if (id != null) {
+ return id;
+ }
+ if (!p.hasId()) {
+ return null;
+ }
+ id = convertFromProtoFormat(p.getId());
+ return id;
+ }
+
+ @Override
+ public synchronized void setId(final ContainerId id) {
+ maybeInitBuilder();
+ if (null == id) {
+ builder.clearId();
+ }
+ this.id = id;
+ }
+
+ private ContainerIdPBImpl convertFromProtoFormat(ContainerIdProto p) {
+ return new ContainerIdPBImpl(p);
+ }
+
+ private ContainerIdProto convertToProtoFormat(ContainerId t) {
+ return ((ContainerIdPBImpl)t).getProto();
+ }
+
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/PreemptionContractPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/PreemptionContractPBImpl.java
new file mode 100644
index 00000000000..61534365ca0
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/PreemptionContractPBImpl.java
@@ -0,0 +1,228 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.api.protocolrecords.impl.pb;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.hadoop.yarn.api.protocolrecords.PreemptionContainer;
+import org.apache.hadoop.yarn.api.protocolrecords.PreemptionContract;
+import org.apache.hadoop.yarn.api.protocolrecords.PreemptionResourceRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.PreemptionResourceRequestPBImpl;
+import org.apache.hadoop.yarn.proto.YarnServiceProtos.PreemptionContainerProto;
+import org.apache.hadoop.yarn.proto.YarnServiceProtos.PreemptionContractProto;
+import org.apache.hadoop.yarn.proto.YarnServiceProtos.PreemptionContractProtoOrBuilder;
+import org.apache.hadoop.yarn.proto.YarnServiceProtos.PreemptionResourceRequestProto;
+
+public class PreemptionContractPBImpl implements PreemptionContract {
+
+ PreemptionContractProto proto = PreemptionContractProto.getDefaultInstance();
+ PreemptionContractProto.Builder builder = null;
+
+ boolean viaProto = false;
+ private Set containers;
+ private List resources;
+
+ public PreemptionContractPBImpl() {
+ builder = PreemptionContractProto.newBuilder();
+ }
+
+ public PreemptionContractPBImpl(PreemptionContractProto proto) {
+ this.proto = proto;
+ viaProto = true;
+ }
+
+ public synchronized PreemptionContractProto getProto() {
+ mergeLocalToProto();
+ proto = viaProto ? proto : builder.build();
+ viaProto = true;
+ return proto;
+ }
+
+ private void mergeLocalToProto() {
+ if (viaProto)
+ maybeInitBuilder();
+ mergeLocalToBuilder();
+ proto = builder.build();
+ viaProto = true;
+ }
+
+ private void mergeLocalToBuilder() {
+ if (this.resources != null) {
+ addResourcesToProto();
+ }
+ if (this.containers != null) {
+ addContainersToProto();
+ }
+ }
+
+ private void maybeInitBuilder() {
+ if (viaProto || builder == null) {
+ builder = PreemptionContractProto.newBuilder(proto);
+ }
+ viaProto = false;
+ }
+
+ @Override
+ public synchronized Set getContainers() {
+ initPreemptionContainers();
+ return containers;
+ }
+
+ @Override
+ public synchronized void setContainers(
+ final Set containers) {
+ if (null == containers) {
+ builder.clearContainer();
+ }
+ this.containers = containers;
+ }
+
+ @Override
+ public synchronized List getResourceRequest() {
+ initPreemptionResourceRequests();
+ return resources;
+ }
+
+ @Override
+ public synchronized void setResourceRequest(
+ final List req) {
+ if (null == resources) {
+ builder.clearResource();
+ }
+ this.resources = req;
+ }
+
+ private void initPreemptionResourceRequests() {
+ if (resources != null) {
+ return;
+ }
+ PreemptionContractProtoOrBuilder p = viaProto ? proto : builder;
+ List list = p.getResourceList();
+ resources = new ArrayList();
+
+ for (PreemptionResourceRequestProto rr : list) {
+ resources.add(convertFromProtoFormat(rr));
+ }
+ }
+
+ private void addResourcesToProto() {
+ maybeInitBuilder();
+ builder.clearResource();
+ if (null == resources) {
+ return;
+ }
+ Iterable iterable =
+ new Iterable() {
+ @Override
+ public Iterator iterator() {
+ return new Iterator() {
+
+ Iterator iter = resources.iterator();
+
+ @Override
+ public boolean hasNext() {
+ return iter.hasNext();
+ }
+
+ @Override
+ public PreemptionResourceRequestProto next() {
+ return convertToProtoFormat(iter.next());
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException();
+
+ }
+ };
+
+ }
+ };
+ builder.addAllResource(iterable);
+ }
+
+ private void initPreemptionContainers() {
+ if (containers != null) {
+ return;
+ }
+ PreemptionContractProtoOrBuilder p = viaProto ? proto : builder;
+ List list = p.getContainerList();
+ containers = new HashSet();
+
+ for (PreemptionContainerProto c : list) {
+ containers.add(convertFromProtoFormat(c));
+ }
+ }
+
+ private void addContainersToProto() {
+ maybeInitBuilder();
+ builder.clearContainer();
+ if (null == containers) {
+ return;
+ }
+ Iterable iterable =
+ new Iterable() {
+ @Override
+ public Iterator iterator() {
+ return new Iterator() {
+
+ Iterator iter = containers.iterator();
+
+ @Override
+ public boolean hasNext() {
+ return iter.hasNext();
+ }
+
+ @Override
+ public PreemptionContainerProto next() {
+ return convertToProtoFormat(iter.next());
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException();
+
+ }
+ };
+
+ }
+ };
+ builder.addAllContainer(iterable);
+ }
+
+ private PreemptionContainerPBImpl convertFromProtoFormat(PreemptionContainerProto p) {
+ return new PreemptionContainerPBImpl(p);
+ }
+
+ private PreemptionContainerProto convertToProtoFormat(PreemptionContainer t) {
+ return ((PreemptionContainerPBImpl)t).getProto();
+ }
+
+ private PreemptionResourceRequestPBImpl convertFromProtoFormat(PreemptionResourceRequestProto p) {
+ return new PreemptionResourceRequestPBImpl(p);
+ }
+
+ private PreemptionResourceRequestProto convertToProtoFormat(PreemptionResourceRequest t) {
+ return ((PreemptionResourceRequestPBImpl)t).getProto();
+ }
+
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/PreemptionMessagePBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/PreemptionMessagePBImpl.java
new file mode 100644
index 00000000000..72a7eb151ff
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/PreemptionMessagePBImpl.java
@@ -0,0 +1,141 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.api.protocolrecords.impl.pb;
+
+import org.apache.hadoop.yarn.api.protocolrecords.PreemptionContract;
+import org.apache.hadoop.yarn.api.protocolrecords.PreemptionMessage;
+import org.apache.hadoop.yarn.api.protocolrecords.StrictPreemptionContract;
+import org.apache.hadoop.yarn.proto.YarnServiceProtos.PreemptionContractProto;
+import org.apache.hadoop.yarn.proto.YarnServiceProtos.PreemptionMessageProto;
+import org.apache.hadoop.yarn.proto.YarnServiceProtos.PreemptionMessageProtoOrBuilder;
+import org.apache.hadoop.yarn.proto.YarnServiceProtos.StrictPreemptionContractProto;
+
+public class PreemptionMessagePBImpl implements PreemptionMessage {
+
+ PreemptionMessageProto proto = PreemptionMessageProto.getDefaultInstance();
+ PreemptionMessageProto.Builder builder = null;
+
+ boolean viaProto = false;
+ private StrictPreemptionContract strict;
+ private PreemptionContract contract;
+
+ public PreemptionMessagePBImpl() {
+ builder = PreemptionMessageProto.newBuilder();
+ }
+
+ public PreemptionMessagePBImpl(PreemptionMessageProto proto) {
+ this.proto = proto;
+ viaProto = true;
+ }
+
+ public synchronized PreemptionMessageProto getProto() {
+ mergeLocalToProto();
+ proto = viaProto ? proto : builder.build();
+ viaProto = true;
+ return proto;
+ }
+
+ private void mergeLocalToProto() {
+ if (viaProto)
+ maybeInitBuilder();
+ mergeLocalToBuilder();
+ proto = builder.build();
+ viaProto = true;
+ }
+
+ private void mergeLocalToBuilder() {
+ if (strict != null) {
+ builder.setStrictContract(convertToProtoFormat(strict));
+ }
+ if (contract != null) {
+ builder.setContract(convertToProtoFormat(contract));
+ }
+ }
+
+ private void maybeInitBuilder() {
+ if (viaProto || builder == null) {
+ builder = PreemptionMessageProto.newBuilder(proto);
+ }
+ viaProto = false;
+ }
+
+ @Override
+ public synchronized StrictPreemptionContract getStrictContract() {
+ PreemptionMessageProtoOrBuilder p = viaProto ? proto : builder;
+ if (strict != null) {
+ return strict;
+ }
+ if (!p.hasStrictContract()) {
+ return null;
+ }
+ strict = convertFromProtoFormat(p.getStrictContract());
+ return strict;
+ }
+
+ @Override
+ public synchronized void setStrictContract(StrictPreemptionContract strict) {
+ maybeInitBuilder();
+ if (null == strict) {
+ builder.clearStrictContract();
+ }
+ this.strict = strict;
+ }
+
+ @Override
+ public synchronized PreemptionContract getContract() {
+ PreemptionMessageProtoOrBuilder p = viaProto ? proto : builder;
+ if (contract != null) {
+ return contract;
+ }
+ if (!p.hasContract()) {
+ return null;
+ }
+ contract = convertFromProtoFormat(p.getContract());
+ return contract;
+ }
+
+ @Override
+ public synchronized void setContract(final PreemptionContract c) {
+ maybeInitBuilder();
+ if (null == c) {
+ builder.clearContract();
+ }
+ this.contract = c;
+ }
+
+ private StrictPreemptionContractPBImpl convertFromProtoFormat(
+ StrictPreemptionContractProto p) {
+ return new StrictPreemptionContractPBImpl(p);
+ }
+
+ private StrictPreemptionContractProto convertToProtoFormat(
+ StrictPreemptionContract t) {
+ return ((StrictPreemptionContractPBImpl)t).getProto();
+ }
+
+ private PreemptionContractPBImpl convertFromProtoFormat(
+ PreemptionContractProto p) {
+ return new PreemptionContractPBImpl(p);
+ }
+
+ private PreemptionContractProto convertToProtoFormat(
+ PreemptionContract t) {
+ return ((PreemptionContractPBImpl)t).getProto();
+ }
+
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/PreemptionResourceRequestPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/PreemptionResourceRequestPBImpl.java
new file mode 100644
index 00000000000..8b6ca2d4f60
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/PreemptionResourceRequestPBImpl.java
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.api.protocolrecords.impl.pb;
+
+import org.apache.hadoop.yarn.api.protocolrecords.PreemptionResourceRequest;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.api.records.impl.pb.ResourceRequestPBImpl;
+import org.apache.hadoop.yarn.proto.YarnProtos.ResourceRequestProto;
+import org.apache.hadoop.yarn.proto.YarnServiceProtos.PreemptionResourceRequestProto;
+import org.apache.hadoop.yarn.proto.YarnServiceProtos.PreemptionResourceRequestProtoOrBuilder;
+
+public class PreemptionResourceRequestPBImpl implements PreemptionResourceRequest {
+
+ PreemptionResourceRequestProto proto =
+ PreemptionResourceRequestProto.getDefaultInstance();
+ PreemptionResourceRequestProto.Builder builder = null;
+
+ boolean viaProto = false;
+ private ResourceRequest rr;
+
+ public PreemptionResourceRequestPBImpl() {
+ builder = PreemptionResourceRequestProto.newBuilder();
+ }
+
+ public PreemptionResourceRequestPBImpl(PreemptionResourceRequestProto proto) {
+ this.proto = proto;
+ viaProto = true;
+ }
+
+ public synchronized PreemptionResourceRequestProto getProto() {
+ mergeLocalToProto();
+ proto = viaProto ? proto : builder.build();
+ viaProto = true;
+ return proto;
+ }
+
+ private void mergeLocalToProto() {
+ if (viaProto)
+ maybeInitBuilder();
+ mergeLocalToBuilder();
+ proto = builder.build();
+ viaProto = true;
+ }
+
+ private void mergeLocalToBuilder() {
+ if (rr != null) {
+ builder.setResource(convertToProtoFormat(rr));
+ }
+ }
+
+ private void maybeInitBuilder() {
+ if (viaProto || builder == null) {
+ builder = PreemptionResourceRequestProto.newBuilder(proto);
+ }
+ viaProto = false;
+ }
+
+ @Override
+ public synchronized ResourceRequest getResourceRequest() {
+ PreemptionResourceRequestProtoOrBuilder p = viaProto ? proto : builder;
+ if (rr != null) {
+ return rr;
+ }
+ if (!p.hasResource()) {
+ return null;
+ }
+ rr = convertFromProtoFormat(p.getResource());
+ return rr;
+ }
+
+ @Override
+ public synchronized void setResourceRequest(final ResourceRequest rr) {
+ maybeInitBuilder();
+ if (null == rr) {
+ builder.clearResource();
+ }
+ this.rr = rr;
+ }
+
+ private ResourceRequestPBImpl convertFromProtoFormat(ResourceRequestProto p) {
+ return new ResourceRequestPBImpl(p);
+ }
+
+ private ResourceRequestProto convertToProtoFormat(ResourceRequest t) {
+ return ((ResourceRequestPBImpl)t).getProto();
+ }
+
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/StrictPreemptionContractPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/StrictPreemptionContractPBImpl.java
new file mode 100644
index 00000000000..7759ba22c2e
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/StrictPreemptionContractPBImpl.java
@@ -0,0 +1,148 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.api.protocolrecords.impl.pb;
+
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.hadoop.yarn.api.protocolrecords.PreemptionContainer;
+import org.apache.hadoop.yarn.api.protocolrecords.StrictPreemptionContract;
+import org.apache.hadoop.yarn.proto.YarnServiceProtos.PreemptionContainerProto;
+import org.apache.hadoop.yarn.proto.YarnServiceProtos.StrictPreemptionContractProto;
+import org.apache.hadoop.yarn.proto.YarnServiceProtos.StrictPreemptionContractProtoOrBuilder;
+
+public class StrictPreemptionContractPBImpl implements StrictPreemptionContract {
+
+ StrictPreemptionContractProto proto =
+ StrictPreemptionContractProto.getDefaultInstance();
+ StrictPreemptionContractProto.Builder builder = null;
+
+ boolean viaProto = false;
+ private Set containers;
+
+ public StrictPreemptionContractPBImpl() {
+ builder = StrictPreemptionContractProto.newBuilder();
+ }
+
+ public StrictPreemptionContractPBImpl(StrictPreemptionContractProto proto) {
+ this.proto = proto;
+ viaProto = true;
+ }
+
+ public synchronized StrictPreemptionContractProto getProto() {
+ mergeLocalToProto();
+ proto = viaProto ? proto : builder.build();
+ viaProto = true;
+ return proto;
+ }
+
+ private void mergeLocalToProto() {
+ if (viaProto)
+ maybeInitBuilder();
+ mergeLocalToBuilder();
+ proto = builder.build();
+ viaProto = true;
+ }
+
+ private void mergeLocalToBuilder() {
+ if (this.containers != null) {
+ addContainersToProto();
+ }
+ }
+
+ private void maybeInitBuilder() {
+ if (viaProto || builder == null) {
+ builder = StrictPreemptionContractProto.newBuilder(proto);
+ }
+ viaProto = false;
+ }
+
+ @Override
+ public synchronized Set getContainers() {
+ initIds();
+ return containers;
+ }
+
+ @Override
+ public synchronized void setContainers(
+ final Set containers) {
+ if (null == containers) {
+ builder.clearContainer();
+ }
+ this.containers = containers;
+ }
+
+ private void initIds() {
+ if (containers != null) {
+ return;
+ }
+ StrictPreemptionContractProtoOrBuilder p = viaProto ? proto : builder;
+ List list = p.getContainerList();
+ containers = new HashSet();
+
+ for (PreemptionContainerProto c : list) {
+ containers.add(convertFromProtoFormat(c));
+ }
+ }
+
+ private void addContainersToProto() {
+ maybeInitBuilder();
+ builder.clearContainer();
+ if (containers == null) {
+ return;
+ }
+ Iterable iterable = new Iterable() {
+ @Override
+ public Iterator iterator() {
+ return new Iterator() {
+
+ Iterator iter = containers.iterator();
+
+ @Override
+ public boolean hasNext() {
+ return iter.hasNext();
+ }
+
+ @Override
+ public PreemptionContainerProto next() {
+ return convertToProtoFormat(iter.next());
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException();
+
+ }
+ };
+
+ }
+ };
+ builder.addAllContainer(iterable);
+ }
+
+ private PreemptionContainerPBImpl convertFromProtoFormat(PreemptionContainerProto p) {
+ return new PreemptionContainerPBImpl(p);
+ }
+
+ private PreemptionContainerProto convertToProtoFormat(PreemptionContainer t) {
+ return ((PreemptionContainerPBImpl)t).getProto();
+ }
+
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto
index ad3b5f18072..6ac02741bac 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto
@@ -66,9 +66,30 @@ message AllocateResponseProto {
optional ResourceProto limit = 5;
repeated NodeReportProto updated_nodes = 6;
optional int32 num_cluster_nodes = 7;
+ optional PreemptionMessageProto preempt = 8;
}
+message PreemptionMessageProto {
+ optional StrictPreemptionContractProto strictContract = 1;
+ optional PreemptionContractProto contract = 2;
+}
+message StrictPreemptionContractProto {
+ repeated PreemptionContainerProto container = 1;
+}
+
+message PreemptionContractProto {
+ repeated PreemptionResourceRequestProto resource = 1;
+ repeated PreemptionContainerProto container = 2;
+}
+
+message PreemptionContainerProto {
+ optional ContainerIdProto id = 1;
+}
+
+message PreemptionResourceRequestProto {
+ optional ResourceRequestProto resource = 1;
+}
//////////////////////////////////////////////////////
/////// client_RM_Protocol ///////////////////////////
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestAMRMClientAsync.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestAMRMClientAsync.java
index d95ce64f630..ff2c0a441a9 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestAMRMClientAsync.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestAMRMClientAsync.java
@@ -113,7 +113,7 @@ public class TestAMRMClientAsync {
private AllocateResponse createAllocateResponse(
List completed, List allocated) {
AllocateResponse response = BuilderUtils.newAllocateResponse(0, completed, allocated,
- new ArrayList(), null, false, 1);
+ new ArrayList(), null, false, 1, null);
return response;
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/BuilderUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/BuilderUtils.java
index f09046e3712..e6699f39278 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/BuilderUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/BuilderUtils.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.PreemptionMessage;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -404,7 +405,8 @@ public class BuilderUtils {
public static AllocateResponse newAllocateResponse(int responseId,
List completedContainers,
List allocatedContainers, List updatedNodes,
- Resource availResources, boolean reboot, int numClusterNodes) {
+ Resource availResources, boolean reboot, int numClusterNodes,
+ PreemptionMessage preempt) {
AllocateResponse response = recordFactory
.newRecordInstance(AllocateResponse.class);
response.setNumClusterNodes(numClusterNodes);
@@ -414,6 +416,7 @@ public class BuilderUtils {
response.setUpdatedNodes(updatedNodes);
response.setAvailableResources(availResources);
response.setReboot(reboot);
+ response.setPreemptionMessage(preempt);
return response;
}