diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 1491ed9a175..1704553d468 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -160,6 +160,9 @@ Release 2.8.0 - UNRELEASED YARN-1643. Make ContainersMonitor support changing monitoring size of an allocated container. (Meng Ding and Wangda Tan) + YARN-1644. RM-NM protocol changes and NodeStatusUpdater implementation to + support container resizing. (Meng Ding via jianhe) + IMPROVEMENTS YARN-644. Basic null check is not performed on passed in arguments before diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestResourceTrackerOnHA.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestResourceTrackerOnHA.java index 6cdf87fc931..338198bce61 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestResourceTrackerOnHA.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestResourceTrackerOnHA.java @@ -68,7 +68,7 @@ public class TestResourceTrackerOnHA extends ProtocolHATestBase{ failoverThread = createAndStartFailoverThread(); NodeStatus status = NodeStatus.newInstance(NodeId.newInstance("localhost", 0), 0, null, - null, null, null, null); + null, null, null, null, null); NodeHeartbeatRequest request2 = NodeHeartbeatRequest.newInstance(status, null, null,null); resourceTracker.nodeHeartbeat(request2); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java index 1498a0c16d1..38fbc820fbf 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java @@ -24,6 +24,7 @@ import java.util.Map; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.server.api.records.MasterKey; import org.apache.hadoop.yarn.server.api.records.NodeAction; @@ -70,4 +71,7 @@ public interface NodeHeartbeatResponse { boolean getAreNodeLabelsAcceptedByRM(); void setAreNodeLabelsAcceptedByRM(boolean areNodeLabelsAcceptedByRM); + + List getContainersToDecrease(); + void addAllContainersToDecrease(List containersToDecrease); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java index e27d8ca007b..12c52300d02 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java @@ -27,12 +27,15 @@ import java.util.Map; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ContainerIdPBImpl; +import org.apache.hadoop.yarn.api.records.impl.pb.ContainerPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ProtoBase; import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils; import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto; import org.apache.hadoop.yarn.proto.YarnProtos.ContainerIdProto; +import org.apache.hadoop.yarn.proto.YarnProtos.ContainerProto; import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.MasterKeyProto; import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeActionProto; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatResponseProto; @@ -58,7 +61,9 @@ public class NodeHeartbeatResponsePBImpl extends private MasterKey containerTokenMasterKey = null; private MasterKey nmTokenMasterKey = null; - + + private List containersToDecrease = null; + public NodeHeartbeatResponsePBImpl() { builder = NodeHeartbeatResponseProto.newBuilder(); } @@ -96,6 +101,9 @@ public class NodeHeartbeatResponsePBImpl extends if (this.systemCredentials != null) { addSystemCredentialsToProto(); } + if (this.containersToDecrease != null) { + addContainersToDecreaseToProto(); + } } private void addSystemCredentialsToProto() { @@ -408,6 +416,64 @@ public class NodeHeartbeatResponsePBImpl extends builder.addAllApplicationsToCleanup(iterable); } + private void initContainersToDecrease() { + if (this.containersToDecrease != null) { + return; + } + NodeHeartbeatResponseProtoOrBuilder p = viaProto ? proto : builder; + List list = p.getContainersToDecreaseList(); + this.containersToDecrease = new ArrayList<>(); + + for (ContainerProto c : list) { + this.containersToDecrease.add(convertFromProtoFormat(c)); + } + } + + @Override + public List getContainersToDecrease() { + initContainersToDecrease(); + return this.containersToDecrease; + } + + @Override + public void addAllContainersToDecrease( + final List containersToDecrease) { + if (containersToDecrease == null) { + return; + } + initContainersToDecrease(); + this.containersToDecrease.addAll(containersToDecrease); + } + + private void addContainersToDecreaseToProto() { + maybeInitBuilder(); + builder.clearContainersToDecrease(); + if (this.containersToDecrease == null) { + return; + } + Iterable iterable = new + Iterable() { + @Override + public Iterator iterator() { + return new Iterator() { + private Iterator iter = containersToDecrease.iterator(); + @Override + public boolean hasNext() { + return iter.hasNext(); + } + @Override + public ContainerProto next() { + return convertToProtoFormat(iter.next()); + } + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + }; + } + }; + builder.addAllContainersToDecrease(iterable); + } @Override public Map getSystemCredentialsForApps() { @@ -484,6 +550,14 @@ public class NodeHeartbeatResponsePBImpl extends return ((MasterKeyPBImpl) t).getProto(); } + private ContainerPBImpl convertFromProtoFormat(ContainerProto p) { + return new ContainerPBImpl(p); + } + + private ContainerProto convertToProtoFormat(Container t) { + return ((ContainerPBImpl) t).getProto(); + } + @Override public boolean getAreNodeLabelsAcceptedByRM() { NodeHeartbeatResponseProtoOrBuilder p = diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/NodeStatus.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/NodeStatus.java index 7b8262f26b0..2d62db59320 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/NodeStatus.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/NodeStatus.java @@ -24,6 +24,7 @@ import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.classification.InterfaceStability.Stable; import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.util.Records; @@ -48,6 +49,7 @@ public abstract class NodeStatus { * @param nodeHealthStatus Health status of the node. * @param containersUtilization Utilization of the containers in this node. * @param nodeUtilization Utilization of the node. + * @param increasedContainers Containers whose resource has been increased. * @return New {@code NodeStatus} with the provided information. */ public static NodeStatus newInstance(NodeId nodeId, int responseId, @@ -55,7 +57,8 @@ public abstract class NodeStatus { List keepAliveApplications, NodeHealthStatus nodeHealthStatus, ResourceUtilization containersUtilization, - ResourceUtilization nodeUtilization) { + ResourceUtilization nodeUtilization, + List increasedContainers) { NodeStatus nodeStatus = Records.newRecord(NodeStatus.class); nodeStatus.setResponseId(responseId); nodeStatus.setNodeId(nodeId); @@ -64,6 +67,7 @@ public abstract class NodeStatus { nodeStatus.setNodeHealthStatus(nodeHealthStatus); nodeStatus.setContainersUtilization(containersUtilization); nodeStatus.setNodeUtilization(nodeUtilization); + nodeStatus.setIncreasedContainers(increasedContainers); return nodeStatus; } @@ -108,4 +112,13 @@ public abstract class NodeStatus { @Unstable public abstract void setNodeUtilization( ResourceUtilization nodeUtilization); + + @Public + @Unstable + public abstract List getIncreasedContainers(); + + @Private + @Unstable + public abstract void setIncreasedContainers( + List increasedContainers); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/NodeStatusPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/NodeStatusPBImpl.java index 7d4e83f6794..e34451da6f8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/NodeStatusPBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/NodeStatusPBImpl.java @@ -24,13 +24,16 @@ import java.util.Iterator; import java.util.List; import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl; +import org.apache.hadoop.yarn.api.records.impl.pb.ContainerPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ContainerStatusPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.NodeIdPBImpl; import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto; import org.apache.hadoop.yarn.proto.YarnProtos.ContainerStatusProto; +import org.apache.hadoop.yarn.proto.YarnProtos.ContainerProto; import org.apache.hadoop.yarn.proto.YarnProtos.NodeIdProto; import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeHealthStatusProto; import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeStatusProto; @@ -49,7 +52,8 @@ public class NodeStatusPBImpl extends NodeStatus { private List containers = null; private NodeHealthStatus nodeHealthStatus = null; private List keepAliveApplications = null; - + private List increasedContainers = null; + public NodeStatusPBImpl() { builder = NodeStatusProto.newBuilder(); } @@ -79,6 +83,9 @@ public class NodeStatusPBImpl extends NodeStatus { if (this.keepAliveApplications != null) { addKeepAliveApplicationsToProto(); } + if (this.increasedContainers != null) { + addIncreasedContainersToProto(); + } } private synchronized void mergeLocalToProto() { @@ -165,6 +172,37 @@ public class NodeStatusPBImpl extends NodeStatus { builder.addAllKeepAliveApplications(iterable); } + private synchronized void addIncreasedContainersToProto() { + maybeInitBuilder(); + builder.clearIncreasedContainers(); + if (increasedContainers == null) { + return; + } + Iterable iterable = new + Iterable() { + @Override + public Iterator iterator() { + return new Iterator() { + private Iterator iter = + increasedContainers.iterator(); + @Override + public boolean hasNext() { + return iter.hasNext(); + } + @Override + public ContainerProto next() { + return convertToProtoFormat(iter.next()); + } + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + }; + } + }; + builder.addAllIncreasedContainers(iterable); + } + @Override public int hashCode() { return getProto().hashCode(); @@ -336,6 +374,31 @@ public class NodeStatusPBImpl extends NodeStatus { .setNodeUtilization(convertToProtoFormat(nodeUtilization)); } + @Override + public synchronized List getIncreasedContainers() { + if (increasedContainers != null) { + return increasedContainers; + } + NodeStatusProtoOrBuilder p = viaProto ? proto : builder; + List list = p.getIncreasedContainersList(); + this.increasedContainers = new ArrayList<>(); + for (ContainerProto c : list) { + this.increasedContainers.add(convertFromProtoFormat(c)); + } + return this.increasedContainers; + } + + @Override + public synchronized void setIncreasedContainers( + List increasedContainers) { + maybeInitBuilder(); + if (increasedContainers == null) { + builder.clearIncreasedContainers(); + return; + } + this.increasedContainers = increasedContainers; + } + private NodeIdProto convertToProtoFormat(NodeId nodeId) { return ((NodeIdPBImpl)nodeId).getProto(); } @@ -377,4 +440,14 @@ public class NodeStatusPBImpl extends NodeStatus { ResourceUtilizationProto p) { return new ResourceUtilizationPBImpl(p); } + + private ContainerPBImpl convertFromProtoFormat( + ContainerProto c) { + return new ContainerPBImpl(c); + } + + private ContainerProto convertToProtoFormat( + Container c) { + return ((ContainerPBImpl)c).getProto(); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto index 901051ff167..b161f5bc668 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto @@ -38,6 +38,7 @@ message NodeStatusProto { repeated ApplicationIdProto keep_alive_applications = 5; optional ResourceUtilizationProto containers_utilization = 6; optional ResourceUtilizationProto node_utilization = 7; + repeated ContainerProto increased_containers = 8; } message MasterKeyProto { @@ -60,4 +61,4 @@ message ResourceUtilizationProto { optional int32 pmem = 1; optional int32 vmem = 2; optional float cpu = 3; -} \ No newline at end of file +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto index c122b2adef2..2db8919d2dc 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto @@ -82,6 +82,7 @@ message NodeHeartbeatResponseProto { repeated ContainerIdProto containers_to_be_removed_from_nm = 9; repeated SystemCredentialsForAppsProto system_credentials_for_apps = 10; optional bool areNodeLabelsAcceptedByRM = 11 [default = false]; + repeated ContainerProto containers_to_decrease = 12; } message SystemCredentialsForAppsProto { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestYarnServerApiClasses.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestYarnServerApiClasses.java index d9eeb9db68b..c9427ddabc5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestYarnServerApiClasses.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestYarnServerApiClasses.java @@ -29,6 +29,7 @@ import java.util.HashSet; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NodeId; @@ -168,6 +169,20 @@ public class TestYarnServerApiClasses { assertTrue(copy.getAreNodeLabelsAcceptedByRM()); } + @Test + public void testNodeHeartbeatResponsePBImplWithDecreasedContainers() { + NodeHeartbeatResponsePBImpl original = new NodeHeartbeatResponsePBImpl(); + original.addAllContainersToDecrease( + Arrays.asList(getDecreasedContainer(1, 2, 2048, 2), + getDecreasedContainer(2, 3, 1024, 1))); + NodeHeartbeatResponsePBImpl copy = + new NodeHeartbeatResponsePBImpl(original.getProto()); + assertEquals(1, copy.getContainersToDecrease().get(0) + .getId().getContainerId()); + assertEquals(1024, copy.getContainersToDecrease().get(1) + .getResource().getMemory()); + } + /** * Test RegisterNodeManagerRequestPBImpl. */ @@ -244,6 +259,9 @@ public class TestYarnServerApiClasses { original.setNodeHealthStatus(getNodeHealthStatus()); original.setNodeId(getNodeId()); original.setResponseId(1); + original.setIncreasedContainers( + Arrays.asList(getIncreasedContainer(1, 2, 2048, 2), + getIncreasedContainer(2, 3, 4096, 3))); NodeStatusPBImpl copy = new NodeStatusPBImpl(original.getProto()); assertEquals(3L, copy.getContainersStatuses().get(1).getContainerId() @@ -252,7 +270,10 @@ public class TestYarnServerApiClasses { assertEquals(1000, copy.getNodeHealthStatus().getLastHealthReportTime()); assertEquals(9090, copy.getNodeId().getPort()); assertEquals(1, copy.getResponseId()); - + assertEquals(1, copy.getIncreasedContainers().get(0) + .getId().getContainerId()); + assertEquals(4096, copy.getIncreasedContainers().get(1) + .getResource().getMemory()); } @Test @@ -347,6 +368,22 @@ public class TestYarnServerApiClasses { return new ApplicationIdPBImpl(appId.getProto()); } + private Container getDecreasedContainer(int containerID, + int appAttemptId, int memory, int vCores) { + ContainerId containerId = getContainerId(containerID, appAttemptId); + Resource capability = Resource.newInstance(memory, vCores); + return Container.newInstance( + containerId, null, null, capability, null, null); + } + + private Container getIncreasedContainer(int containerID, + int appAttemptId, int memory, int vCores) { + ContainerId containerId = getContainerId(containerID, appAttemptId); + Resource capability = Resource.newInstance(memory, vCores); + return Container.newInstance( + containerId, null, null, capability, null, null); + } + private NodeStatus getNodeStatus() { NodeStatus status = recordFactory.newRecordInstance(NodeStatus.class); status.setContainersStatuses(new ArrayList()); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java index 52d937b2377..9c2d1fb2328 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java @@ -62,6 +62,9 @@ public interface Context { ConcurrentMap getContainers(); + ConcurrentMap + getIncreasedContainers(); + NMContainerTokenSecretManager getContainerTokenSecretManager(); NMTokenSecretManagerInNM getNMTokenSecretManager(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java index 3cf9f1aa35b..184f4891309 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java @@ -439,6 +439,10 @@ public class NodeManager extends CompositeService protected final ConcurrentMap containers = new ConcurrentSkipListMap(); + protected final ConcurrentMap increasedContainers = + new ConcurrentHashMap<>(); + private final NMContainerTokenSecretManager containerTokenSecretManager; private final NMTokenSecretManagerInNM nmTokenSecretManager; private ContainerManagementProtocol containerManager; @@ -492,6 +496,12 @@ public class NodeManager extends CompositeService return this.containers; } + @Override + public ConcurrentMap + getIncreasedContainers() { + return this.increasedContainers; + } + @Override public NMContainerTokenSecretManager getContainerTokenSecretManager() { return this.containerTokenSecretManager; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java index aa51e5c6e85..f8ce90f42b6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java @@ -310,18 +310,28 @@ public class NodeStatusUpdaterImpl extends AbstractService implements @VisibleForTesting protected void registerWithRM() throws YarnException, IOException { - List containerReports = getNMContainerStatuses(); + RegisterNodeManagerResponse regNMResponse; Set nodeLabels = nodeLabelsHandler.getNodeLabelsForRegistration(); - RegisterNodeManagerRequest request = - RegisterNodeManagerRequest.newInstance(nodeId, httpPort, totalResource, - nodeManagerVersionId, containerReports, getRunningApplications(), - nodeLabels); - if (containerReports != null) { - LOG.info("Registering with RM using containers :" + containerReports); + + // Synchronize NM-RM registration with + // ContainerManagerImpl#increaseContainersResource and + // ContainerManagerImpl#startContainers to avoid race condition + // during RM recovery + synchronized (this.context) { + List containerReports = getNMContainerStatuses(); + RegisterNodeManagerRequest request = + RegisterNodeManagerRequest.newInstance(nodeId, httpPort, totalResource, + nodeManagerVersionId, containerReports, getRunningApplications(), + nodeLabels); + if (containerReports != null) { + LOG.info("Registering with RM using containers :" + containerReports); + } + regNMResponse = + resourceTracker.registerNodeManager(request); + // Make sure rmIdentifier is set before we release the lock + this.rmIdentifier = regNMResponse.getRMIdentifier(); } - RegisterNodeManagerResponse regNMResponse = - resourceTracker.registerNodeManager(request); - this.rmIdentifier = regNMResponse.getRMIdentifier(); + // if the Resource Manager instructs NM to shutdown. if (NodeAction.SHUTDOWN.equals(regNMResponse.getNodeAction())) { String message = @@ -418,10 +428,12 @@ public class NodeStatusUpdaterImpl extends AbstractService implements List containersStatuses = getContainerStatuses(); ResourceUtilization containersUtilization = getContainersUtilization(); ResourceUtilization nodeUtilization = getNodeUtilization(); + List increasedContainers + = getIncreasedContainers(); NodeStatus nodeStatus = NodeStatus.newInstance(nodeId, responseId, containersStatuses, createKeepAliveApplicationList(), nodeHealthStatus, - containersUtilization, nodeUtilization); + containersUtilization, nodeUtilization, increasedContainers); return nodeStatus; } @@ -448,6 +460,21 @@ public class NodeStatusUpdaterImpl extends AbstractService implements return nodeResourceMonitor.getUtilization(); } + /* Get the containers whose resource has been increased since last + * NM-RM heartbeat. + */ + private List + getIncreasedContainers() { + List + increasedContainers = new ArrayList<>( + this.context.getIncreasedContainers().values()); + for (org.apache.hadoop.yarn.api.records.Container + container : increasedContainers) { + this.context.getIncreasedContainers().remove(container.getId()); + } + return increasedContainers; + } + // Iterate through the NMContext and clone and get all the containers' // statuses. If it's a completed container, add into the // recentlyStoppedContainers collections. @@ -765,6 +792,14 @@ public class NodeStatusUpdaterImpl extends AbstractService implements ((NMContext) context) .setSystemCrendentialsForApps(parseCredentials(systemCredentials)); } + + List + containersToDecrease = response.getContainersToDecrease(); + if (!containersToDecrease.isEmpty()) { + dispatcher.getEventHandler().handle( + new CMgrDecreaseContainersResourceEvent(containersToDecrease) + ); + } } catch (ConnectException e) { //catch and throw the exception if tried MAX wait time to connect RM dispatcher.getEventHandler().handle( diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java index 4f2ccbea356..868d8d3489f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java @@ -563,8 +563,7 @@ public class ContainerManagerImpl extends CompositeService implements List appIds = new ArrayList(applications.keySet()); - this.handle( - new CMgrCompletedAppsEvent(appIds, + this.handle(new CMgrCompletedAppsEvent(appIds, CMgrCompletedAppsEvent.Reason.ON_SHUTDOWN)); LOG.info("Waiting for Applications to be Finished"); @@ -584,8 +583,8 @@ public class ContainerManagerImpl extends CompositeService implements if (applications.isEmpty()) { LOG.info("All applications in FINISHED state"); } else { - LOG.info("Done waiting for Applications to be Finished. Still alive: " + - applications.keySet()); + LOG.info("Done waiting for Applications to be Finished. Still alive: " + + applications.keySet()); } } @@ -759,13 +758,12 @@ public class ContainerManagerImpl extends CompositeService implements * Start a list of containers on this NodeManager. */ @Override - public StartContainersResponse - startContainers(StartContainersRequest requests) throws YarnException, - IOException { + public StartContainersResponse startContainers( + StartContainersRequest requests) throws YarnException, IOException { if (blockNewContainerRequests.get()) { throw new NMNotYetReadyException( - "Rejecting new containers as NodeManager has not" - + " yet connected with ResourceManager"); + "Rejecting new containers as NodeManager has not" + + " yet connected with ResourceManager"); } UserGroupInformation remoteUgi = getRemoteUgi(); NMTokenIdentifier nmTokenIdentifier = selectNMTokenIdentifier(remoteUgi); @@ -773,42 +771,50 @@ public class ContainerManagerImpl extends CompositeService implements List succeededContainers = new ArrayList(); Map failedContainers = new HashMap(); - for (StartContainerRequest request : requests.getStartContainerRequests()) { - ContainerId containerId = null; - try { - if (request.getContainerToken() == null || - request.getContainerToken().getIdentifier() == null) { - throw new IOException(INVALID_CONTAINERTOKEN_MSG); - } - ContainerTokenIdentifier containerTokenIdentifier = - BuilderUtils.newContainerTokenIdentifier(request.getContainerToken()); - verifyAndGetContainerTokenIdentifier(request.getContainerToken(), - containerTokenIdentifier); - containerId = containerTokenIdentifier.getContainerID(); + // Synchronize with NodeStatusUpdaterImpl#registerWithRM + // to avoid race condition during NM-RM resync (due to RM restart) while a + // container is being started, in particular when the container has not yet + // been added to the containers map in NMContext. + synchronized (this.context) { + for (StartContainerRequest request : requests + .getStartContainerRequests()) { + ContainerId containerId = null; + try { + if (request.getContainerToken() == null + || request.getContainerToken().getIdentifier() == null) { + throw new IOException(INVALID_CONTAINERTOKEN_MSG); + } - // Initialize the AMRMProxy service instance only if the container is of - // type AM and if the AMRMProxy service is enabled - if (isARMRMProxyEnabled() - && containerTokenIdentifier.getContainerType().equals( - ContainerType.APPLICATION_MASTER)) { - this.amrmProxyService.processApplicationStartRequest(request); - } + ContainerTokenIdentifier containerTokenIdentifier = BuilderUtils + .newContainerTokenIdentifier(request.getContainerToken()); + verifyAndGetContainerTokenIdentifier(request.getContainerToken(), + containerTokenIdentifier); + containerId = containerTokenIdentifier.getContainerID(); - startContainerInternal(nmTokenIdentifier, - containerTokenIdentifier, request); - succeededContainers.add(containerId); - } catch (YarnException e) { - failedContainers.put(containerId, SerializedException.newInstance(e)); - } catch (InvalidToken ie) { - failedContainers.put(containerId, SerializedException.newInstance(ie)); - throw ie; - } catch (IOException e) { - throw RPCUtil.getRemoteException(e); + // Initialize the AMRMProxy service instance only if the container is of + // type AM and if the AMRMProxy service is enabled + if (isARMRMProxyEnabled() && containerTokenIdentifier + .getContainerType().equals(ContainerType.APPLICATION_MASTER)) { + this.amrmProxyService.processApplicationStartRequest(request); + } + + startContainerInternal(nmTokenIdentifier, containerTokenIdentifier, + request); + succeededContainers.add(containerId); + } catch (YarnException e) { + failedContainers.put(containerId, SerializedException.newInstance(e)); + } catch (InvalidToken ie) { + failedContainers + .put(containerId, SerializedException.newInstance(ie)); + throw ie; + } catch (IOException e) { + throw RPCUtil.getRemoteException(e); + } } + return StartContainersResponse + .newInstance(getAuxServiceMetaData(), succeededContainers, + failedContainers); } - - return StartContainersResponse.newInstance(getAuxServiceMetaData(), - succeededContainers, failedContainers); } private ContainerManagerApplicationProto buildAppProto(ApplicationId appId, @@ -959,7 +965,7 @@ public class ContainerManagerImpl extends CompositeService implements InvalidToken { byte[] password = context.getContainerTokenSecretManager().retrievePassword( - containerTokenIdentifier); + containerTokenIdentifier); byte[] tokenPass = token.getPassword().array(); if (password == null || tokenPass == null || !Arrays.equals(password, tokenPass)) { @@ -989,32 +995,39 @@ public class ContainerManagerImpl extends CompositeService implements = new ArrayList(); Map failedContainers = new HashMap(); - // Process container resource increase requests - for (org.apache.hadoop.yarn.api.records.Token token : - requests.getContainersToIncrease()) { - ContainerId containerId = null; - try { - if (token.getIdentifier() == null) { - throw new IOException(INVALID_CONTAINERTOKEN_MSG); + // Synchronize with NodeStatusUpdaterImpl#registerWithRM + // to avoid race condition during NM-RM resync (due to RM restart) while a + // container resource is being increased in NM, in particular when the + // increased container has not yet been added to the increasedContainers + // map in NMContext. + synchronized (this.context) { + // Process container resource increase requests + for (org.apache.hadoop.yarn.api.records.Token token : + requests.getContainersToIncrease()) { + ContainerId containerId = null; + try { + if (token.getIdentifier() == null) { + throw new IOException(INVALID_CONTAINERTOKEN_MSG); + } + ContainerTokenIdentifier containerTokenIdentifier = + BuilderUtils.newContainerTokenIdentifier(token); + verifyAndGetContainerTokenIdentifier(token, + containerTokenIdentifier); + authorizeStartAndResourceIncreaseRequest( + nmTokenIdentifier, containerTokenIdentifier, false); + containerId = containerTokenIdentifier.getContainerID(); + // Reuse the startContainer logic to update NMToken, + // as container resource increase request will have come with + // an updated NMToken. + updateNMTokenIdentifier(nmTokenIdentifier); + Resource resource = containerTokenIdentifier.getResource(); + changeContainerResourceInternal(containerId, resource, true); + successfullyIncreasedContainers.add(containerId); + } catch (YarnException | InvalidToken e) { + failedContainers.put(containerId, SerializedException.newInstance(e)); + } catch (IOException e) { + throw RPCUtil.getRemoteException(e); } - ContainerTokenIdentifier containerTokenIdentifier = - BuilderUtils.newContainerTokenIdentifier(token); - verifyAndGetContainerTokenIdentifier(token, - containerTokenIdentifier); - authorizeStartAndResourceIncreaseRequest( - nmTokenIdentifier, containerTokenIdentifier, false); - containerId = containerTokenIdentifier.getContainerID(); - // Reuse the startContainer logic to update NMToken, - // as container resource increase request will have come with - // an updated NMToken. - updateNMTokenIdentifier(nmTokenIdentifier); - Resource resource = containerTokenIdentifier.getResource(); - changeContainerResourceInternal(containerId, resource, true); - successfullyIncreasedContainers.add(containerId); - } catch (YarnException | InvalidToken e) { - failedContainers.put(containerId, SerializedException.newInstance(e)); - } catch (IOException e) { - throw RPCUtil.getRemoteException(e); } } return IncreaseContainersResourceResponse.newInstance( @@ -1075,6 +1088,16 @@ public class ContainerManagerImpl extends CompositeService implements + " is not smaller than the current resource " + currentResource.toString()); } + if (increase) { + org.apache.hadoop.yarn.api.records.Container increasedContainer = + org.apache.hadoop.yarn.api.records.Container.newInstance( + containerId, null, null, targetResource, null, null); + if (context.getIncreasedContainers().putIfAbsent(containerId, + increasedContainer) != null){ + throw RPCUtil.getRemoteException("Container " + containerId.toString() + + " resource is being increased."); + } + } this.readLock.lock(); try { if (!serviceStopped) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerResync.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerResync.java index c22d4753154..4250ac3a806 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerResync.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerResync.java @@ -18,21 +18,35 @@ package org.apache.hadoop.yarn.server.nodemanager; +import static org.junit.Assert.assertEquals; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; import java.io.File; import java.io.IOException; +import java.io.PrintWriter; +import java.nio.ByteBuffer; import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.fs.FileContext; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.UnsupportedFileSystemException; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.SecretManager; +import org.apache.hadoop.util.Shell; +import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest; +import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceRequest; +import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceResponse; import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest; import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; @@ -41,8 +55,13 @@ import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.LocalResource; +import org.apache.hadoop.yarn.api.records.LocalResourceType; +import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.Token; +import org.apache.hadoop.yarn.api.records.URL; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.exceptions.NMNotYetReadyException; @@ -50,6 +69,8 @@ import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; +import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; +import org.apache.hadoop.yarn.security.NMTokenIdentifier; import org.apache.hadoop.yarn.server.api.ResourceTracker; import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest; @@ -57,12 +78,15 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse; import org.apache.hadoop.yarn.server.api.records.NodeAction; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.BaseContainerManagerTest; import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.TestContainerManager; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics; import org.apache.hadoop.yarn.server.security.ApplicationACLsManager; import org.apache.hadoop.yarn.server.utils.YarnServerBuilderUtils; +import org.apache.hadoop.yarn.util.ConverterUtils; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -87,7 +111,10 @@ public class TestNodeManagerResync { private AtomicBoolean isNMShutdownCalled = new AtomicBoolean(false); private final NodeManagerEvent resyncEvent = new NodeManagerEvent(NodeManagerEventType.RESYNC); + private final long DUMMY_RM_IDENTIFIER = 1234; + protected static Log LOG = LogFactory + .getLog(TestNodeManagerResync.class); @Before public void setup() throws UnsupportedFileSystemException { @@ -209,6 +236,32 @@ public class TestNodeManagerResync { nm.stop(); } + @SuppressWarnings("unchecked") + @Test(timeout=60000) + public void testContainerResourceIncreaseIsSynchronizedWithRMResync() + throws IOException, InterruptedException, YarnException { + NodeManager nm = new TestNodeManager4(); + YarnConfiguration conf = createNMConfig(); + conf.setBoolean( + YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_ENABLED, true); + nm.init(conf); + nm.start(); + // Start a container and make sure it is in RUNNING state + ((TestNodeManager4)nm).startContainer(); + // Simulate a container resource increase in a separate thread + ((TestNodeManager4)nm).increaseContainersResource(); + // Simulate RM restart by sending a RESYNC event + LOG.info("Sending out RESYNC event"); + nm.getNMDispatcher().getEventHandler().handle( + new NodeManagerEvent(NodeManagerEventType.RESYNC)); + try { + syncBarrier.await(); + } catch (BrokenBarrierException e) { + e.printStackTrace(); + } + Assert.assertFalse(assertionFailedInThread.get()); + nm.stop(); + } // This is to test when NM gets the resync response from last heart beat, it // should be able to send the already-sent-via-last-heart-beat container @@ -588,6 +641,211 @@ public class TestNodeManagerResync { } }} + class TestNodeManager4 extends NodeManager { + + private Thread increaseContainerResourceThread = null; + + @Override + protected NodeStatusUpdater createNodeStatusUpdater(Context context, + Dispatcher dispatcher, NodeHealthCheckerService healthChecker) { + return new TestNodeStatusUpdaterImpl4(context, dispatcher, + healthChecker, metrics); + } + + @Override + protected ContainerManagerImpl createContainerManager(Context context, + ContainerExecutor exec, DeletionService del, + NodeStatusUpdater nodeStatusUpdater, + ApplicationACLsManager aclsManager, + LocalDirsHandlerService dirsHandler) { + return new ContainerManagerImpl(context, exec, del, nodeStatusUpdater, + metrics, dirsHandler){ + @Override + public void + setBlockNewContainerRequests(boolean blockNewContainerRequests) { + // do nothing + } + + @Override + protected void authorizeGetAndStopContainerRequest( + ContainerId containerId, Container container, + boolean stopRequest, NMTokenIdentifier identifier) + throws YarnException { + // do nothing + } + @Override + protected void authorizeUser(UserGroupInformation remoteUgi, + NMTokenIdentifier nmTokenIdentifier) { + // do nothing + } + @Override + protected void authorizeStartAndResourceIncreaseRequest( + NMTokenIdentifier nmTokenIdentifier, + ContainerTokenIdentifier containerTokenIdentifier, + boolean startRequest) throws YarnException { + try { + // Sleep 2 seconds to simulate a pro-longed increase action. + // If during this time a RESYNC event is sent by RM, the + // resync action should block until the increase action is + // completed. + // See testContainerResourceIncreaseIsSynchronizedWithRMResync() + Thread.sleep(2000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + @Override + protected void updateNMTokenIdentifier( + NMTokenIdentifier nmTokenIdentifier) + throws SecretManager.InvalidToken { + // Do nothing + } + @Override + public Map getAuxServiceMetaData() { + return new HashMap<>(); + } + @Override + protected NMTokenIdentifier selectNMTokenIdentifier( + UserGroupInformation remoteUgi) { + return new NMTokenIdentifier(); + } + }; + } + + // Start a container in NM + public void startContainer() + throws IOException, InterruptedException, YarnException { + LOG.info("Start a container and wait until it is in RUNNING state"); + File scriptFile = Shell.appendScriptExtension(tmpDir, "scriptFile"); + PrintWriter fileWriter = new PrintWriter(scriptFile); + if (Shell.WINDOWS) { + fileWriter.println("@ping -n 100 127.0.0.1 >nul"); + } else { + fileWriter.write("\numask 0"); + fileWriter.write("\nexec sleep 100"); + } + fileWriter.close(); + ContainerLaunchContext containerLaunchContext = + recordFactory.newRecordInstance(ContainerLaunchContext.class); + URL resource_alpha = + ConverterUtils.getYarnUrlFromPath(localFS + .makeQualified(new Path(scriptFile.getAbsolutePath()))); + LocalResource rsrc_alpha = + recordFactory.newRecordInstance(LocalResource.class); + rsrc_alpha.setResource(resource_alpha); + rsrc_alpha.setSize(-1); + rsrc_alpha.setVisibility(LocalResourceVisibility.APPLICATION); + rsrc_alpha.setType(LocalResourceType.FILE); + rsrc_alpha.setTimestamp(scriptFile.lastModified()); + String destinationFile = "dest_file"; + Map localResources = + new HashMap(); + localResources.put(destinationFile, rsrc_alpha); + containerLaunchContext.setLocalResources(localResources); + List commands = + Arrays.asList(Shell.getRunScriptCommand(scriptFile)); + containerLaunchContext.setCommands(commands); + Resource resource = Resource.newInstance(1024, 1); + StartContainerRequest scRequest = + StartContainerRequest.newInstance( + containerLaunchContext, + getContainerToken(resource)); + List list = new ArrayList(); + list.add(scRequest); + StartContainersRequest allRequests = + StartContainersRequest.newInstance(list); + getContainerManager().startContainers(allRequests); + // Make sure the container reaches RUNNING state + ContainerId cId = TestContainerManager.createContainerId(0); + BaseContainerManagerTest.waitForNMContainerState( + getContainerManager(), cId, + org.apache.hadoop.yarn.server.nodemanager. + containermanager.container.ContainerState.RUNNING); + } + + // Increase container resource in a thread + public void increaseContainersResource() + throws InterruptedException { + LOG.info("Increase a container resource in a separate thread"); + increaseContainerResourceThread = new IncreaseContainersResourceThread(); + increaseContainerResourceThread.start(); + } + + class TestNodeStatusUpdaterImpl4 extends MockNodeStatusUpdater { + + public TestNodeStatusUpdaterImpl4(Context context, Dispatcher dispatcher, + NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics) { + super(context, dispatcher, healthChecker, metrics); + } + + @Override + protected void rebootNodeStatusUpdaterAndRegisterWithRM() { + try { + try { + // Check status before registerWithRM + List containerIds = new ArrayList<>(); + ContainerId cId = TestContainerManager.createContainerId(0); + containerIds.add(cId); + GetContainerStatusesRequest gcsRequest = + GetContainerStatusesRequest.newInstance(containerIds); + ContainerStatus containerStatus = getContainerManager() + .getContainerStatuses(gcsRequest).getContainerStatuses().get(0); + assertEquals(Resource.newInstance(1024, 1), + containerStatus.getCapability()); + // Call the actual rebootNodeStatusUpdaterAndRegisterWithRM(). + // This function should be synchronized with + // increaseContainersResource(). + super.rebootNodeStatusUpdaterAndRegisterWithRM(); + // Check status after registerWithRM + containerStatus = getContainerManager() + .getContainerStatuses(gcsRequest).getContainerStatuses().get(0); + assertEquals(Resource.newInstance(4096, 2), + containerStatus.getCapability()); + } catch (AssertionError ae) { + ae.printStackTrace(); + assertionFailedInThread.set(true); + } finally { + syncBarrier.await(); + } + } catch (Exception e) { + e.printStackTrace(); + } + } + } + + class IncreaseContainersResourceThread extends Thread { + @Override + public void run() { + // Construct container resource increase request + List increaseTokens = new ArrayList(); + // Add increase request. + Resource targetResource = Resource.newInstance(4096, 2); + try { + increaseTokens.add(getContainerToken(targetResource)); + IncreaseContainersResourceRequest increaseRequest = + IncreaseContainersResourceRequest.newInstance(increaseTokens); + IncreaseContainersResourceResponse increaseResponse = + getContainerManager() + .increaseContainersResource(increaseRequest); + Assert.assertEquals( + 1, increaseResponse.getSuccessfullyIncreasedContainers() + .size()); + Assert.assertTrue(increaseResponse.getFailedRequests().isEmpty()); + } catch (Exception e) { + e.printStackTrace(); + } + } + } + + private Token getContainerToken(Resource resource) throws IOException { + ContainerId cId = TestContainerManager.createContainerId(0); + return TestContainerManager.createContainerToken( + cId, DUMMY_RM_IDENTIFIER, + getNMContext().getNodeId(), user, resource, + getNMContext().getContainerTokenSecretManager(), null); + } + } + public static NMContainerStatus createNMContainerStatus(int id, ContainerState containerState) { ApplicationId applicationId = ApplicationId.newInstance(0, 1); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java index 964379a411a..9bc23f6f43e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java @@ -619,6 +619,11 @@ public abstract class BaseAMRMProxyTest { return null; } + @Override + public ConcurrentMap getIncreasedContainers() { + return null; + } + @Override public NMContainerTokenSecretManager getContainerTokenSecretManager() { return null; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/MockResourceManagerFacade.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/MockResourceManagerFacade.java index 7573a7a52bb..f482784fe90 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/MockResourceManagerFacade.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/MockResourceManagerFacade.java @@ -93,8 +93,6 @@ import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationReport; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; -import org.apache.hadoop.yarn.api.records.ContainerResourceDecrease; -import org.apache.hadoop.yarn.api.records.ContainerResourceIncrease; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.NMToken; @@ -292,8 +290,8 @@ public class MockResourceManagerFacade implements new ArrayList(), containerList, new ArrayList(), null, AMCommand.AM_RESYNC, 1, null, new ArrayList(), - new ArrayList(), - new ArrayList()); + new ArrayList(), + new ArrayList()); } @Override diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java index 2ea9146b71b..3fb4112447a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java @@ -108,7 +108,7 @@ public class TestContainerManager extends BaseContainerManagerTest { super.setup(); } - private ContainerId createContainerId(int id) { + public static ContainerId createContainerId(int id) { ApplicationId appId = ApplicationId.newInstance(0, 0); ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(appId, 1);