YARN-45. Add protocol for schedulers to request containers back from

ApplicationMasters. Contributed by Carlo Curino and Chris Douglas.


git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1479773 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Christopher Douglas 2013-05-07 06:18:25 +00:00
parent 2136542f1b
commit e8585afa03
16 changed files with 1112 additions and 3 deletions

View File

@ -45,6 +45,9 @@ Release 2.0.5-beta - UNRELEASED
YARN-482. FS: Extend SchedulingMode to intermediate queues.
(kkambatl via tucu)
YARN-45. Add protocol for schedulers to request containers back from
ApplicationMasters. (Carlo Curino, cdouglas)
IMPROVEMENTS
YARN-365. Change NM heartbeat handling to not generate a scheduler event

View File

@ -22,6 +22,7 @@ import java.util.List;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Evolving;
import org.apache.hadoop.classification.InterfaceStability.Stable;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.AMRMProtocol;
@ -48,6 +49,7 @@ import org.apache.hadoop.yarn.api.records.Resource;
* </li>
* <li>A list of nodes whose status has been updated.</li>
* <li>The number of available nodes in a cluster.</li>
* <li>A description of resources requested back by the cluster</li>
* </ul>
* </p>
*
@ -152,4 +154,27 @@ public interface AllocateResponse {
@Private
@Unstable
public void setNumClusterNodes(int numNodes);
/**
* Get the description of containers owned by the AM, but requested back by
* the cluster. Note that the RM may have an inconsistent view of the
* resources owned by the AM. These messages are advisory, and the AM may
* elect to ignore them.
*
* The message is a snapshot of the resources the RM wants back from the AM.
* While demand persists, the RM will repeat its request; applications should
* not interpret each message as a request for <emph>additional<emph>
* resources on top of previous messages. Resources requested consistently
* over some duration may be forcibly killed by the RM.
*
* @return A specification of the resources to reclaim from this AM.
*/
@Public
@Evolving
public PreemptionMessage getPreemptionMessage();
@Private
@Unstable
public void setPreemptionMessage(PreemptionMessage request);
}

View File

@ -0,0 +1,44 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.api.protocolrecords;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Evolving;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.records.ContainerId;
/**
* Specific container requested back by the <code>ResourceManager</code>.
* @see PreemptionContract
* @see StrictPreemptionContract
*/
public interface PreemptionContainer {
/**
* @return Container referenced by this handle.
*/
@Public
@Evolving
public ContainerId getId();
@Private
@Unstable
public void setId(ContainerId id);
}

View File

@ -0,0 +1,73 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.api.protocolrecords;
import java.util.List;
import java.util.Set;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Evolving;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
/**
* Description of resources requested back by the <code>ResourceManager</code>.
* The <code>ApplicationMaster</code> (AM) can satisfy this request according
* to its own priorities to prevent containers from being forcibly killed by
* the platform.
* @see PreemptionMessage
*/
public interface PreemptionContract {
/**
* If the AM releases resources matching these requests, then the {@link
* PreemptionContainer}s enumerated in {@link #getContainers()} should not be
* evicted from the cluster. Due to delays in propagating cluster state and
* sending these messages, there are conditions where satisfied contracts may
* not prevent the platform from killing containers.
* @return List of {@link PreemptionResourceRequest} to update the
* <code>ApplicationMaster</code> about resources requested back by the
* <code>ResourceManager</code>.
* @see AllocateRequest#setAskList(List)
*/
@Public
@Evolving
public List<PreemptionResourceRequest> getResourceRequest();
@Private
@Unstable
public void setResourceRequest(List<PreemptionResourceRequest> req);
/**
* Assign the set of {@link PreemptionContainer} specifying which containers
* owned by the <code>ApplicationMaster</code> that may be reclaimed by the
* <code>ResourceManager</code>. If the AM prefers a different set of
* containers, then it may checkpoint or kill containers matching the
* description in {@link #getResourceRequest}.
* @return Set of containers at risk if the contract is not met.
*/
@Public
@Evolving
public Set<PreemptionContainer> getContainers();
@Private
@Unstable
public void setContainers(Set<PreemptionContainer> containers);
}

View File

@ -0,0 +1,84 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.api.protocolrecords;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Evolving;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
/**
* A {@link PreemptionMessage} is part of the RM-AM protocol, and it is used by
* the RM to specify resources that the RM wants to reclaim from this
* <code>ApplicationMaster</code> (AM). The AM receives a {@link
* StrictPreemptionContract} message encoding which containers the platform may
* forcibly kill, granting it an opportunity to checkpoint state or adjust its
* execution plan. The message may also include a {@link PreemptionContract}
* granting the AM more latitude in selecting which resources to return to the
* cluster.
*
* The AM should decode both parts of the message. The {@link
* StrictPreemptionContract} specifies particular allocations that the RM
* requires back. The AM can checkpoint containers' state, adjust its execution
* plan to move the computation, or take no action and hope that conditions that
* caused the RM to ask for the container will change.
*
* In contrast, the {@link PreemptionContract} also includes a description of
* resources with a set of containers. If the AM releases containers matching
* that profile, then the containers enumerated in {@link
* PreemptionContract#getContainers()} may not be killed.
*
* Each preemption message reflects the RM's current understanding of the
* cluster state, so a request to return <emph>N</emph> containers may not
* reflect containers the AM is releasing, recently exited containers the RM has
* yet to learn about, or new containers allocated before the message was
* generated. Conversely, an RM may request a different profile of containers in
* subsequent requests.
*
* The policy enforced by the RM is part of the scheduler. Generally, only
* containers that have been requested consistently should be killed, but the
* details are not specified.
*/
@Public
@Evolving
public interface PreemptionMessage {
/**
* @return Specific resources that may be killed by the
* <code>ResourceManager</code>
*/
@Public
@Evolving
public StrictPreemptionContract getStrictContract();
@Private
@Unstable
public void setStrictContract(StrictPreemptionContract set);
/**
* @return Contract describing resources to return to the cluster.
*/
@Public
@Evolving
public PreemptionContract getContract();
@Private
@Unstable
public void setContract(PreemptionContract contract);
}

View File

@ -0,0 +1,45 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.api.protocolrecords;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Evolving;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
/**
* Description of resources requested back by the cluster.
* @see PreemptionContract
* @see AllocateRequest#setAskList(java.util.List)
*/
public interface PreemptionResourceRequest {
/**
* @return Resource described in this request, to be matched against running
* containers.
*/
@Public
@Evolving
public ResourceRequest getResourceRequest();
@Private
@Unstable
public void setResourceRequest(ResourceRequest req);
}

View File

@ -0,0 +1,54 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.api.protocolrecords;
import java.util.Set;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Evolving;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.records.ContainerId;
/**
* Enumeration of particular allocations to be reclaimed. The platform will
* reclaim exactly these resources, so the <code>ApplicationMaster</code> (AM)
* may attempt to checkpoint work or adjust its execution plan to accommodate
* it. In contrast to {@link PreemptionContract}, the AM has no flexibility in
* selecting which resources to return to the cluster.
* @see PreemptionMessage
*/
@Public
@Evolving
public interface StrictPreemptionContract {
/**
* Get the set of {@link PreemptionContainer} specifying containers owned by
* the <code>ApplicationMaster</code> that may be reclaimed by the
* <code>ResourceManager</code>.
* @return the set of {@link ContainerId} to be preempted.
*/
@Public
@Evolving
public Set<PreemptionContainer> getContainers();
@Private
@Unstable
public void setContainers(Set<PreemptionContainer> containers);
}

View File

