diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java index dc10f957380..ccbb035be06 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java @@ -1405,7 +1405,8 @@ private void mergeRegistrationResponses(AllocateResponse homeResponse, } } - private void mergeAllocateResponse(AllocateResponse homeResponse, + @VisibleForTesting + protected void mergeAllocateResponse(AllocateResponse homeResponse, AllocateResponse otherResponse, SubClusterId otherRMAddress) { if (otherResponse.getAMRMToken() != null) { @@ -1467,7 +1468,7 @@ private void mergeAllocateResponse(AllocateResponse homeResponse, if (par1 != null && par2 != null) { par1.getResourceRequest().addAll(par2.getResourceRequest()); - par2.getContainers().addAll(par2.getContainers()); + par1.getContainers().addAll(par2.getContainers()); } StrictPreemptionContract spar1 = homePreempMessage.getStrictContract(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java index 48b7bf57970..ecaeaae7bf9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java @@ -22,8 +22,10 @@ import java.security.PrivilegedExceptionAction; import java.util.ArrayList; import java.util.Collections; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; @@ -47,6 +49,8 @@ import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.NMToken; import org.apache.hadoop.yarn.api.records.NodeReport; +import org.apache.hadoop.yarn.api.records.PreemptionContainer; +import org.apache.hadoop.yarn.api.records.PreemptionContract; import org.apache.hadoop.yarn.api.records.PreemptionMessage; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; @@ -893,4 +897,75 @@ public Object run() throws Exception { } }); } + + @Test + public void testMergeAllocateResponse() { + ContainerId cid = ContainerId.newContainerId(attemptId, 0); + ContainerStatus cStatus = Records.newRecord(ContainerStatus.class); + cStatus.setContainerId(cid); + Container container = + Container.newInstance(cid, null, null, null, null, null); + + + AllocateResponse homeResponse = Records.newRecord(AllocateResponse.class); + homeResponse.setAllocatedContainers(Collections.singletonList(container)); + homeResponse.setCompletedContainersStatuses( + Collections.singletonList(cStatus)); + homeResponse.setUpdatedNodes( + Collections.singletonList(Records.newRecord(NodeReport.class))); + homeResponse.setNMTokens( + Collections.singletonList(Records.newRecord(NMToken.class))); + homeResponse.setUpdatedContainers( + Collections.singletonList( + Records.newRecord(UpdatedContainer.class))); + homeResponse.setUpdateErrors(Collections + .singletonList(Records.newRecord(UpdateContainerError.class))); + homeResponse.setAvailableResources(Records.newRecord(Resource.class)); + homeResponse.setPreemptionMessage(createDummyPreemptionMessage( + ContainerId.newContainerId(attemptId, 0))); + + AllocateResponse response = Records.newRecord(AllocateResponse.class); + response.setAllocatedContainers(Collections.singletonList(container)); + response.setCompletedContainersStatuses(Collections.singletonList(cStatus)); + response.setUpdatedNodes( + Collections.singletonList(Records.newRecord(NodeReport.class))); + response.setNMTokens( + Collections.singletonList(Records.newRecord(NMToken.class))); + response.setUpdatedContainers( + Collections.singletonList( + Records.newRecord(UpdatedContainer.class))); + response.setUpdateErrors(Collections + .singletonList(Records.newRecord(UpdateContainerError.class))); + response.setAvailableResources(Records.newRecord(Resource.class)); + response.setPreemptionMessage(createDummyPreemptionMessage( + ContainerId.newContainerId(attemptId, 1))); + + interceptor.mergeAllocateResponse(homeResponse, + response, SubClusterId.newInstance("SC-1")); + + Assert.assertEquals(2, + homeResponse.getPreemptionMessage().getContract() + .getContainers().size()); + Assert.assertEquals(2, + homeResponse.getAllocatedContainers().size()); + Assert.assertEquals(2, + homeResponse.getUpdatedNodes().size()); + Assert.assertEquals(2, + homeResponse.getCompletedContainersStatuses().size()); + } + + private PreemptionMessage createDummyPreemptionMessage( + ContainerId containerId) { + PreemptionMessage preemptionMessage = Records.newRecord( + PreemptionMessage.class); + PreemptionContainer container = Records.newRecord( + PreemptionContainer.class); + container.setId(containerId); + Set preemptionContainers = new HashSet<>(); + preemptionContainers.add(container); + PreemptionContract contract = Records.newRecord(PreemptionContract.class); + contract.setContainers(preemptionContainers); + preemptionMessage.setContract(contract); + return preemptionMessage; + } }