Merge -r 1428313:1428314 from trunk to branch-2. Fixes: YARN-192. Node update causes NPE in the fair scheduler. Contributed by Sandy Ryza
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1428316 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
b59f4e5e60
commit
1a7512987d
|
@ -138,6 +138,9 @@ Release 2.0.3-alpha - Unreleased
|
||||||
YARN-283. Fair scheduler fails to get queue info without root prefix.
|
YARN-283. Fair scheduler fails to get queue info without root prefix.
|
||||||
(sandyr via tucu)
|
(sandyr via tucu)
|
||||||
|
|
||||||
|
YARN-192. Node update causes NPE in the fair scheduler.
|
||||||
|
(Sandy Ryza via tomwhite)
|
||||||
|
|
||||||
Release 2.0.2-alpha - 2012-09-07
|
Release 2.0.2-alpha - 2012-09-07
|
||||||
|
|
||||||
YARN-9. Rename YARN_HOME to HADOOP_YARN_HOME. (vinodkv via acmurthy)
|
YARN-9. Rename YARN_HOME to HADOOP_YARN_HOME. (vinodkv via acmurthy)
|
||||||
|
|
|
@ -18,6 +18,9 @@
|
||||||
|
|
||||||
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
|
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
|
||||||
|
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.Collection;
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||||
|
@ -293,12 +296,16 @@ public class AppSchedulable extends Schedulable {
|
||||||
} else {
|
} else {
|
||||||
// If this app is over quota, don't schedule anything
|
// If this app is over quota, don't schedule anything
|
||||||
if (!(getRunnable())) { return Resources.none(); }
|
if (!(getRunnable())) { return Resources.none(); }
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Collection<Priority> prioritiesToTry = (reserved) ?
|
||||||
|
Arrays.asList(node.getReservedContainer().getReservedPriority()) :
|
||||||
|
app.getPriorities();
|
||||||
|
|
||||||
// For each priority, see if we can schedule a node local, rack local
|
// For each priority, see if we can schedule a node local, rack local
|
||||||
// or off-switch request. Rack of off-switch requests may be delayed
|
// or off-switch request. Rack of off-switch requests may be delayed
|
||||||
// (not scheduled) in order to promote better locality.
|
// (not scheduled) in order to promote better locality.
|
||||||
for (Priority priority : app.getPriorities()) {
|
for (Priority priority : prioritiesToTry) {
|
||||||
app.addSchedulingOpportunity(priority);
|
app.addSchedulingOpportunity(priority);
|
||||||
NodeType allowedLocality = app.getAllowedLocalityLevel(priority,
|
NodeType allowedLocality = app.getAllowedLocalityLevel(priority,
|
||||||
scheduler.getNumClusterNodes(), scheduler.getNodeLocalityThreshold(),
|
scheduler.getNumClusterNodes(), scheduler.getNodeLocalityThreshold(),
|
||||||
|
|
|
@ -26,6 +26,7 @@ import java.io.FileWriter;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.PrintWriter;
|
import java.io.PrintWriter;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
|
import java.util.Arrays;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
import java.util.LinkedList;
|
import java.util.LinkedList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
@ -53,6 +54,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
|
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
|
import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
|
||||||
|
@ -1187,4 +1189,58 @@ public class TestFairScheduler {
|
||||||
// Request should be fulfilled
|
// Request should be fulfilled
|
||||||
assertEquals(2, scheduler.applications.get(attId1).getLiveContainers().size());
|
assertEquals(2, scheduler.applications.get(attId1).getLiveContainers().size());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testReservationWhileMultiplePriorities() {
|
||||||
|
// Add a node
|
||||||
|
RMNode node1 = MockNodes.newNodeInfo(1, Resources.createResource(1024));
|
||||||
|
NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
|
||||||
|
scheduler.handle(nodeEvent1);
|
||||||
|
|
||||||
|
ApplicationAttemptId attId = createSchedulingRequest(1024, "queue1",
|
||||||
|
"user1", 1, 2);
|
||||||
|
scheduler.update();
|
||||||
|
NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node1,
|
||||||
|
new ArrayList<ContainerStatus>(), new ArrayList<ContainerStatus>());
|
||||||
|
scheduler.handle(updateEvent);
|
||||||
|
|
||||||
|
FSSchedulerApp app = scheduler.applications.get(attId);
|
||||||
|
assertEquals(1, app.getLiveContainers().size());
|
||||||
|
|
||||||
|
ContainerId containerId = scheduler.applications.get(attId)
|
||||||
|
.getLiveContainers().iterator().next().getContainerId();
|
||||||
|
|
||||||
|
// Cause reservation to be created
|
||||||
|
createSchedulingRequestExistingApplication(1024, 2, attId);
|
||||||
|
scheduler.update();
|
||||||
|
scheduler.handle(updateEvent);
|
||||||
|
|
||||||
|
assertEquals(1, app.getLiveContainers().size());
|
||||||
|
|
||||||
|
// Create request at higher priority
|
||||||
|
createSchedulingRequestExistingApplication(1024, 1, attId);
|
||||||
|
scheduler.update();
|
||||||
|
scheduler.handle(updateEvent);
|
||||||
|
|
||||||
|
assertEquals(1, app.getLiveContainers().size());
|
||||||
|
// Reserved container should still be at lower priority
|
||||||
|
for (RMContainer container : app.getReservedContainers()) {
|
||||||
|
assertEquals(2, container.getReservedPriority().getPriority());
|
||||||
|
}
|
||||||
|
|
||||||
|
// Complete container
|
||||||
|
scheduler.allocate(attId, new ArrayList<ResourceRequest>(),
|
||||||
|
Arrays.asList(containerId));
|
||||||
|
|
||||||
|
// Schedule at opening
|
||||||
|
scheduler.update();
|
||||||
|
scheduler.handle(updateEvent);
|
||||||
|
|
||||||
|
// Reserved container (at lower priority) should be run
|
||||||
|
Collection<RMContainer> liveContainers = app.getLiveContainers();
|
||||||
|
assertEquals(1, liveContainers.size());
|
||||||
|
for (RMContainer liveContainer : liveContainers) {
|
||||||
|
Assert.assertEquals(2, liveContainer.getContainer().getPriority().getPriority());
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue