YARN-2797. TestWorkPreservingRMRestart should use

ParametrizedSchedulerTestBase. Contributed by Karthik Kambatla
This commit is contained in:
Xuan 2015-02-21 19:17:29 -08:00
parent e3d290244c
commit fe7a302473
3 changed files with 25 additions and 69 deletions

View File

@ -318,6 +318,9 @@ Release 2.7.0 - UNRELEASED
YARN-3236. Cleanup RMAuthenticationFilter#AUTH_HANDLER_PROPERTY.
(zhihai xu via xgong)
YARN-2797. TestWorkPreservingRMRestart should use ParametrizedSchedulerTestBase
(Karthik Kambatla via xgong)
OPTIMIZATIONS
YARN-2990. FairScheduler's delay-scheduling always waits for node-local and

View File

@ -83,10 +83,22 @@ public abstract class ParameterizedSchedulerTestBase {
out.println("<?xml version=\"1.0\"?>");
out.println("<allocations>");
out.println("<queueMaxAMShareDefault>-1.0</queueMaxAMShareDefault>");
out.println("<defaultQueueSchedulingPolicy>fair</defaultQueueSchedulingPolicy>");
out.println("<queue name=\"root\">");
out.println(" <schedulingPolicy>drf</schedulingPolicy>");
out.println(" <weight>1.0</weight>");
out.println(" <fairSharePreemptionTimeout>100</fairSharePreemptionTimeout>");
out.println(" <minSharePreemptionTimeout>120</minSharePreemptionTimeout>");
out.println(" <fairSharePreemptionThreshold>.5</fairSharePreemptionThreshold>");
out.println("</queue>");
out.println("</allocations>");
out.close();
conf.set(YarnConfiguration.RM_SCHEDULER, FairScheduler.class.getName());
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, FS_ALLOC_FILE);
}
public SchedulerType getSchedulerType() {
return schedulerType;
}
}

View File

