From 53dc83264137e80d3a2a5459e433cd6a494fa794 Mon Sep 17 00:00:00 2001 From: Sanford Ryza Date: Mon, 9 Dec 2013 17:48:00 +0000 Subject: [PATCH] YARN-1448. AM-RM protocol changes to support container resizing (Wangda Tan via Sandy Ryza) git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1549629 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-yarn-project/CHANGES.txt | 3 + .../api/protocolrecords/AllocateRequest.java | 31 ++++ .../api/protocolrecords/AllocateResponse.java | 49 +++++ .../src/main/proto/yarn_service_protos.proto | 3 + .../impl/pb/AllocateRequestPBImpl.java | 90 ++++++++- .../impl/pb/AllocateResponsePBImpl.java | 171 +++++++++++++++++- .../hadoop/yarn/api/TestAllocateRequest.java | 73 ++++++++ .../hadoop/yarn/api/TestAllocateResponse.java | 114 ++++++++++++ 8 files changed, 525 insertions(+), 9 deletions(-) create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestAllocateRequest.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestAllocateResponse.java diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 590f9dd9d1b..e9cb231231b 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -28,6 +28,9 @@ Release 2.4.0 - UNRELEASED YARN-1447. Common PB type definitions for container resizing. (Wangda Tan via Sandy Ryza) + YARN-1448. AM-RM protocol changes to support container resizing (Wangda Tan + via Sandy Ryza) + IMPROVEMENTS YARN-7. Support CPU resource for DistributedShell. (Junping Du via llu) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateRequest.java index 9ae4a12f965..62316a64b69 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateRequest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateRequest.java @@ -26,6 +26,7 @@ import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest; +import org.apache.hadoop.yarn.api.records.ContainerResourceIncreaseRequest; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.util.Records; @@ -60,12 +61,24 @@ public abstract class AllocateRequest { List resourceAsk, List containersToBeReleased, ResourceBlacklistRequest resourceBlacklistRequest) { + return newInstance(responseID, appProgress, resourceAsk, + containersToBeReleased, resourceBlacklistRequest, null); + } + + @Public + @Stable + public static AllocateRequest newInstance(int responseID, float appProgress, + List resourceAsk, + List containersToBeReleased, + ResourceBlacklistRequest resourceBlacklistRequest, + List increaseRequests) { AllocateRequest allocateRequest = Records.newRecord(AllocateRequest.class); allocateRequest.setResponseId(responseID); allocateRequest.setProgress(appProgress); allocateRequest.setAskList(resourceAsk); allocateRequest.setReleaseList(containersToBeReleased); allocateRequest.setResourceBlacklistRequest(resourceBlacklistRequest); + allocateRequest.setIncreaseRequests(increaseRequests); return allocateRequest; } @@ -170,4 +183,22 @@ public abstract class AllocateRequest { @Stable public abstract void setResourceBlacklistRequest( ResourceBlacklistRequest resourceBlacklistRequest); + + /** + * Get the ContainerResourceIncreaseRequest being sent by the + * ApplicationMaster + */ + @Public + @Stable + public abstract List getIncreaseRequests(); + + /** + * Set the ContainerResourceIncreaseRequest to inform the + * ResourceManager about some container's resources need to be + * increased + */ + @Public + @Stable + public abstract void setIncreaseRequests( + List increaseRequests); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateResponse.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateResponse.java index 3de74f7c9f5..0e27f3248fe 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateResponse.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateResponse.java @@ -28,6 +28,8 @@ import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; import org.apache.hadoop.yarn.api.records.AMCommand; import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerResourceDecrease; +import org.apache.hadoop.yarn.api.records.ContainerResourceIncrease; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NMToken; import org.apache.hadoop.yarn.api.records.NodeReport; @@ -82,6 +84,23 @@ public abstract class AllocateResponse { response.setNMTokens(nmTokens); return response; } + + @Public + @Stable + public static AllocateResponse newInstance(int responseId, + List completedContainers, + List allocatedContainers, List updatedNodes, + Resource availResources, AMCommand command, int numClusterNodes, + PreemptionMessage preempt, List nmTokens, + List increasedContainers, + List decreasedContainers) { + AllocateResponse response = newInstance(responseId, completedContainers, + allocatedContainers, updatedNodes, availResources, command, + numClusterNodes, preempt, nmTokens); + response.setIncreasedContainers(increasedContainers); + response.setDecreasedContainers(decreasedContainers); + return response; + } /** * If the ResourceManager needs the @@ -221,4 +240,34 @@ public abstract class AllocateResponse { @Private @Unstable public abstract void setNMTokens(List nmTokens); + + /** + * Get the list of newly increased containers by ResourceManager + */ + @Public + @Stable + public abstract List getIncreasedContainers(); + + /** + * Set the list of newly increased containers by ResourceManager + */ + @Private + @Unstable + public abstract void setIncreasedContainers( + List increasedContainers); + + /** + * Get the list of newly decreased containers by NodeManager + */ + @Public + @Stable + public abstract List getDecreasedContainers(); + + /** + * Set the list of newly decreased containers by NodeManager + */ + @Private + @Unstable + public abstract void setDecreasedContainers( + List decreasedContainers); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto index cfe71d44c07..332be813627 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto @@ -62,6 +62,7 @@ message AllocateRequestProto { optional ResourceBlacklistRequestProto blacklist_request = 3; optional int32 response_id = 4; optional float progress = 5; + repeated ContainerResourceIncreaseRequestProto increase_request = 6; } message NMTokenProto { @@ -79,6 +80,8 @@ message AllocateResponseProto { optional int32 num_cluster_nodes = 7; optional PreemptionMessageProto preempt = 8; repeated NMTokenProto nm_tokens = 9; + repeated ContainerResourceIncreaseProto increased_containers = 10; + repeated ContainerResourceDecreaseProto decreased_containers = 11; } ////////////////////////////////////////////////////// diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateRequestPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateRequestPBImpl.java index bff252f38a6..dc11165f6a8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateRequestPBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateRequestPBImpl.java @@ -27,12 +27,15 @@ import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerResourceIncreaseRequest; import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.impl.pb.ContainerIdPBImpl; +import org.apache.hadoop.yarn.api.records.impl.pb.ContainerResourceIncreaseRequestPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ResourceBlacklistRequestPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ResourceRequestPBImpl; import org.apache.hadoop.yarn.proto.YarnProtos.ContainerIdProto; +import org.apache.hadoop.yarn.proto.YarnProtos.ContainerResourceIncreaseRequestProto; import org.apache.hadoop.yarn.proto.YarnProtos.ResourceBlacklistRequestProto; import org.apache.hadoop.yarn.proto.YarnProtos.ResourceRequestProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.AllocateRequestProto; @@ -49,9 +52,9 @@ public class AllocateRequestPBImpl extends AllocateRequest { private List ask = null; private List release = null; + private List increaseRequests = null; private ResourceBlacklistRequest blacklistRequest = null; - public AllocateRequestPBImpl() { builder = AllocateRequestProto.newBuilder(); } @@ -62,7 +65,7 @@ public class AllocateRequestPBImpl extends AllocateRequest { } public AllocateRequestProto getProto() { - mergeLocalToProto(); + mergeLocalToProto(); proto = viaProto ? proto : builder.build(); viaProto = true; return proto; @@ -95,6 +98,9 @@ public class AllocateRequestPBImpl extends AllocateRequest { if (this.release != null) { addReleasesToProto(); } + if (this.increaseRequests != null) { + addIncreaseRequestsToProto(); + } if (this.blacklistRequest != null) { builder.setBlacklistRequest(convertToProtoFormat(this.blacklistRequest)); } @@ -155,6 +161,23 @@ public class AllocateRequestPBImpl extends AllocateRequest { this.ask.addAll(resourceRequests); } + @Override + public List getIncreaseRequests() { + initIncreaseRequests(); + return this.increaseRequests; + } + + @Override + public void setIncreaseRequests( + List increaseRequests) { + if (increaseRequests == null) { + return; + } + initIncreaseRequests(); + this.increaseRequests.clear(); + this.increaseRequests.addAll(increaseRequests); + } + @Override public ResourceBlacklistRequest getResourceBlacklistRequest() { AllocateRequestProtoOrBuilder p = viaProto ? proto : builder; @@ -223,6 +246,57 @@ public class AllocateRequestPBImpl extends AllocateRequest { }; builder.addAllAsk(iterable); } + + private void initIncreaseRequests() { + if (this.increaseRequests != null) { + return; + } + AllocateRequestProtoOrBuilder p = viaProto ? proto : builder; + List list = + p.getIncreaseRequestList(); + this.increaseRequests = new ArrayList(); + + for (ContainerResourceIncreaseRequestProto c : list) { + this.increaseRequests.add(convertFromProtoFormat(c)); + } + } + + private void addIncreaseRequestsToProto() { + maybeInitBuilder(); + builder.clearIncreaseRequest(); + if (increaseRequests == null) { + return; + } + Iterable iterable = + new Iterable() { + @Override + public Iterator iterator() { + return new Iterator() { + + Iterator iter = + increaseRequests.iterator(); + + @Override + public boolean hasNext() { + return iter.hasNext(); + } + + @Override + public ContainerResourceIncreaseRequestProto next() { + return convertToProtoFormat(iter.next()); + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + }; + + } + }; + builder.addAllIncreaseRequest(iterable); + } + @Override public List getReleaseList() { initReleases(); @@ -292,6 +366,16 @@ public class AllocateRequestPBImpl extends AllocateRequest { private ResourceRequestProto convertToProtoFormat(ResourceRequest t) { return ((ResourceRequestPBImpl)t).getProto(); } + + private ContainerResourceIncreaseRequestPBImpl convertFromProtoFormat( + ContainerResourceIncreaseRequestProto p) { + return new ContainerResourceIncreaseRequestPBImpl(p); + } + + private ContainerResourceIncreaseRequestProto convertToProtoFormat( + ContainerResourceIncreaseRequest t) { + return ((ContainerResourceIncreaseRequestPBImpl) t).getProto(); + } private ContainerIdPBImpl convertFromProtoFormat(ContainerIdProto p) { return new ContainerIdPBImpl(p); @@ -308,6 +392,4 @@ public class AllocateRequestPBImpl extends AllocateRequest { private ResourceBlacklistRequestProto convertToProtoFormat(ResourceBlacklistRequest t) { return ((ResourceBlacklistRequestPBImpl)t).getProto(); } - - } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java index 37d59713670..4d7c0a3439b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java @@ -28,12 +28,16 @@ import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; import org.apache.hadoop.yarn.api.records.AMCommand; import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerResourceDecrease; +import org.apache.hadoop.yarn.api.records.ContainerResourceIncrease; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NMToken; import org.apache.hadoop.yarn.api.records.NodeReport; import org.apache.hadoop.yarn.api.records.PreemptionMessage; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.impl.pb.ContainerPBImpl; +import org.apache.hadoop.yarn.api.records.impl.pb.ContainerResourceDecreasePBImpl; +import org.apache.hadoop.yarn.api.records.impl.pb.ContainerResourceIncreasePBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ContainerStatusPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.NMTokenPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.NodeReportPBImpl; @@ -41,6 +45,8 @@ import org.apache.hadoop.yarn.api.records.impl.pb.PreemptionMessagePBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils; import org.apache.hadoop.yarn.api.records.impl.pb.ResourcePBImpl; import org.apache.hadoop.yarn.proto.YarnProtos.ContainerProto; +import org.apache.hadoop.yarn.proto.YarnProtos.ContainerResourceDecreaseProto; +import org.apache.hadoop.yarn.proto.YarnProtos.ContainerResourceIncreaseProto; import org.apache.hadoop.yarn.proto.YarnProtos.ContainerStatusProto; import org.apache.hadoop.yarn.proto.YarnProtos.NodeReportProto; import org.apache.hadoop.yarn.proto.YarnProtos.PreemptionMessageProto; @@ -63,6 +69,8 @@ public class AllocateResponsePBImpl extends AllocateResponse { private List allocatedContainers = null; private List nmTokens = null; private List completedContainersStatuses = null; + private List increasedContainers = null; + private List decreasedContainers = null; private List updatedNodes = null; private PreemptionMessage preempt; @@ -108,7 +116,7 @@ public class AllocateResponsePBImpl extends AllocateResponse { if (this.allocatedContainers != null) { builder.clearAllocatedContainers(); Iterable iterable = - getProtoIterable(this.allocatedContainers); + getContainerProtoIterable(this.allocatedContainers); builder.addAllAllocatedContainers(iterable); } if (nmTokens != null) { @@ -134,6 +142,18 @@ public class AllocateResponsePBImpl extends AllocateResponse { if (this.preempt != null) { builder.setPreempt(convertToProtoFormat(this.preempt)); } + if (this.increasedContainers != null) { + builder.clearIncreasedContainers(); + Iterable iterable = + getIncreaseProtoIterable(this.increasedContainers); + builder.addAllIncreasedContainers(iterable); + } + if (this.decreasedContainers != null) { + builder.clearDecreasedContainers(); + Iterable iterable = + getChangeProtoIterable(this.decreasedContainers); + builder.addAllDecreasedContainers(iterable); + } } private synchronized void mergeLocalToProto() { @@ -306,6 +326,63 @@ public class AllocateResponsePBImpl extends AllocateResponse { this.preempt = preempt; } + @Override + public synchronized List getIncreasedContainers() { + initLocalIncreasedContainerList(); + return increasedContainers; + } + + @Override + public synchronized void setIncreasedContainers( + List increasedContainers) { + if (increasedContainers == null) + return; + initLocalIncreasedContainerList(); + this.increasedContainers.addAll(increasedContainers); + } + + @Override + public synchronized List getDecreasedContainers() { + initLocalDecreasedContainerList(); + return decreasedContainers; + } + + @Override + public synchronized void setDecreasedContainers( + List decreasedContainers) { + if (decreasedContainers == null) { + return; + } + initLocalDecreasedContainerList(); + this.decreasedContainers.addAll(decreasedContainers); + } + + private synchronized void initLocalIncreasedContainerList() { + if (this.increasedContainers != null) { + return; + } + AllocateResponseProtoOrBuilder p = viaProto ? proto : builder; + List list = p.getIncreasedContainersList(); + increasedContainers = new ArrayList(); + + for (ContainerResourceIncreaseProto c : list) { + increasedContainers.add(convertFromProtoFormat(c)); + } + } + + private synchronized void initLocalDecreasedContainerList() { + if (this.decreasedContainers != null) { + return; + } + AllocateResponseProtoOrBuilder p = viaProto ? proto : builder; + List list = p.getDecreasedContainersList(); + decreasedContainers = new ArrayList(); + + for (ContainerResourceDecreaseProto c : list) { + decreasedContainers.add(convertFromProtoFormat(c)); + } + } + // Once this is called. updatedNodes will never be null - until a getProto is // called. private synchronized void initLocalNewNodeReportList() { @@ -348,7 +425,71 @@ public class AllocateResponsePBImpl extends AllocateResponse { } } - private synchronized Iterable getProtoIterable( + private synchronized Iterable + getIncreaseProtoIterable( + final List newContainersList) { + maybeInitBuilder(); + return new Iterable() { + @Override + public synchronized Iterator iterator() { + return new Iterator() { + + Iterator iter = newContainersList + .iterator(); + + @Override + public synchronized boolean hasNext() { + return iter.hasNext(); + } + + @Override + public synchronized ContainerResourceIncreaseProto next() { + return convertToProtoFormat(iter.next()); + } + + @Override + public synchronized void remove() { + throw new UnsupportedOperationException(); + } + }; + + } + }; + } + + private synchronized Iterable + getChangeProtoIterable( + final List newContainersList) { + maybeInitBuilder(); + return new Iterable() { + @Override + public synchronized Iterator iterator() { + return new Iterator() { + + Iterator iter = newContainersList + .iterator(); + + @Override + public synchronized boolean hasNext() { + return iter.hasNext(); + } + + @Override + public synchronized ContainerResourceDecreaseProto next() { + return convertToProtoFormat(iter.next()); + } + + @Override + public synchronized void remove() { + throw new UnsupportedOperationException(); + } + }; + + } + }; + } + + private synchronized Iterable getContainerProtoIterable( final List newContainersList) { maybeInitBuilder(); return new Iterable() { @@ -467,7 +608,6 @@ public class AllocateResponsePBImpl extends AllocateResponse { } }; - } }; } @@ -486,6 +626,26 @@ public class AllocateResponsePBImpl extends AllocateResponse { completedContainersStatuses.add(convertFromProtoFormat(c)); } } + + private synchronized ContainerResourceIncrease convertFromProtoFormat( + ContainerResourceIncreaseProto p) { + return new ContainerResourceIncreasePBImpl(p); + } + + private synchronized ContainerResourceIncreaseProto convertToProtoFormat( + ContainerResourceIncrease t) { + return ((ContainerResourceIncreasePBImpl) t).getProto(); + } + + private synchronized ContainerResourceDecrease convertFromProtoFormat( + ContainerResourceDecreaseProto p) { + return new ContainerResourceDecreasePBImpl(p); + } + + private synchronized ContainerResourceDecreaseProto convertToProtoFormat( + ContainerResourceDecrease t) { + return ((ContainerResourceDecreasePBImpl) t).getProto(); + } private synchronized NodeReportPBImpl convertFromProtoFormat( NodeReportProto p) { @@ -500,8 +660,9 @@ public class AllocateResponsePBImpl extends AllocateResponse { ContainerProto p) { return new ContainerPBImpl(p); } - - private synchronized ContainerProto convertToProtoFormat(Container t) { + + private synchronized ContainerProto convertToProtoFormat( + Container t) { return ((ContainerPBImpl)t).getProto(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestAllocateRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestAllocateRequest.java new file mode 100644 index 00000000000..90ad3f0da81 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestAllocateRequest.java @@ -0,0 +1,73 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.api; + +import java.util.ArrayList; +import java.util.List; + +import junit.framework.Assert; + +import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; +import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.AllocateRequestPBImpl; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ContainerResourceIncreaseRequest; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.AllocateRequestProto; +import org.junit.Test; + +public class TestAllocateRequest { + @Test + public void testAllcoateRequestWithIncrease() { + List incRequests = + new ArrayList(); + for (int i = 0; i < 3; i++) { + incRequests.add(ContainerResourceIncreaseRequest.newInstance(null, + Resource.newInstance(0, i))); + } + AllocateRequest r = + AllocateRequest.newInstance(123, 0f, null, null, null, incRequests); + + // serde + AllocateRequestProto p = ((AllocateRequestPBImpl) r).getProto(); + r = new AllocateRequestPBImpl(p); + + // check value + Assert.assertEquals(123, r.getResponseId()); + Assert.assertEquals(incRequests.size(), r.getIncreaseRequests().size()); + + for (int i = 0; i < incRequests.size(); i++) { + Assert.assertEquals(r.getIncreaseRequests().get(i).getCapability() + .getVirtualCores(), incRequests.get(i).getCapability() + .getVirtualCores()); + } + } + + @Test + public void testAllcoateRequestWithoutIncrease() { + AllocateRequest r = + AllocateRequest.newInstance(123, 0f, null, null, null, null); + + // serde + AllocateRequestProto p = ((AllocateRequestPBImpl) r).getProto(); + r = new AllocateRequestPBImpl(p); + + // check value + Assert.assertEquals(123, r.getResponseId()); + Assert.assertEquals(0, r.getIncreaseRequests().size()); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestAllocateResponse.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestAllocateResponse.java new file mode 100644 index 00000000000..06420fb99da --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestAllocateResponse.java @@ -0,0 +1,114 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.api; + +import java.util.ArrayList; +import java.util.List; + +import junit.framework.Assert; + +import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; +import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.AllocateResponsePBImpl; +import org.apache.hadoop.yarn.api.records.AMCommand; +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerResourceDecrease; +import org.apache.hadoop.yarn.api.records.ContainerResourceIncrease; +import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.NMToken; +import org.apache.hadoop.yarn.api.records.NodeReport; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.AllocateResponseProto; +import org.junit.Test; + +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +public class TestAllocateResponse { + @Test + public void testAllocateResponseWithIncDecContainers() { + List incContainers = + new ArrayList(); + List decContainers = + new ArrayList(); + for (int i = 0; i < 3; i++) { + incContainers.add(ContainerResourceIncrease.newInstance(null, + Resource.newInstance(1024, i), null)); + } + for (int i = 0; i < 5; i++) { + decContainers.add(ContainerResourceDecrease.newInstance(null, + Resource.newInstance(1024, i))); + } + + AllocateResponse r = + AllocateResponse.newInstance(3, new ArrayList(), + new ArrayList(), new ArrayList(), null, + AMCommand.AM_RESYNC, 3, null, new ArrayList(), + incContainers, decContainers); + + // serde + AllocateResponseProto p = ((AllocateResponsePBImpl) r).getProto(); + r = new AllocateResponsePBImpl(p); + + // check value + Assert + .assertEquals(incContainers.size(), r.getIncreasedContainers().size()); + Assert + .assertEquals(decContainers.size(), r.getDecreasedContainers().size()); + + for (int i = 0; i < incContainers.size(); i++) { + Assert.assertEquals(i, r.getIncreasedContainers().get(i).getCapability() + .getVirtualCores()); + } + + for (int i = 0; i < decContainers.size(); i++) { + Assert.assertEquals(i, r.getDecreasedContainers().get(i).getCapability() + .getVirtualCores()); + } + } + + @Test + public void testAllocateResponseWithoutIncDecContainers() { + AllocateResponse r = + AllocateResponse.newInstance(3, new ArrayList(), + new ArrayList(), new ArrayList(), null, + AMCommand.AM_RESYNC, 3, null, new ArrayList(), null, null); + + // serde + AllocateResponseProto p = ((AllocateResponsePBImpl) r).getProto(); + r = new AllocateResponsePBImpl(p); + + // check value + Assert.assertEquals(0, r.getIncreasedContainers().size()); + Assert.assertEquals(0, r.getDecreasedContainers().size()); + } +}