YARN-6582. FSAppAttempt demand can be updated atomically in updateDemand(). (Karthik Kambatla via Yufei Gu)
This commit is contained in:
parent
3fd6a2da4e
commit
87590090c8
|
@ -1286,24 +1286,21 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void updateDemand() {
|
public void updateDemand() {
|
||||||
demand = Resources.createResource(0);
|
|
||||||
// Demand is current consumption plus outstanding requests
|
// Demand is current consumption plus outstanding requests
|
||||||
Resources.addTo(demand, getCurrentConsumption());
|
Resource tmpDemand = Resources.clone(getCurrentConsumption());
|
||||||
|
|
||||||
// Add up outstanding resource requests
|
// Add up outstanding resource requests
|
||||||
try {
|
|
||||||
writeLock.lock();
|
|
||||||
for (SchedulerRequestKey k : getSchedulerKeys()) {
|
for (SchedulerRequestKey k : getSchedulerKeys()) {
|
||||||
PendingAsk pendingAsk = getPendingAsk(k, ResourceRequest.ANY);
|
PendingAsk pendingAsk = getPendingAsk(k, ResourceRequest.ANY);
|
||||||
if (pendingAsk.getCount() > 0) {
|
if (pendingAsk.getCount() > 0) {
|
||||||
Resources.multiplyAndAddTo(demand,
|
Resources.multiplyAndAddTo(tmpDemand,
|
||||||
pendingAsk.getPerAllocationResource(),
|
pendingAsk.getPerAllocationResource(),
|
||||||
pendingAsk.getCount());
|
pendingAsk.getCount());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} finally {
|
|
||||||
writeLock.unlock();
|
// Update demand
|
||||||
}
|
demand = tmpDemand;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
Loading…
Reference in New Issue