YARN-8397. Potential thread leak in ActivitiesManager. Contributed by Rohith Sharma K S.
This commit is contained in:
parent
40f9b0c5c1
commit
6310c0d17d
@ -57,6 +57,7 @@ public class ActivitiesManager extends AbstractService {
|
|||||||
private Thread cleanUpThread;
|
private Thread cleanUpThread;
|
||||||
private int timeThreshold = 600 * 1000;
|
private int timeThreshold = 600 * 1000;
|
||||||
private final RMContext rmContext;
|
private final RMContext rmContext;
|
||||||
|
private volatile boolean stopped;
|
||||||
|
|
||||||
public ActivitiesManager(RMContext rmContext) {
|
public ActivitiesManager(RMContext rmContext) {
|
||||||
super(ActivitiesManager.class.getName());
|
super(ActivitiesManager.class.getName());
|
||||||
@ -113,7 +114,7 @@ protected void serviceStart() throws Exception {
|
|||||||
cleanUpThread = new Thread(new Runnable() {
|
cleanUpThread = new Thread(new Runnable() {
|
||||||
@Override
|
@Override
|
||||||
public void run() {
|
public void run() {
|
||||||
while (true) {
|
while (!stopped && !Thread.currentThread().isInterrupted()) {
|
||||||
Iterator<Map.Entry<NodeId, List<NodeAllocation>>> ite =
|
Iterator<Map.Entry<NodeId, List<NodeAllocation>>> ite =
|
||||||
completedNodeAllocations.entrySet().iterator();
|
completedNodeAllocations.entrySet().iterator();
|
||||||
while (ite.hasNext()) {
|
while (ite.hasNext()) {
|
||||||
@ -140,20 +141,29 @@ public void run() {
|
|||||||
|
|
||||||
try {
|
try {
|
||||||
Thread.sleep(5000);
|
Thread.sleep(5000);
|
||||||
} catch (Exception e) {
|
} catch (InterruptedException e) {
|
||||||
// ignore
|
LOG.info(getName() + " thread interrupted");
|
||||||
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
cleanUpThread.setName("ActivitiesManager thread.");
|
||||||
cleanUpThread.start();
|
cleanUpThread.start();
|
||||||
super.serviceStart();
|
super.serviceStart();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void serviceStop() throws Exception {
|
protected void serviceStop() throws Exception {
|
||||||
cleanUpThread.interrupt();
|
stopped = true;
|
||||||
|
if (cleanUpThread != null) {
|
||||||
|
cleanUpThread.interrupt();
|
||||||
|
try {
|
||||||
|
cleanUpThread.join();
|
||||||
|
} catch (InterruptedException ie) {
|
||||||
|
LOG.warn("Interrupted Exception while stopping", ie);
|
||||||
|
}
|
||||||
|
}
|
||||||
super.serviceStop();
|
super.serviceStop();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -439,6 +439,7 @@ public void serviceStart() throws Exception {
|
|||||||
public void serviceStop() throws Exception {
|
public void serviceStop() throws Exception {
|
||||||
try {
|
try {
|
||||||
writeLock.lock();
|
writeLock.lock();
|
||||||
|
this.activitiesManager.stop();
|
||||||
if (scheduleAsynchronously && asyncSchedulerThreads != null) {
|
if (scheduleAsynchronously && asyncSchedulerThreads != null) {
|
||||||
for (Thread t : asyncSchedulerThreads) {
|
for (Thread t : asyncSchedulerThreads) {
|
||||||
t.interrupt();
|
t.interrupt();
|
||||||
|
Loading…
x
Reference in New Issue
Block a user