@ -24,6 +24,7 @@ import java.util.Iterator;
import java.util.List;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.PreemptionMessage;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeReport;
@ -39,7 +40,7 @@ import org.apache.hadoop.yarn.proto.YarnProtos.NodeReportProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.AllocateResponseProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.AllocateResponseProtoOrBuilder;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.PreemptionMessageProto;
public class AllocateResponsePBImpl extends ProtoBase<AllocateResponseProto>
@ -54,6 +55,7 @@ public class AllocateResponsePBImpl extends ProtoBase<AllocateResponseProto>
private List<ContainerStatus> completedContainersStatuses = null;
private List<NodeReport> updatedNodes = null;
private PreemptionMessage preempt;
public AllocateResponsePBImpl() {
@ -94,6 +96,9 @@ public class AllocateResponsePBImpl extends ProtoBase<AllocateResponseProto>
if (this.limit != null) {
builder.setLimit(convertToProtoFormat(this.limit));
}
if (this.preempt != null) {
builder.setPreempt(convertToProtoFormat(this.preempt));
}
}
private synchronized void mergeLocalToProto() {
@ -217,6 +222,28 @@ public class AllocateResponsePBImpl extends ProtoBase<AllocateResponseProto>
builder.setNumClusterNodes(numNodes);
}
@Override
public synchronized PreemptionMessage getPreemptionMessage() {
AllocateResponseProtoOrBuilder p = viaProto ? proto : builder;
if (this.preempt != null) {
return this.preempt;
}
if (!p.hasPreempt()) {
return null;
}
this.preempt = convertFromProtoFormat(p.getPreempt());
return this.preempt;
}
@Override
public synchronized void setPreemptionMessage(PreemptionMessage preempt) {
maybeInitBuilder();
if (null == preempt) {
builder.clearPreempt();
}
this.preempt = preempt;
}
// Once this is called. updatedNodes will never be null - until a getProto is
// called.
private synchronized void initLocalNewNodeReportList() {
@ -393,4 +420,11 @@ public class AllocateResponsePBImpl extends ProtoBase<AllocateResponseProto>
return ((ResourcePBImpl) r).getProto();
}
private synchronized PreemptionMessagePBImpl convertFromProtoFormat(PreemptionMessageProto p) {
return new PreemptionMessagePBImpl(p);
}
private synchronized PreemptionMessageProto convertToProtoFormat(PreemptionMessage r) {
return ((PreemptionMessagePBImpl)r).getProto();
}
}

View File

@ -0,0 +1,103 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.api.protocolrecords.impl.pb;
import org.apache.hadoop.yarn.api.protocolrecords.PreemptionContainer;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.impl.pb.ContainerIdPBImpl;
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerIdProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.PreemptionContainerProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.PreemptionContainerProtoOrBuilder;
public class PreemptionContainerPBImpl implements PreemptionContainer {
PreemptionContainerProto proto =
PreemptionContainerProto.getDefaultInstance();
PreemptionContainerProto.Builder builder = null;
boolean viaProto = false;
private ContainerId id;
public PreemptionContainerPBImpl() {
builder = PreemptionContainerProto.newBuilder();
}
public PreemptionContainerPBImpl(PreemptionContainerProto proto) {
this.proto = proto;
viaProto = true;
}
public synchronized PreemptionContainerProto getProto() {
mergeLocalToProto();
proto = viaProto ? proto : builder.build();
viaProto = true;
return proto;
}
private void mergeLocalToProto() {
if (viaProto)
maybeInitBuilder();
mergeLocalToBuilder();
proto = builder.build();
viaProto = true;
}
private void mergeLocalToBuilder() {
if (id != null) {
builder.setId(convertToProtoFormat(id));
}
}
private void maybeInitBuilder() {
if (viaProto || builder == null) {
builder = PreemptionContainerProto.newBuilder(proto);
}
viaProto = false;
}
@Override
public synchronized ContainerId getId() {
PreemptionContainerProtoOrBuilder p = viaProto ? proto : builder;
if (id != null) {
return id;
}
if (!p.hasId()) {
return null;
}
id = convertFromProtoFormat(p.getId());
return id;
}
@Override
public synchronized void setId(final ContainerId id) {
maybeInitBuilder();
if (null == id) {
builder.clearId();
}
this.id = id;
}
private ContainerIdPBImpl convertFromProtoFormat(ContainerIdProto p) {
return new ContainerIdPBImpl(p);
}
private ContainerIdProto convertToProtoFormat(ContainerId t) {
return ((ContainerIdPBImpl)t).getProto();
}
}

