YARN-6448. Continuous scheduling thread crashes while sorting nodes. (Yufei Gu via kasha)

This commit is contained in:
Karthik Kambatla 2017-04-05 15:42:55 -07:00
parent 3db8d68d63
commit b4c4f36594
3 changed files with 45 additions and 3 deletions

View File

@ -25,6 +25,7 @@ import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceAudience.Private;
@ -286,7 +287,8 @@ public abstract class SchedulerNode {
* container. * container.
* @param resource Resources to deduct. * @param resource Resources to deduct.
*/ */
private synchronized void deductUnallocatedResource(Resource resource) { @VisibleForTesting
public synchronized void deductUnallocatedResource(Resource resource) {
if (resource == null) { if (resource == null) {
LOG.error("Invalid deduction of null resource for " LOG.error("Invalid deduction of null resource for "
+ rmNode.getNodeAddress()); + rmNode.getNodeAddress());

View File

@ -913,8 +913,12 @@ public class FairScheduler extends
void continuousSchedulingAttempt() throws InterruptedException { void continuousSchedulingAttempt() throws InterruptedException {
long start = getClock().getTime(); long start = getClock().getTime();
List<FSSchedulerNode> nodeIdList = List<FSSchedulerNode> nodeIdList;
nodeTracker.sortedNodeList(nodeAvailableResourceComparator); // Hold a lock to prevent comparator order changes due to changes of node
// unallocated resources
synchronized (this) {
nodeIdList = nodeTracker.sortedNodeList(nodeAvailableResourceComparator);
}
// iterate all nodes // iterate all nodes
for (FSSchedulerNode node : nodeIdList) { for (FSSchedulerNode node : nodeIdList) {

View File

@ -23,6 +23,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.event.AsyncDispatcher; import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
@ -30,6 +31,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM; import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ClusterNodeTracker;
import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey; import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
@ -57,6 +59,7 @@ import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;
public class TestContinuousScheduling extends FairSchedulerTestBase { public class TestContinuousScheduling extends FairSchedulerTestBase {
private ControlledClock mockClock; private ControlledClock mockClock;
@ -302,6 +305,39 @@ public class TestContinuousScheduling extends FairSchedulerTestBase {
assertNotEquals("One of the threads is still alive", 0, numRetries); assertNotEquals("One of the threads is still alive", 0, numRetries);
} }
/**
 * Runs one continuous scheduling attempt while a second thread keeps
 * changing the unallocated resources of the tracked nodes.
 *
 * The mutating thread takes the scheduler's monitor around every change,
 * which is the same monitor {@code continuousSchedulingAttempt()} holds
 * while it sorts the node list. The test passes when the attempt completes
 * without throwing.
 *
 * @throws Exception if the scheduling attempt throws, or if the test thread
 *     is interrupted while waiting for the mutating thread to finish
 */
@Test
public void TestNodeAvailableResourceComparatorTransitivity()
    throws Exception {
  final ClusterNodeTracker<FSSchedulerNode> clusterNodeTracker =
      scheduler.getNodeTracker();

  List<RMNode> rmNodes =
      MockNodes.newNodes(2, 4000, Resource.newInstance(4096, 4));
  for (RMNode rmNode : rmNodes) {
    clusterNodeTracker.addNode(new FSSchedulerNode(rmNode, false));
  }

  // To simulate unallocated resource changes
  Thread resourceChanger = new Thread() {
    @Override
    public void run() {
      for (int j = 0; j < 100; j++) {
        for (FSSchedulerNode node : clusterNodeTracker.getAllNodes()) {
          // Random delta in [-30, 30); negative values are passed through
          // to deductUnallocatedResource unchanged.
          int i = ThreadLocalRandom.current().nextInt(-30, 30);
          synchronized (scheduler) {
            node.deductUnallocatedResource(Resource.newInstance(i * 1024, i));
          }
        }
      }
    }
  };
  resourceChanger.start();

  try {
    // Let any exception propagate so the test report keeps the original
    // exception type and stack trace instead of only its message.
    scheduler.continuousSchedulingAttempt();
  } finally {
    // Wait for the mutator so it cannot keep modifying scheduler nodes
    // after this test method has returned.
    resourceChanger.join();
  }
}
@Test @Test
public void testFairSchedulerContinuousSchedulingInitTime() throws Exception { public void testFairSchedulerContinuousSchedulingInitTime() throws Exception {
scheduler.start(); scheduler.start();