YARN-6962. Add support for updateContainers when allocating using FederationInterceptor. (Botong Huang via Subru).

(cherry picked from commit ca669f9f8b)
This commit is contained in:
Subru Krishnan 2017-09-28 13:04:03 -07:00
parent d6da014f67
commit ffcf5ba1ce
2 changed files with 111 additions and 29 deletions

View File

@ -540,30 +540,33 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
}
}
if (request.getResourceBlacklistRequest() != null && !isNullOrEmpty(
request.getResourceBlacklistRequest().getBlacklistAdditions())) {
for (String resourceName : request.getResourceBlacklistRequest()
.getBlacklistAdditions()) {
SubClusterId subClusterId = getSubClusterForNode(resourceName);
if (subClusterId != null) {
AllocateRequest newRequest = findOrCreateAllocateRequestForSubCluster(
subClusterId, request, requestMap);
newRequest.getResourceBlacklistRequest().getBlacklistAdditions()
.add(resourceName);
if (request.getResourceBlacklistRequest() != null) {
if (!isNullOrEmpty(
request.getResourceBlacklistRequest().getBlacklistAdditions())) {
for (String resourceName : request.getResourceBlacklistRequest()
.getBlacklistAdditions()) {
SubClusterId subClusterId = getSubClusterForNode(resourceName);
if (subClusterId != null) {
AllocateRequest newRequest =
findOrCreateAllocateRequestForSubCluster(subClusterId, request,
requestMap);
newRequest.getResourceBlacklistRequest().getBlacklistAdditions()
.add(resourceName);
}
}
}
}
if (request.getResourceBlacklistRequest() != null && !isNullOrEmpty(
request.getResourceBlacklistRequest().getBlacklistRemovals())) {
for (String resourceName : request.getResourceBlacklistRequest()
.getBlacklistRemovals()) {
SubClusterId subClusterId = getSubClusterForNode(resourceName);
if (subClusterId != null) {
AllocateRequest newRequest = findOrCreateAllocateRequestForSubCluster(
subClusterId, request, requestMap);
newRequest.getResourceBlacklistRequest().getBlacklistRemovals()
.add(resourceName);
if (!isNullOrEmpty(
request.getResourceBlacklistRequest().getBlacklistRemovals())) {
for (String resourceName : request.getResourceBlacklistRequest()
.getBlacklistRemovals()) {
SubClusterId subClusterId = getSubClusterForNode(resourceName);
if (subClusterId != null) {
AllocateRequest newRequest =
findOrCreateAllocateRequestForSubCluster(subClusterId, request,
requestMap);
newRequest.getResourceBlacklistRequest().getBlacklistRemovals()
.add(resourceName);
}
}
}
}
@ -896,13 +899,8 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
}
}
if (!isNullOrEmpty(otherResponse.getNMTokens())) {
if (!isNullOrEmpty(homeResponse.getNMTokens())) {
homeResponse.getNMTokens().addAll(otherResponse.getNMTokens());
} else {
homeResponse.setNMTokens(otherResponse.getNMTokens());
}
}
homeResponse.setNumClusterNodes(
homeResponse.getNumClusterNodes() + otherResponse.getNumClusterNodes());
PreemptionMessage homePreempMessage = homeResponse.getPreemptionMessage();
PreemptionMessage otherPreempMessage = otherResponse.getPreemptionMessage();
@ -935,6 +933,31 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
spar1.getContainers().addAll(spar2.getContainers());
}
}
if (!isNullOrEmpty(otherResponse.getNMTokens())) {
if (!isNullOrEmpty(homeResponse.getNMTokens())) {
homeResponse.getNMTokens().addAll(otherResponse.getNMTokens());
} else {
homeResponse.setNMTokens(otherResponse.getNMTokens());
}
}
if (!isNullOrEmpty(otherResponse.getUpdatedContainers())) {
if (!isNullOrEmpty(homeResponse.getUpdatedContainers())) {
homeResponse.getUpdatedContainers()
.addAll(otherResponse.getUpdatedContainers());
} else {
homeResponse.setUpdatedContainers(otherResponse.getUpdatedContainers());
}
}
if (!isNullOrEmpty(otherResponse.getUpdateErrors())) {
if (!isNullOrEmpty(homeResponse.getUpdateErrors())) {
homeResponse.getUpdateErrors().addAll(otherResponse.getUpdateErrors());
} else {
homeResponse.setUpdateErrors(otherResponse.getUpdateErrors());
}
}
}
/**
@ -1052,6 +1075,11 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
return this.uamPool.getAllUAMIds().size();
}
@VisibleForTesting
public Map<SubClusterId, List<AllocateResponse>> getAsyncResponseSink() {
return this.asyncResponseSink;
}
/**
* Private structure for encapsulating SubClusterId and
* RegisterApplicationMasterResponse instances.

View File

@ -20,7 +20,9 @@ package org.apache.hadoop.yarn.server.nodemanager.amrmproxy;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
@ -36,8 +38,15 @@ import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRespo
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.NMToken;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.PreemptionMessage;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.UpdateContainerError;
import org.apache.hadoop.yarn.api.records.UpdatedContainer;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.InvalidApplicationMasterRequestException;
import org.apache.hadoop.yarn.exceptions.YarnException;
@ -493,4 +502,49 @@ public class TestFederationInterceptor extends BaseAMRMProxyTest {
} catch (YarnException e) {
}
}
@Test
public void testAllocateResponse() throws Exception {
interceptor.registerApplicationMaster(
RegisterApplicationMasterRequest.newInstance(null, 0, null));
AllocateRequest allocateRequest = Records.newRecord(AllocateRequest.class);
Map<SubClusterId, List<AllocateResponse>> asyncResponseSink =
interceptor.getAsyncResponseSink();
ContainerId cid = ContainerId.newContainerId(attemptId, 0);
ContainerStatus cStatus = Records.newRecord(ContainerStatus.class);
cStatus.setContainerId(cid);
Container container =
Container.newInstance(cid, null, null, null, null, null);
AllocateResponse response = Records.newRecord(AllocateResponse.class);
response.setAllocatedContainers(Collections.singletonList(container));
response.setCompletedContainersStatuses(Collections.singletonList(cStatus));
response.setUpdatedNodes(
Collections.singletonList(Records.newRecord(NodeReport.class)));
response.setNMTokens(
Collections.singletonList(Records.newRecord(NMToken.class)));
response.setUpdatedContainers(
Collections.singletonList(Records.newRecord(UpdatedContainer.class)));
response.setUpdateErrors(Collections
.singletonList(Records.newRecord(UpdateContainerError.class)));
response.setAvailableResources(Records.newRecord(Resource.class));
response.setPreemptionMessage(Records.newRecord(PreemptionMessage.class));
List<AllocateResponse> list = new ArrayList<>();
list.add(response);
asyncResponseSink.put(SubClusterId.newInstance("SC-1"), list);
response = interceptor.allocate(allocateRequest);
Assert.assertEquals(1, response.getAllocatedContainers().size());
Assert.assertNotNull(response.getAvailableResources());
Assert.assertEquals(1, response.getCompletedContainersStatuses().size());
Assert.assertEquals(1, response.getUpdatedNodes().size());
Assert.assertNotNull(response.getPreemptionMessage());
Assert.assertEquals(1, response.getNMTokens().size());
Assert.assertEquals(1, response.getUpdatedContainers().size());
Assert.assertEquals(1, response.getUpdateErrors().size());
}
}