View File

@ -0,0 +1,228 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.api.protocolrecords.impl.pb;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import org.apache.hadoop.yarn.api.protocolrecords.PreemptionContainer;
import org.apache.hadoop.yarn.api.protocolrecords.PreemptionContract;
import org.apache.hadoop.yarn.api.protocolrecords.PreemptionResourceRequest;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.PreemptionResourceRequestPBImpl;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.PreemptionContainerProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.PreemptionContractProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.PreemptionContractProtoOrBuilder;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.PreemptionResourceRequestProto;
public class PreemptionContractPBImpl implements PreemptionContract {
PreemptionContractProto proto = PreemptionContractProto.getDefaultInstance();
PreemptionContractProto.Builder builder = null;
boolean viaProto = false;
private Set<PreemptionContainer> containers;
private List<PreemptionResourceRequest> resources;
public PreemptionContractPBImpl() {
builder = PreemptionContractProto.newBuilder();
}
public PreemptionContractPBImpl(PreemptionContractProto proto) {
this.proto = proto;
viaProto = true;
}
public synchronized PreemptionContractProto getProto() {
mergeLocalToProto();
proto = viaProto ? proto : builder.build();
viaProto = true;
return proto;
}
private void mergeLocalToProto() {
if (viaProto)
maybeInitBuilder();
mergeLocalToBuilder();
proto = builder.build();
viaProto = true;
}
private void mergeLocalToBuilder() {
if (this.resources != null) {
addResourcesToProto();
}
if (this.containers != null) {
addContainersToProto();
}
}
private void maybeInitBuilder() {
if (viaProto || builder == null) {
builder = PreemptionContractProto.newBuilder(proto);
}
viaProto = false;
}
@Override
public synchronized Set<PreemptionContainer> getContainers() {
initPreemptionContainers();
return containers;
}
@Override
public synchronized void setContainers(
final Set<PreemptionContainer> containers) {
if (null == containers) {
builder.clearContainer();
}
this.containers = containers;
}
@Override
public synchronized List<PreemptionResourceRequest> getResourceRequest() {
initPreemptionResourceRequests();
return resources;
}
@Override
public synchronized void setResourceRequest(
final List<PreemptionResourceRequest> req) {
if (null == resources) {
builder.clearResource();
}
this.resources = req;
}
private void initPreemptionResourceRequests() {
if (resources != null) {
return;
}
PreemptionContractProtoOrBuilder p = viaProto ? proto : builder;
List<PreemptionResourceRequestProto> list = p.getResourceList();
resources = new ArrayList<PreemptionResourceRequest>();
for (PreemptionResourceRequestProto rr : list) {
resources.add(convertFromProtoFormat(rr));
}
}
private void addResourcesToProto() {
maybeInitBuilder();
builder.clearResource();
if (null == resources) {
return;
}
Iterable<PreemptionResourceRequestProto> iterable =
new Iterable<PreemptionResourceRequestProto>() {
@Override
public Iterator<PreemptionResourceRequestProto> iterator() {
return new Iterator<PreemptionResourceRequestProto>() {
Iterator<PreemptionResourceRequest> iter = resources.iterator();
@Override
public boolean hasNext() {
return iter.hasNext();
}
@Override
public PreemptionResourceRequestProto next() {
return convertToProtoFormat(iter.next());
}
@Override
public void remove() {
throw new UnsupportedOperationException();
}
};
}
};
builder.addAllResource(iterable);
}
private void initPreemptionContainers() {
if (containers != null) {
return;
}
PreemptionContractProtoOrBuilder p = viaProto ? proto : builder;
List<PreemptionContainerProto> list = p.getContainerList();
containers = new HashSet<PreemptionContainer>();
for (PreemptionContainerProto c : list) {
containers.add(convertFromProtoFormat(c));
}
}
private void addContainersToProto() {
maybeInitBuilder();
builder.clearContainer();
if (null == containers) {
return;
}
Iterable<PreemptionContainerProto> iterable =
new Iterable<PreemptionContainerProto>() {
@Override
public Iterator<PreemptionContainerProto> iterator() {
return new Iterator<PreemptionContainerProto>() {
Iterator<PreemptionContainer> iter = containers.iterator();
@Override
public boolean hasNext() {
return iter.hasNext();
}
@Override
public PreemptionContainerProto next() {
return convertToProtoFormat(iter.next());
}
@Override
public void remove() {
throw new UnsupportedOperationException();
}
};
}
};
builder.addAllContainer(iterable);
}
private PreemptionContainerPBImpl convertFromProtoFormat(PreemptionContainerProto p) {
return new PreemptionContainerPBImpl(p);
}
private PreemptionContainerProto convertToProtoFormat(PreemptionContainer t) {
return ((PreemptionContainerPBImpl)t).getProto();
}
private PreemptionResourceRequestPBImpl convertFromProtoFormat(PreemptionResourceRequestProto p) {
return new PreemptionResourceRequestPBImpl(p);
}
private PreemptionResourceRequestProto convertToProtoFormat(PreemptionResourceRequest t) {
return ((PreemptionResourceRequestPBImpl)t).getProto();
}
}

