YARN-6864. FSPreemptionThread cleanup for readability. (Daniel Templeton via Yufei Gu)
(cherry picked from commit 9902be72cb
)
This commit is contained in:
parent
c3e8c7d56e
commit
e29d1c75ea
|
@ -68,11 +68,11 @@ class FSPreemptionThread extends Thread {
|
||||||
schedulerReadLock = scheduler.getSchedulerReadLock();
|
schedulerReadLock = scheduler.getSchedulerReadLock();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
public void run() {
|
public void run() {
|
||||||
while (!Thread.interrupted()) {
|
while (!Thread.interrupted()) {
|
||||||
FSAppAttempt starvedApp;
|
|
||||||
try {
|
try {
|
||||||
starvedApp = context.getStarvedApps().take();
|
FSAppAttempt starvedApp = context.getStarvedApps().take();
|
||||||
// Hold the scheduler readlock so this is not concurrent with the
|
// Hold the scheduler readlock so this is not concurrent with the
|
||||||
// update thread.
|
// update thread.
|
||||||
schedulerReadLock.lock();
|
schedulerReadLock.lock();
|
||||||
|
@ -84,7 +84,7 @@ class FSPreemptionThread extends Thread {
|
||||||
starvedApp.preemptionTriggered(delayBeforeNextStarvationCheck);
|
starvedApp.preemptionTriggered(delayBeforeNextStarvationCheck);
|
||||||
} catch (InterruptedException e) {
|
} catch (InterruptedException e) {
|
||||||
LOG.info("Preemption thread interrupted! Exiting.");
|
LOG.info("Preemption thread interrupted! Exiting.");
|
||||||
return;
|
Thread.currentThread().interrupt();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -114,16 +114,19 @@ class FSPreemptionThread extends Thread {
|
||||||
PreemptableContainers bestContainers = null;
|
PreemptableContainers bestContainers = null;
|
||||||
List<FSSchedulerNode> potentialNodes = scheduler.getNodeTracker()
|
List<FSSchedulerNode> potentialNodes = scheduler.getNodeTracker()
|
||||||
.getNodesByResourceName(rr.getResourceName());
|
.getNodesByResourceName(rr.getResourceName());
|
||||||
|
int maxAMContainers = Integer.MAX_VALUE;
|
||||||
|
|
||||||
for (FSSchedulerNode node : potentialNodes) {
|
for (FSSchedulerNode node : potentialNodes) {
|
||||||
int maxAMContainers = bestContainers == null ?
|
|
||||||
Integer.MAX_VALUE : bestContainers.numAMContainers;
|
|
||||||
PreemptableContainers preemptableContainers =
|
PreemptableContainers preemptableContainers =
|
||||||
identifyContainersToPreemptOnNode(
|
identifyContainersToPreemptOnNode(
|
||||||
rr.getCapability(), node, maxAMContainers);
|
rr.getCapability(), node, maxAMContainers);
|
||||||
|
|
||||||
if (preemptableContainers != null) {
|
if (preemptableContainers != null) {
|
||||||
// This set is better than any previously identified set.
|
// This set is better than any previously identified set.
|
||||||
bestContainers = preemptableContainers;
|
bestContainers = preemptableContainers;
|
||||||
if (preemptableContainers.numAMContainers == 0) {
|
maxAMContainers = bestContainers.numAMContainers;
|
||||||
|
|
||||||
|
if (maxAMContainers == 0) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -184,13 +187,10 @@ class FSPreemptionThread extends Thread {
|
||||||
return preemptableContainers;
|
return preemptableContainers;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return null;
|
|
||||||
}
|
|
||||||
|
|
||||||
private boolean isNodeAlreadyReserved(
|
// Return null if the sum of all preemptable containers' resources
|
||||||
FSSchedulerNode node, FSAppAttempt app) {
|
// isn't enough to satisfy the starved request.
|
||||||
FSAppAttempt nodeReservedApp = node.getReservedAppSchedulable();
|
return null;
|
||||||
return nodeReservedApp != null && !nodeReservedApp.equals(app);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private void trackPreemptionsAgainstNode(List<RMContainer> containers,
|
private void trackPreemptionsAgainstNode(List<RMContainer> containers,
|
||||||
|
@ -216,7 +216,7 @@ class FSPreemptionThread extends Thread {
|
||||||
}
|
}
|
||||||
|
|
||||||
private class PreemptContainersTask extends TimerTask {
|
private class PreemptContainersTask extends TimerTask {
|
||||||
private List<RMContainer> containers;
|
private final List<RMContainer> containers;
|
||||||
|
|
||||||
PreemptContainersTask(List<RMContainer> containers) {
|
PreemptContainersTask(List<RMContainer> containers) {
|
||||||
this.containers = containers;
|
this.containers = containers;
|
||||||
|
|
Loading…
Reference in New Issue