YARN-5483. Optimize RMAppAttempt#pullJustFinishedContainers. Contributed by sandflee

(cherry picked from commit 3d401206cf)

Conflicts:

	hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
This commit is contained in:
Jason Lowe 2016-08-10 18:09:11 +00:00
parent 3b4d6ece1a
commit e1845faeda
1 changed files with 19 additions and 19 deletions

View File

@ -28,6 +28,7 @@ import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.EnumSet; import java.util.EnumSet;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock;
@ -141,14 +142,12 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
private SecretKey clientTokenMasterKey = null; private SecretKey clientTokenMasterKey = null;
private ConcurrentMap<NodeId, List<ContainerStatus>> private ConcurrentMap<NodeId, List<ContainerStatus>>
justFinishedContainers = justFinishedContainers = new ConcurrentHashMap<>();
new ConcurrentHashMap<NodeId, List<ContainerStatus>>();
// Tracks the previous finished containers that are waiting to be // Tracks the previous finished containers that are waiting to be
// verified as received by the AM. If the AM sends the next allocate // verified as received by the AM. If the AM sends the next allocate
// request it implicitly acks this list. // request it implicitly acks this list.
private ConcurrentMap<NodeId, List<ContainerStatus>> private ConcurrentMap<NodeId, List<ContainerStatus>>
finishedContainersSentToAM = finishedContainersSentToAM = new ConcurrentHashMap<>();
new ConcurrentHashMap<NodeId, List<ContainerStatus>>();
private volatile Container masterContainer; private volatile Container masterContainer;
private float progress = 0; private float progress = 0;
@ -756,7 +755,7 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
public List<ContainerStatus> getJustFinishedContainers() { public List<ContainerStatus> getJustFinishedContainers() {
this.readLock.lock(); this.readLock.lock();
try { try {
List<ContainerStatus> returnList = new ArrayList<ContainerStatus>(); List<ContainerStatus> returnList = new ArrayList<>();
for (Collection<ContainerStatus> containerStatusList : for (Collection<ContainerStatus> containerStatusList :
justFinishedContainers.values()) { justFinishedContainers.values()) {
returnList.addAll(containerStatusList); returnList.addAll(containerStatusList);
@ -795,7 +794,7 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
this.writeLock.lock(); this.writeLock.lock();
try { try {
List<ContainerStatus> returnList = new ArrayList<ContainerStatus>(); List<ContainerStatus> returnList = new ArrayList<>();
// A new allocate means the AM received the previously sent // A new allocate means the AM received the previously sent
// finishedContainers. We can ack this to NM now // finishedContainers. We can ack this to NM now
@ -803,15 +802,17 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
// Mark every containerStatus as being sent to AM though we may return // Mark every containerStatus as being sent to AM though we may return
// only the ones that belong to the current attempt // only the ones that belong to the current attempt
boolean keepContainersAcressAttempts = this.submissionContext boolean keepContainersAcrossAppAttempts = this.submissionContext
.getKeepContainersAcrossApplicationAttempts(); .getKeepContainersAcrossApplicationAttempts();
for (NodeId nodeId:justFinishedContainers.keySet()) { for (Map.Entry<NodeId, List<ContainerStatus>> entry :
justFinishedContainers.entrySet()) {
NodeId nodeId = entry.getKey();
List<ContainerStatus> finishedContainers = entry.getValue();
if (finishedContainers.isEmpty()) {
continue;
}
// Clear and get current values if (keepContainersAcrossAppAttempts) {
List<ContainerStatus> finishedContainers = justFinishedContainers.put
(nodeId, new ArrayList<ContainerStatus>());
if (keepContainersAcressAttempts) {
returnList.addAll(finishedContainers); returnList.addAll(finishedContainers);
} else { } else {
// Filter out containers from previous attempt // Filter out containers from previous attempt
@ -823,12 +824,11 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
} }
} }
if (!finishedContainers.isEmpty()) { finishedContainersSentToAM.putIfAbsent(nodeId,
finishedContainersSentToAM.putIfAbsent(nodeId, new ArrayList<ContainerStatus>());
new ArrayList<ContainerStatus>()); finishedContainersSentToAM.get(nodeId).addAll(finishedContainers);
finishedContainersSentToAM.get(nodeId).addAll(finishedContainers);
}
} }
justFinishedContainers.clear();
return returnList; return returnList;
} finally { } finally {
@ -1836,7 +1836,7 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
finishedContainersSentToAM.put(nodeId, finishedContainersSentToAM.put(nodeId,
new ArrayList<ContainerStatus>()); new ArrayList<ContainerStatus>());
List<ContainerId> containerIdList = List<ContainerId> containerIdList =
new ArrayList<ContainerId>(currentSentContainers.size()); new ArrayList<>(currentSentContainers.size());
for (ContainerStatus containerStatus : currentSentContainers) { for (ContainerStatus containerStatus : currentSentContainers) {
containerIdList.add(containerStatus.getContainerId()); containerIdList.add(containerStatus.getContainerId());
} }