View File

@ -0,0 +1,141 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.api.protocolrecords.impl.pb;
import org.apache.hadoop.yarn.api.protocolrecords.PreemptionContract;
import org.apache.hadoop.yarn.api.protocolrecords.PreemptionMessage;
import org.apache.hadoop.yarn.api.protocolrecords.StrictPreemptionContract;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.PreemptionContractProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.PreemptionMessageProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.PreemptionMessageProtoOrBuilder;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.StrictPreemptionContractProto;
public class PreemptionMessagePBImpl implements PreemptionMessage {
PreemptionMessageProto proto = PreemptionMessageProto.getDefaultInstance();
PreemptionMessageProto.Builder builder = null;
boolean viaProto = false;
private StrictPreemptionContract strict;
private PreemptionContract contract;
public PreemptionMessagePBImpl() {
builder = PreemptionMessageProto.newBuilder();
}
public PreemptionMessagePBImpl(PreemptionMessageProto proto) {
this.proto = proto;
viaProto = true;
}
public synchronized PreemptionMessageProto getProto() {
mergeLocalToProto();
proto = viaProto ? proto : builder.build();
viaProto = true;
return proto;
}
private void mergeLocalToProto() {
if (viaProto)
maybeInitBuilder();
mergeLocalToBuilder();
proto = builder.build();
viaProto = true;
}
private void mergeLocalToBuilder() {
if (strict != null) {
builder.setStrictContract(convertToProtoFormat(strict));
}
if (contract != null) {
builder.setContract(convertToProtoFormat(contract));
}
}
private void maybeInitBuilder() {
if (viaProto || builder == null) {
builder = PreemptionMessageProto.newBuilder(proto);
}
viaProto = false;
}
@Override
public synchronized StrictPreemptionContract getStrictContract() {
PreemptionMessageProtoOrBuilder p = viaProto ? proto : builder;
if (strict != null) {
return strict;
}
if (!p.hasStrictContract()) {
return null;
}
strict = convertFromProtoFormat(p.getStrictContract());
return strict;
}
@Override
public synchronized void setStrictContract(StrictPreemptionContract strict) {
maybeInitBuilder();
if (null == strict) {
builder.clearStrictContract();
}
this.strict = strict;
}
@Override
public synchronized PreemptionContract getContract() {
PreemptionMessageProtoOrBuilder p = viaProto ? proto : builder;
if (contract != null) {
return contract;
}
if (!p.hasContract()) {
return null;
}
contract = convertFromProtoFormat(p.getContract());
return contract;
}
@Override
public synchronized void setContract(final PreemptionContract c) {
maybeInitBuilder();
if (null == c) {
builder.clearContract();
}
this.contract = c;
}
private StrictPreemptionContractPBImpl convertFromProtoFormat(
StrictPreemptionContractProto p) {
return new StrictPreemptionContractPBImpl(p);
}
private StrictPreemptionContractProto convertToProtoFormat(
StrictPreemptionContract t) {
return ((StrictPreemptionContractPBImpl)t).getProto();
}
private PreemptionContractPBImpl convertFromProtoFormat(
PreemptionContractProto p) {
return new PreemptionContractPBImpl(p);
}
private PreemptionContractProto convertToProtoFormat(
PreemptionContract t) {
return ((PreemptionContractPBImpl)t).getProto();
}
}

