MAPREDUCE-4299. Terasort hangs with MR2 FifoScheduler (Tom White via bobby)
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1361397 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
228736ab51
commit
17de78a43e
|
@ -685,6 +685,9 @@ Release 0.23.3 - UNRELEASED
|
||||||
MAPREDUCE-4419. ./mapred queue -info <queuename> -showJobs displays all
|
MAPREDUCE-4419. ./mapred queue -info <queuename> -showJobs displays all
|
||||||
the jobs irrespective of <queuename> (Devaraj K via bobby)
|
the jobs irrespective of <queuename> (Devaraj K via bobby)
|
||||||
|
|
||||||
|
MAPREDUCE-4299. Terasort hangs with MR2 FifoScheduler (Tom White via
|
||||||
|
bobby)
|
||||||
|
|
||||||
Release 0.23.2 - UNRELEASED
|
Release 0.23.2 - UNRELEASED
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
|
|
@ -372,16 +372,20 @@ public class FifoScheduler implements ResourceScheduler, Configurable {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
application.setHeadroom(clusterResource);
|
|
||||||
|
|
||||||
LOG.debug("post-assignContainers");
|
LOG.debug("post-assignContainers");
|
||||||
application.showRequests();
|
application.showRequests();
|
||||||
|
|
||||||
// Done
|
// Done
|
||||||
if (Resources.lessThan(node.getAvailableResource(), minimumAllocation)) {
|
if (Resources.lessThan(node.getAvailableResource(), minimumAllocation)) {
|
||||||
return;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Update the applications' headroom to correctly take into
|
||||||
|
// account the containers assigned in this update.
|
||||||
|
for (SchedulerApp application : applications.values()) {
|
||||||
|
application.setHeadroom(Resources.subtract(clusterResource, usedResource));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private int getMaxAllocatableContainers(SchedulerApp application,
|
private int getMaxAllocatableContainers(SchedulerApp application,
|
||||||
|
|
|
@ -19,26 +19,39 @@
|
||||||
package org.apache.hadoop.yarn.server.resourcemanager;
|
package org.apache.hadoop.yarn.server.resourcemanager;
|
||||||
|
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
|
import java.util.Collections;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
import junit.framework.Assert;
|
import junit.framework.Assert;
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.yarn.api.records.AMResponse;
|
import org.apache.hadoop.yarn.api.records.AMResponse;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
import org.apache.hadoop.yarn.api.records.Container;
|
import org.apache.hadoop.yarn.api.records.Container;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerState;
|
import org.apache.hadoop.yarn.api.records.ContainerState;
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerStatus;
|
import org.apache.hadoop.yarn.api.records.ContainerStatus;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestUtils;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
|
||||||
|
import org.apache.hadoop.yarn.util.BuilderUtils;
|
||||||
import org.apache.log4j.Level;
|
import org.apache.log4j.Level;
|
||||||
import org.apache.log4j.LogManager;
|
import org.apache.log4j.LogManager;
|
||||||
import org.apache.log4j.Logger;
|
import org.apache.log4j.Logger;
|
||||||
|
@ -207,6 +220,68 @@ public class TestFifoScheduler {
|
||||||
|
|
||||||
Assert.assertEquals(4 * GB, fs.getRootQueueMetrics().getAvailableMB());
|
Assert.assertEquals(4 * GB, fs.getRootQueueMetrics().getAvailableMB());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testHeadroom() throws Exception {
|
||||||
|
|
||||||
|
Configuration conf = new Configuration();
|
||||||
|
conf.setClass(YarnConfiguration.RM_SCHEDULER, FifoScheduler.class,
|
||||||
|
ResourceScheduler.class);
|
||||||
|
MockRM rm = new MockRM(conf);
|
||||||
|
rm.start();
|
||||||
|
FifoScheduler fs = (FifoScheduler) rm.getResourceScheduler();
|
||||||
|
|
||||||
|
// Add a node
|
||||||
|
RMNode n1 = MockNodes.newNodeInfo(0, MockNodes.newResource(4 * GB), 1);
|
||||||
|
fs.handle(new NodeAddedSchedulerEvent(n1));
|
||||||
|
|
||||||
|
// Add two applications
|
||||||
|
ApplicationId appId1 = BuilderUtils.newApplicationId(100, 1);
|
||||||
|
ApplicationAttemptId appAttemptId1 = BuilderUtils.newApplicationAttemptId(
|
||||||
|
appId1, 1);
|
||||||
|
SchedulerEvent event1 = new AppAddedSchedulerEvent(appAttemptId1, "queue",
|
||||||
|
"user");
|
||||||
|
fs.handle(event1);
|
||||||
|
|
||||||
|
ApplicationId appId2 = BuilderUtils.newApplicationId(200, 2);
|
||||||
|
ApplicationAttemptId appAttemptId2 = BuilderUtils.newApplicationAttemptId(
|
||||||
|
appId2, 1);
|
||||||
|
SchedulerEvent event2 = new AppAddedSchedulerEvent(appAttemptId2, "queue",
|
||||||
|
"user");
|
||||||
|
fs.handle(event2);
|
||||||
|
|
||||||
|
List<ContainerStatus> emptyStatus = new ArrayList<ContainerStatus>();
|
||||||
|
List<ContainerId> emptyId = new ArrayList<ContainerId>();
|
||||||
|
List<ResourceRequest> emptyAsk = new ArrayList<ResourceRequest>();
|
||||||
|
|
||||||
|
// Set up resource requests
|
||||||
|
|
||||||
|
// Ask for a 1 GB container for app 1
|
||||||
|
List<ResourceRequest> ask1 = new ArrayList<ResourceRequest>();
|
||||||
|
ask1.add(BuilderUtils.newResourceRequest(BuilderUtils.newPriority(0), "*",
|
||||||
|
BuilderUtils.newResource(GB), 1));
|
||||||
|
fs.allocate(appAttemptId1, ask1, emptyId);
|
||||||
|
|
||||||
|
// Ask for a 2 GB container for app 2
|
||||||
|
List<ResourceRequest> ask2 = new ArrayList<ResourceRequest>();
|
||||||
|
ask2.add(BuilderUtils.newResourceRequest(BuilderUtils.newPriority(0), "*",
|
||||||
|
BuilderUtils.newResource(2 * GB), 1));
|
||||||
|
fs.allocate(appAttemptId2, ask2, emptyId);
|
||||||
|
|
||||||
|
// Trigger container assignment
|
||||||
|
fs.handle(new NodeUpdateSchedulerEvent(n1, emptyStatus, emptyStatus));
|
||||||
|
|
||||||
|
// Get the allocation for the applications and verify headroom
|
||||||
|
Allocation allocation1 = fs.allocate(appAttemptId1, emptyAsk, emptyId);
|
||||||
|
Assert.assertEquals("Allocation headroom", 1 * GB,
|
||||||
|
allocation1.getResourceLimit().getMemory());
|
||||||
|
|
||||||
|
Allocation allocation2 = fs.allocate(appAttemptId2, emptyAsk, emptyId);
|
||||||
|
Assert.assertEquals("Allocation headroom", 1 * GB,
|
||||||
|
allocation2.getResourceLimit().getMemory());
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
public static void main(String[] args) throws Exception {
|
public static void main(String[] args) throws Exception {
|
||||||
TestFifoScheduler t = new TestFifoScheduler();
|
TestFifoScheduler t = new TestFifoScheduler();
|
||||||
|
|
Loading…
Reference in New Issue