YARN-6171. ConcurrentModificationException on FSAppAttempt.containersToPreempt. (Miklos Szegedi via kasha)

(cherry picked from commit a77f432449)
This commit is contained in:
Karthik Kambatla 2017-02-16 14:54:51 -08:00
parent 8fc67e5973
commit afc8124ff6
2 changed files with 34 additions and 30 deletions

View File

@ -82,8 +82,10 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
private Resource fairShare = Resources.createResource(0, 0); private Resource fairShare = Resources.createResource(0, 0);
// Preemption related variables // Preemption related variables
private final Object preemptionVariablesLock = new Object();
private final Resource preemptedResources = Resources.clone(Resources.none()); private final Resource preemptedResources = Resources.clone(Resources.none());
private final Set<RMContainer> containersToPreempt = new HashSet<>(); private final Set<RMContainer> containersToPreempt = new HashSet<>();
private Resource fairshareStarvation = Resources.none(); private Resource fairshareStarvation = Resources.none();
private long lastTimeAtFairShare; private long lastTimeAtFairShare;
private long nextStarvationCheck; private long nextStarvationCheck;
@ -551,29 +553,29 @@ void resetMinshareStarvation() {
} }
void trackContainerForPreemption(RMContainer container) { void trackContainerForPreemption(RMContainer container) {
if (containersToPreempt.add(container)) { synchronized (preemptionVariablesLock) {
synchronized (preemptedResources) { if (containersToPreempt.add(container)) {
Resources.addTo(preemptedResources, container.getAllocatedResource()); Resources.addTo(preemptedResources, container.getAllocatedResource());
} }
} }
} }
private void untrackContainerForPreemption(RMContainer container) { private void untrackContainerForPreemption(RMContainer container) {
if (containersToPreempt.remove(container)) { synchronized (preemptionVariablesLock) {
synchronized (preemptedResources) { if (containersToPreempt.remove(container)) {
Resources.subtractFrom(preemptedResources, Resources.subtractFrom(preemptedResources,
container.getAllocatedResource()); container.getAllocatedResource());
} }
} }
} }
Set<RMContainer> getPreemptionContainers() { Set<ContainerId> getPreemptionContainerIds() {
return containersToPreempt; synchronized (preemptionVariablesLock) {
} Set<ContainerId> preemptionContainerIds = new HashSet<>();
for (RMContainer container : containersToPreempt) {
private Resource getPreemptedResources() { preemptionContainerIds.add(container.getContainerId());
synchronized (preemptedResources) { }
return preemptedResources; return preemptionContainerIds;
} }
} }
@ -590,9 +592,11 @@ boolean canContainerBePreempted(RMContainer container) {
return false; return false;
} }
if (containersToPreempt.contains(container)) { synchronized (preemptionVariablesLock) {
// The container is already under consideration for preemption if (containersToPreempt.contains(container)) {
return false; // The container is already under consideration for preemption
return false;
}
} }
// Check if the app's allocation will be over its fairshare even // Check if the app's allocation will be over its fairshare even
@ -964,7 +968,8 @@ private Resource assignContainer(FSSchedulerNode node, boolean reserved) {
if (LOG.isTraceEnabled()) { if (LOG.isTraceEnabled()) {
LOG.trace("Assign container on " + node.getNodeName() LOG.trace("Assign container on " + node.getNodeName()
+ " node, assignType: OFF_SWITCH" + ", allowedLocality: " + " node, assignType: OFF_SWITCH" + ", allowedLocality: "
+ allowedLocality + ", priority: " + schedulerKey.getPriority() + allowedLocality + ", priority: "
+ schedulerKey.getPriority()
+ ", app attempt id: " + this.attemptId); + ", app attempt id: " + this.attemptId);
} }
return assignContainer(node, offSwitchRequest, NodeType.OFF_SWITCH, return assignContainer(node, offSwitchRequest, NodeType.OFF_SWITCH,
@ -1219,13 +1224,13 @@ public Resource getMaxShare() {
@Override @Override
public Resource getResourceUsage() { public Resource getResourceUsage() {
/* // Subtract copies the object, so that we have a snapshot,
* getResourcesToPreempt() returns zero, except when there are containers // in case usage changes, while the caller is using the value
* to preempt. Avoid creating an object in the common case. synchronized (preemptionVariablesLock) {
*/ return containersToPreempt.isEmpty()
return getPreemptedResources().equals(Resources.none()) ? getCurrentConsumption()
? getCurrentConsumption() : Resources.subtract(getCurrentConsumption(), preemptedResources);
: Resources.subtract(getCurrentConsumption(), getPreemptedResources()); }
} }
@Override @Override

View File

@ -32,6 +32,7 @@
import java.util.Map.Entry; import java.util.Map.Entry;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
@ -831,8 +832,9 @@ public Allocation allocate(ApplicationAttemptId appAttemptId,
// Release containers // Release containers
releaseContainers(release, application); releaseContainers(release, application);
ReentrantReadWriteLock.WriteLock lock = application.getWriteLock();
lock.lock();
try { try {
application.getWriteLock().lock();
if (!ask.isEmpty()) { if (!ask.isEmpty()) {
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug( LOG.debug(
@ -847,24 +849,21 @@ public Allocation allocate(ApplicationAttemptId appAttemptId,
application.showRequests(); application.showRequests();
} }
} finally { } finally {
application.getWriteLock().unlock(); lock.unlock();
} }
Set<ContainerId> preemptionContainerIds =
application.getPreemptionContainerIds();
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug( LOG.debug(
"allocate: post-update" + " applicationAttemptId=" + appAttemptId "allocate: post-update" + " applicationAttemptId=" + appAttemptId
+ " #ask=" + ask.size() + " reservation= " + application + " #ask=" + ask.size() + " reservation= " + application
.getCurrentReservation()); .getCurrentReservation());
LOG.debug("Preempting " + application.getPreemptionContainers().size() LOG.debug("Preempting " + preemptionContainerIds.size()
+ " container(s)"); + " container(s)");
} }
Set<ContainerId> preemptionContainerIds = new HashSet<ContainerId>();
for (RMContainer container : application.getPreemptionContainers()) {
preemptionContainerIds.add(container.getContainerId());
}
application.updateBlacklist(blacklistAdditions, blacklistRemovals); application.updateBlacklist(blacklistAdditions, blacklistRemovals);
List<Container> newlyAllocatedContainers = List<Container> newlyAllocatedContainers =