View File

@ -0,0 +1,103 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.api.protocolrecords.impl.pb;
import org.apache.hadoop.yarn.api.protocolrecords.PreemptionResourceRequest;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.impl.pb.ResourceRequestPBImpl;
import org.apache.hadoop.yarn.proto.YarnProtos.ResourceRequestProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.PreemptionResourceRequestProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.PreemptionResourceRequestProtoOrBuilder;
public class PreemptionResourceRequestPBImpl implements PreemptionResourceRequest {
PreemptionResourceRequestProto proto =
PreemptionResourceRequestProto.getDefaultInstance();
PreemptionResourceRequestProto.Builder builder = null;
boolean viaProto = false;
private ResourceRequest rr;
public PreemptionResourceRequestPBImpl() {
builder = PreemptionResourceRequestProto.newBuilder();
}
public PreemptionResourceRequestPBImpl(PreemptionResourceRequestProto proto) {
this.proto = proto;
viaProto = true;
}
public synchronized PreemptionResourceRequestProto getProto() {
mergeLocalToProto();
proto = viaProto ? proto : builder.build();
viaProto = true;
return proto;
}
private void mergeLocalToProto() {
if (viaProto)
maybeInitBuilder();
mergeLocalToBuilder();
proto = builder.build();
viaProto = true;
}
private void mergeLocalToBuilder() {
if (rr != null) {
builder.setResource(convertToProtoFormat(rr));
}
}
private void maybeInitBuilder() {
if (viaProto || builder == null) {
builder = PreemptionResourceRequestProto.newBuilder(proto);
}
viaProto = false;
}
@Override
public synchronized ResourceRequest getResourceRequest() {
PreemptionResourceRequestProtoOrBuilder p = viaProto ? proto : builder;
if (rr != null) {
return rr;
}
if (!p.hasResource()) {
return null;
}
rr = convertFromProtoFormat(p.getResource());
return rr;
}
@Override
public synchronized void setResourceRequest(final ResourceRequest rr) {
maybeInitBuilder();
if (null == rr) {
builder.clearResource();
}
this.rr = rr;
}
private ResourceRequestPBImpl convertFromProtoFormat(ResourceRequestProto p) {
return new ResourceRequestPBImpl(p);
}
private ResourceRequestProto convertToProtoFormat(ResourceRequest t) {
return ((ResourceRequestPBImpl)t).getProto();
}
}

View File

