YARN-7051. Avoid concurrent modification exception in FifoIntraQueuePreemptionPlugin. Contributed by Eric Payne.

This commit is contained in:
Sunil G 2017-08-28 12:22:56 +05:30
parent ae7abad3d9
commit 02599bda04
1 changed files with 17 additions and 9 deletions

View File

@ -397,10 +397,16 @@ private PriorityQueue<TempAppPerPartition> createTempAppForResCalculation(
ResourceUsage userResourceUsage = tq.leafQueue.getUser(userName)
.getResourceUsage();
// perUserAMUsed was populated with running apps, now we are looping
// through both running and pending apps.
Resource userSpecificAmUsed = perUserAMUsed.get(userName);
amUsed = (userSpecificAmUsed == null)
? Resources.none() : userSpecificAmUsed;
TempUserPerPartition tmpUser = new TempUserPerPartition(
tq.leafQueue.getUser(userName), tq.queueName,
Resources.clone(userResourceUsage.getUsed(partition)),
Resources.clone(perUserAMUsed.get(userName)),
Resources.clone(userSpecificAmUsed),
Resources.clone(userResourceUsage.getReserved(partition)),
Resources.none());
@ -547,15 +553,17 @@ private Resource calculateUsedAMResourcesPerQueue(String partition,
Collection<FiCaSchedulerApp> runningApps = leafQueue.getApplications();
Resource amUsed = Resources.createResource(0, 0);
for (FiCaSchedulerApp app : runningApps) {
Resource userAMResource = perUserAMUsed.get(app.getUser());
if (null == userAMResource) {
userAMResource = Resources.createResource(0, 0);
perUserAMUsed.put(app.getUser(), userAMResource);
}
synchronized (leafQueue) {
for (FiCaSchedulerApp app : runningApps) {
Resource userAMResource = perUserAMUsed.get(app.getUser());
if (null == userAMResource) {
userAMResource = Resources.createResource(0, 0);
perUserAMUsed.put(app.getUser(), userAMResource);
}
Resources.addTo(userAMResource, app.getAMResource(partition));
Resources.addTo(amUsed, app.getAMResource(partition));
Resources.addTo(userAMResource, app.getAMResource(partition));
Resources.addTo(amUsed, app.getAMResource(partition));
}
}
return amUsed;