YARN-8373. RM Received RMFatalEvent of type CRITICAL_THREAD_CRASH. Contributed by Wilfred Spiegelenburg.

(cherry picked from commit ea68756c0c)
This commit is contained in:
Sunil G 2019-11-19 14:10:41 +05:30
parent 049279bb66
commit c1ec51696c
3 changed files with 18 additions and 17 deletions

View File

@ -33,12 +33,12 @@ import org.apache.hadoop.yarn.util.resource.ResourceUtils;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator; import java.util.Comparator;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock;
@ -386,21 +386,21 @@ public class ClusterNodeTracker<N extends SchedulerNode> {
/** /**
* Convenience method to sort nodes. * Convenience method to sort nodes.
* Nodes can change while being sorted. Using a standard sort will fail
* without locking each node, the TreeSet handles this without locks.
* *
* Note that the sort is performed without holding a lock. We are sorting * @param comparator the comparator to sort the nodes with
* here instead of on the caller to allow for future optimizations (e.g. * @return sorted set of nodes in the form of a TreeSet
* sort once every x milliseconds).
*/ */
public List<N> sortedNodeList(Comparator<N> comparator) { public TreeSet<N> sortedNodeSet(Comparator<N> comparator) {
List<N> sortedList = null; TreeSet<N> sortedSet = new TreeSet<>(comparator);
readLock.lock(); readLock.lock();
try { try {
sortedList = new ArrayList(nodes.values()); sortedSet.addAll(nodes.values());
} finally { } finally {
readLock.unlock(); readLock.unlock();
} }
Collections.sort(sortedList, comparator); return sortedSet;
return sortedList;
} }
/** /**

View File

@ -111,6 +111,7 @@ import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Map.Entry; import java.util.Map.Entry;
import java.util.Set; import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock; import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
@ -1036,15 +1037,17 @@ public class FairScheduler extends
@Deprecated @Deprecated
void continuousSchedulingAttempt() throws InterruptedException { void continuousSchedulingAttempt() throws InterruptedException {
long start = getClock().getTime(); long start = getClock().getTime();
List<FSSchedulerNode> nodeIdList; TreeSet<FSSchedulerNode> nodeIdSet;
// Hold a lock to prevent comparator order changes due to changes of node // Hold a lock to prevent node changes as much as possible.
// unallocated resources readLock.lock();
synchronized (this) { try {
nodeIdList = nodeTracker.sortedNodeList(nodeAvailableResourceComparator); nodeIdSet = nodeTracker.sortedNodeSet(nodeAvailableResourceComparator);
} finally {
readLock.unlock();
} }
// iterate all nodes // iterate all nodes
for (FSSchedulerNode node : nodeIdList) { for (FSSchedulerNode node : nodeIdSet) {
try { try {
if (Resources.fitsIn(minimumAllocation, if (Resources.fitsIn(minimumAllocation,
node.getUnallocatedResource())) { node.getUnallocatedResource())) {

View File

@ -323,12 +323,10 @@ public class TestContinuousScheduling extends FairSchedulerTestBase {
for (int j = 0; j < 100; j++) { for (int j = 0; j < 100; j++) {
for (FSSchedulerNode node : clusterNodeTracker.getAllNodes()) { for (FSSchedulerNode node : clusterNodeTracker.getAllNodes()) {
int i = ThreadLocalRandom.current().nextInt(-30, 30); int i = ThreadLocalRandom.current().nextInt(-30, 30);
synchronized (scheduler) {
node.deductUnallocatedResource(Resource.newInstance(i * 1024, i)); node.deductUnallocatedResource(Resource.newInstance(i * 1024, i));
} }
} }
} }
}
}.start(); }.start();
try { try {