@ -0,0 +1,148 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.api.protocolrecords.impl.pb;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import org.apache.hadoop.yarn.api.protocolrecords.PreemptionContainer;
import org.apache.hadoop.yarn.api.protocolrecords.StrictPreemptionContract;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.PreemptionContainerProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.StrictPreemptionContractProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.StrictPreemptionContractProtoOrBuilder;
public class StrictPreemptionContractPBImpl implements StrictPreemptionContract {
StrictPreemptionContractProto proto =
StrictPreemptionContractProto.getDefaultInstance();
StrictPreemptionContractProto.Builder builder = null;
boolean viaProto = false;
private Set<PreemptionContainer> containers;
public StrictPreemptionContractPBImpl() {
builder = StrictPreemptionContractProto.newBuilder();
}
public StrictPreemptionContractPBImpl(StrictPreemptionContractProto proto) {
this.proto = proto;
viaProto = true;
}
public synchronized StrictPreemptionContractProto getProto() {
mergeLocalToProto();
proto = viaProto ? proto : builder.build();
viaProto = true;
return proto;
}
private void mergeLocalToProto() {
if (viaProto)
maybeInitBuilder();
mergeLocalToBuilder();
proto = builder.build();
viaProto = true;
}
private void mergeLocalToBuilder() {
if (this.containers != null) {
addContainersToProto();
}
}
private void maybeInitBuilder() {
if (viaProto || builder == null) {
builder = StrictPreemptionContractProto.newBuilder(proto);
}
viaProto = false;
}
@Override
public synchronized Set<PreemptionContainer> getContainers() {
initIds();
return containers;
}
@Override
public synchronized void setContainers(
final Set<PreemptionContainer> containers) {
if (null == containers) {
builder.clearContainer();
}
this.containers = containers;
}
private void initIds() {
if (containers != null) {
return;
}
StrictPreemptionContractProtoOrBuilder p = viaProto ? proto : builder;
List<PreemptionContainerProto> list = p.getContainerList();
containers = new HashSet<PreemptionContainer>();
for (PreemptionContainerProto c : list) {
containers.add(convertFromProtoFormat(c));
}
}
private void addContainersToProto() {
maybeInitBuilder();
builder.clearContainer();
if (containers == null) {
return;
}
Iterable<PreemptionContainerProto> iterable = new Iterable<PreemptionContainerProto>() {
@Override
public Iterator<PreemptionContainerProto> iterator() {
return new Iterator<PreemptionContainerProto>() {
Iterator<PreemptionContainer> iter = containers.iterator();
@Override
public boolean hasNext() {
return iter.hasNext();
}
@Override
public PreemptionContainerProto next() {
return convertToProtoFormat(iter.next());
}
@Override
public void remove() {
throw new UnsupportedOperationException();
}
};
}
};
builder.addAllContainer(iterable);
}
private PreemptionContainerPBImpl convertFromProtoFormat(PreemptionContainerProto p) {
return new PreemptionContainerPBImpl(p);
}
private PreemptionContainerProto convertToProtoFormat(PreemptionContainer t) {
return ((PreemptionContainerPBImpl)t).getProto();
}
}

View File

@ -66,9 +66,30 @@ message AllocateResponseProto {
optional ResourceProto limit = 5;
repeated NodeReportProto updated_nodes = 6;
optional int32 num_cluster_nodes = 7;
optional PreemptionMessageProto preempt = 8;
}
message PreemptionMessageProto {
optional StrictPreemptionContractProto strictContract = 1;
optional PreemptionContractProto contract = 2;
}
message StrictPreemptionContractProto {
repeated PreemptionContainerProto container = 1;
}
message PreemptionContractProto {
repeated PreemptionResourceRequestProto resource = 1;
repeated PreemptionContainerProto container = 2;
}
message PreemptionContainerProto {
optional ContainerIdProto id = 1;
}
message PreemptionResourceRequestProto {
optional ResourceRequestProto resource = 1;
}
//////////////////////////////////////////////////////
/////// client_RM_Protocol ///////////////////////////

View File

@ -113,7 +113,7 @@ public class TestAMRMClientAsync {
private AllocateResponse createAllocateResponse(
List<ContainerStatus> completed, List<Container> allocated) {
AllocateResponse response = BuilderUtils.newAllocateResponse(0, completed, allocated,
new ArrayList<NodeReport>(), null, false, 1);
new ArrayList<NodeReport>(), null, false, 1, null);
return response;
}

View File

@ -29,6 +29,7 @@ import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.PreemptionMessage;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
@ -404,7 +405,8 @@ public class BuilderUtils {
public static AllocateResponse newAllocateResponse(int responseId,
List<ContainerStatus> completedContainers,
List<Container> allocatedContainers, List<NodeReport> updatedNodes,
Resource availResources, boolean reboot, int numClusterNodes) {
Resource availResources, boolean reboot, int numClusterNodes,
PreemptionMessage preempt) {
AllocateResponse response = recordFactory
.newRecordInstance(AllocateResponse.class);
response.setNumClusterNodes(numClusterNodes);
@ -414,6 +416,7 @@ public class BuilderUtils {
response.setUpdatedNodes(updatedNodes);
response.setAvailableResources(availResources);
response.setReboot(reboot);
response.setPreemptionMessage(preempt);
return response;
}