@ -97,23 +97,24 @@ import com.google.common.base.Supplier;
@SuppressWarnings({"rawtypes", "unchecked"})
@RunWith(value = Parameterized.class)
public class TestWorkPreservingRMRestart {
public class TestWorkPreservingRMRestart extends ParameterizedSchedulerTestBase {
private YarnConfiguration conf;
private Class<?> schedulerClass;
MockRM rm1 = null;
MockRM rm2 = null;
public TestWorkPreservingRMRestart(SchedulerType type) {
super(type);
}
@Before
public void setup() throws UnknownHostException {
Logger rootLogger = LogManager.getRootLogger();
rootLogger.setLevel(Level.DEBUG);
conf = new YarnConfiguration();
conf = getConf();
UserGroupInformation.setConfiguration(conf);
conf.set(YarnConfiguration.RECOVERY_ENABLED, "true");
conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName());
conf.setClass(YarnConfiguration.RM_SCHEDULER, schedulerClass,
ResourceScheduler.class);
conf.setBoolean(YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_ENABLED, true);
conf.setLong(YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_SCHEDULING_WAIT_MS, 0);
DefaultMetricsSystem.setMiniClusterMode(true);
@ -129,16 +130,6 @@ public class TestWorkPreservingRMRestart {
}
}
@Parameterized.Parameters
public static Collection<Object[]> getTestParameters() {
return Arrays.asList(new Object[][] { { CapacityScheduler.class },
{ FifoScheduler.class }, {FairScheduler.class } });
}
public TestWorkPreservingRMRestart(Class<?> schedulerClass) {
this.schedulerClass = schedulerClass;
}
// Test common scheduler state including SchedulerAttempt, SchedulerNode,
// AppSchedulingInfo can be reconstructed via the container recovery reports
// on NM re-registration.
@ -159,9 +150,6 @@ public class TestWorkPreservingRMRestart {
MemoryRMStateStore memStore = new MemoryRMStateStore();
memStore.init(conf);
rm1 = new MockRM(conf, memStore);
if (schedulerClass.equals(FairScheduler.class)) {
initFairScheduler(rm1);
}
rm1.start();
MockNM nm1 =
new MockNM("127.0.0.1:1234", 8192, rm1.getResourceTrackerService());
@ -174,9 +162,6 @@ public class TestWorkPreservingRMRestart {
// Re-start RM
rm2 = new MockRM(conf, memStore);
if (schedulerClass.equals(FairScheduler.class)) {
initFairScheduler(rm2);
}
rm2.start();
nm1.setResourceTrackerService(rm2.getResourceTrackerService());
// recover app
@ -249,11 +234,9 @@ public class TestWorkPreservingRMRestart {
SchedulerApplication schedulerApp =
schedulerApps.get(recoveredApp1.getApplicationId());
if (schedulerClass.equals(CapacityScheduler.class)) {
if (getSchedulerType() == SchedulerType.CAPACITY) {
checkCSQueue(rm2, schedulerApp, nmResource, nmResource, usedResources, 2);
} else if (schedulerClass.equals(FifoScheduler.class)) {
checkFifoQueue(rm2, schedulerApp, usedResources, availableResources);
} else if (schedulerClass.equals(FairScheduler.class)) {
} else {
checkFSQueue(rm2, schedulerApp, usedResources, availableResources);
}
@ -324,25 +307,6 @@ public class TestWorkPreservingRMRestart {
.getUsed());
}
private void checkFifoQueue(ResourceManager rm,
SchedulerApplication schedulerApp, Resource usedResources,
Resource availableResources) throws Exception {
FifoScheduler scheduler = (FifoScheduler) rm.getResourceScheduler();
// ************ check cluster used Resources ********
assertEquals(usedResources, scheduler.getUsedResource());
// ************ check app headroom ****************
SchedulerApplicationAttempt schedulerAttempt =
schedulerApp.getCurrentAppAttempt();
assertEquals(availableResources, schedulerAttempt.getHeadroom());
// ************ check queue metrics ****************
QueueMetrics queueMetrics = scheduler.getRootQueueMetrics();
assertMetrics(queueMetrics, 1, 0, 1, 0, 2, availableResources.getMemory(),
availableResources.getVirtualCores(), usedResources.getMemory(),
usedResources.getVirtualCores());
}
private void checkFSQueue(ResourceManager rm,
SchedulerApplication schedulerApp, Resource usedResources,
Resource availableResources) throws Exception {
@ -379,29 +343,6 @@ public class TestWorkPreservingRMRestart {
usedResources.getVirtualCores());
}
private void initFairScheduler(ResourceManager rm) throws IOException {
FairScheduler scheduler = (FairScheduler) rm.getResourceScheduler();
String testDir =
new File(
System.getProperty("test.build.data", "/tmp")).getAbsolutePath();
String allocFile = new File(testDir, "test-queues").getAbsolutePath();
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, allocFile);
PrintWriter out = new PrintWriter(new FileWriter(allocFile));
out.println("<?xml version=\"1.0\"?>");
out.println("<allocations>");
out.println("<defaultQueueSchedulingPolicy>fair</defaultQueueSchedulingPolicy>");
out.println("<queue name=\"root\">");
out.println(" <schedulingPolicy>drf</schedulingPolicy>");
out.println(" <weight>1.0</weight>");
out.println(" <fairSharePreemptionTimeout>100</fairSharePreemptionTimeout>");
out.println(" <minSharePreemptionTimeout>120</minSharePreemptionTimeout>");
out.println(" <fairSharePreemptionThreshold>.5</fairSharePreemptionThreshold>");
out.println("</queue>");
out.println("</allocations>");
out.close();
}
// create 3 container reports for AM
public static List<NMContainerStatus>
createNMContainerStatusForApp(MockAM am) {
@ -468,7 +409,7 @@ public class TestWorkPreservingRMRestart {
// 10. Assert each user's consumption inside the queue.
@Test (timeout = 30000)
public void testCapacitySchedulerRecovery() throws Exception {
if (!schedulerClass.equals(CapacityScheduler.class)) {
if (getSchedulerType() != SchedulerType.CAPACITY) {
return;
}
conf.setBoolean(CapacitySchedulerConfiguration.ENABLE_USER_METRICS, true);
@ -587,7 +528,7 @@ public class TestWorkPreservingRMRestart {
//3. Verify that the expected exception was thrown
@Test (timeout = 30000, expected = QueueNotFoundException.class)
public void testCapacitySchedulerQueueRemovedRecovery() throws Exception {
if (!schedulerClass.equals(CapacityScheduler.class)) {
if (getSchedulerType() != SchedulerType.CAPACITY) {
throw new QueueNotFoundException("Dummy");
}
conf.setBoolean(CapacitySchedulerConfiguration.ENABLE_USER_METRICS, true);