YARN-5106. Provide a builder interface for FairScheduler allocations for use in tests. Contributed by Adam Antal

This commit is contained in:
Szilard Nemeth 2019-12-05 17:37:40 +01:00
parent 83a14559e5
commit 520fe2c99b
29 changed files with 1612 additions and 2000 deletions

View File

@ -45,6 +45,11 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.Capacity
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairSchedulerConfiguration; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairSchedulerConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair
.allocationfile.AllocationFileQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair
.allocationfile.AllocationFileWriter;
import org.apache.hadoop.yarn.util.Clock; import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.UTCClock; import org.apache.hadoop.yarn.util.UTCClock;
import org.junit.Assert; import org.junit.Assert;
@ -54,9 +59,7 @@ import org.junit.runner.RunWith;
import org.junit.runners.Parameterized; import org.junit.runners.Parameterized;
import java.io.File; import java.io.File;
import java.io.FileWriter;
import java.io.IOException; import java.io.IOException;
import java.io.PrintWriter;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collection; import java.util.Collection;
import java.util.Collections; import java.util.Collections;
@ -136,25 +139,16 @@ public class TestYarnClientWithReservation {
private Configuration configureReservationForFairScheduler() { private Configuration configureReservationForFairScheduler() {
Configuration conf = new Configuration(); Configuration conf = new Configuration();
try { AllocationFileWriter.create()
PrintWriter out = new PrintWriter(new FileWriter(FS_ALLOC_FILE)); .drfDefaultQueueSchedulingPolicy()
out.println("<?xml version=\"1.0\"?>"); .addQueue(new AllocationFileQueue.Builder("root")
out.println("<allocations>"); .subQueue(new AllocationFileQueue.Builder("default").build())
out.println("<queue name=\"root\">"); .subQueue(new AllocationFileQueue.Builder("dedicated")
out.println(" <queue name=\"default\"></queue>"); .reservation()
out.println(" <queue name=\"dedicated\">"); .weight(10)
out.println(" <reservation></reservation>"); .build())
// set weight to 10 to make sure this queue get enough steady fair share .build())
out.println(" <weight>10</weight>"); .writeToFile(FS_ALLOC_FILE);
out.println(" </queue>");
out.println("</queue>");
out.println("<defaultQueueSchedulingPolicy>drf" +
"</defaultQueueSchedulingPolicy>");
out.println("</allocations>");
out.close();
} catch (IOException e) {
Assert.fail(e.getMessage());
}
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, FS_ALLOC_FILE); conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, FS_ALLOC_FILE);
return conf; return conf;

View File

@ -26,14 +26,18 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.Capacity
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairSchedulerConfiguration; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairSchedulerConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair
.allocationfile.AllocationFileQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair
.allocationfile.AllocationFileWriter;
import org.junit.After; import org.junit.After;
import org.junit.runner.RunWith; import org.junit.runner.RunWith;
import org.junit.runners.Parameterized; import org.junit.runners.Parameterized;
import java.io.File; import java.io.File;
import java.io.FileWriter;
import java.io.IOException; import java.io.IOException;
import java.io.PrintWriter;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collection; import java.util.Collection;
import java.util.stream.Collectors; import java.util.stream.Collectors;
@ -73,43 +77,40 @@ public abstract class ParameterizedSchedulerTestBase {
schedulerType = type; schedulerType = type;
switch (schedulerType) { switch (schedulerType) {
case FAIR: case FAIR:
configureFairScheduler(conf); configureFairScheduler(conf);
scheduler = new FairScheduler(); scheduler = new FairScheduler();
conf.set(YarnConfiguration.RM_SCHEDULER, conf.set(YarnConfiguration.RM_SCHEDULER,
FairScheduler.class.getName()); FairScheduler.class.getName());
break; break;
case CAPACITY: case CAPACITY:
scheduler = new CapacityScheduler(); scheduler = new CapacityScheduler();
((CapacityScheduler)scheduler).setConf(conf); ((CapacityScheduler)scheduler).setConf(conf);
conf.set(YarnConfiguration.RM_SCHEDULER, conf.set(YarnConfiguration.RM_SCHEDULER,
CapacityScheduler.class.getName()); CapacityScheduler.class.getName());
break; break;
default: default:
throw new IllegalArgumentException("Invalid type: " + type); throw new IllegalArgumentException("Invalid type: " + type);
} }
} }
protected void configureFairScheduler(YarnConfiguration conf) protected void configureFairScheduler(YarnConfiguration configuration) {
throws IOException {
// Disable queueMaxAMShare limitation for fair scheduler // Disable queueMaxAMShare limitation for fair scheduler
PrintWriter out = new PrintWriter(new FileWriter(FS_ALLOC_FILE)); AllocationFileWriter.create()
out.println("<?xml version=\"1.0\"?>"); .fairDefaultQueueSchedulingPolicy()
out.println("<allocations>"); .disableQueueMaxAMShareDefault()
out.println("<queueMaxAMShareDefault>-1.0</queueMaxAMShareDefault>"); .addQueue(new AllocationFileQueue.Builder("root")
out.println("<defaultQueueSchedulingPolicy>fair</defaultQueueSchedulingPolicy>"); .schedulingPolicy("drf")
out.println("<queue name=\"root\">"); .weight(1.0f)
out.println(" <schedulingPolicy>drf</schedulingPolicy>"); .fairSharePreemptionTimeout(100)
out.println(" <weight>1.0</weight>"); .minSharePreemptionTimeout(120)
out.println(" <fairSharePreemptionTimeout>100</fairSharePreemptionTimeout>"); .fairSharePreemptionThreshold(.5)
out.println(" <minSharePreemptionTimeout>120</minSharePreemptionTimeout>"); .build())
out.println(" <fairSharePreemptionThreshold>.5</fairSharePreemptionThreshold>"); .writeToFile(FS_ALLOC_FILE);
out.println("</queue>");
out.println("</allocations>");
out.close();
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, FS_ALLOC_FILE); configuration.set(FairSchedulerConfiguration.ALLOCATION_FILE,
conf.setLong(FairSchedulerConfiguration.UPDATE_INTERVAL_MS, 10); FS_ALLOC_FILE);
configuration.setLong(FairSchedulerConfiguration.UPDATE_INTERVAL_MS, 10);
} }
@After @After

View File

@ -19,9 +19,7 @@
package org.apache.hadoop.yarn.server.resourcemanager; package org.apache.hadoop.yarn.server.resourcemanager;
import java.io.File; import java.io.File;
import java.io.FileWriter;
import java.io.IOException; import java.io.IOException;
import java.io.PrintWriter;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collection; import java.util.Collection;
import java.util.Collections; import java.util.Collections;
@ -53,6 +51,12 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.Capacity
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairSchedulerConfiguration; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairSchedulerConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair
.allocationfile.AllocationFileQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair
.allocationfile.AllocationFileWriter;
import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.junit.After; import org.junit.After;
import org.junit.Assert; import org.junit.Assert;
@ -552,49 +556,35 @@ public class ReservationACLsTestBase extends ACLsTestBase {
return csConf; return csConf;
} }
private static Configuration createFairSchedulerConfiguration() throws private static Configuration createFairSchedulerConfiguration() {
IOException {
FairSchedulerConfiguration fsConf = new FairSchedulerConfiguration(); FairSchedulerConfiguration fsConf = new FairSchedulerConfiguration();
final String TEST_DIR = new File(System.getProperty("test.build.data", final String testDir = new File(System.getProperty("test.build.data",
"/tmp")).getAbsolutePath(); "/tmp")).getAbsolutePath();
final String ALLOC_FILE = new File(TEST_DIR, "test-queues.xml") final String allocFile = new File(testDir, "test-queues.xml")
.getAbsolutePath(); .getAbsolutePath();
PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
out.println("<?xml version=\"1.0\"?>"); AllocationFileWriter.create()
out.println("<allocations>"); .drfDefaultQueueSchedulingPolicy()
out.println(" <defaultQueueSchedulingPolicy>drf" + .addQueue(new AllocationFileQueue.Builder("queueA")
"</defaultQueueSchedulingPolicy>"); .aclSubmitReservations("queueA_user,common_user ")
out.println(" <queue name=\"queueA\">"); .aclAdministerReservations("queueA_admin ")
out.println(" <aclSubmitReservations>" + .aclListReservations("common_user ")
"queueA_user,common_user " + .aclSubmitApps("queueA_user,common_user ")
"</aclSubmitReservations>"); .aclAdministerApps("queueA_admin ")
out.println(" <aclAdministerReservations>" + .reservation().build())
"queueA_admin " + .addQueue(new AllocationFileQueue.Builder("queueB")
"</aclAdministerReservations>"); .aclSubmitReservations("queueB_user,common_user ")
out.println(" <aclListReservations>common_user </aclListReservations>"); .aclAdministerReservations("queueB_admin ")
out.println(" <aclSubmitApps>queueA_user,common_user </aclSubmitApps>"); .aclListReservations("common_user ")
out.println(" <aclAdministerApps>queueA_admin </aclAdministerApps>"); .aclSubmitApps("queueB_user,common_user ")
out.println(" <reservation> </reservation>"); .aclAdministerApps("queueB_admin ")
out.println(" </queue>"); .reservation().build())
out.println(" <queue name=\"queueB\">"); .addQueue(new AllocationFileQueue.Builder("queueC")
out.println(" <aclSubmitApps>queueB_user,common_user </aclSubmitApps>"); .reservation().build())
out.println(" <aclAdministerApps>queueB_admin </aclAdministerApps>"); .writeToFile(allocFile);
out.println(" <aclSubmitReservations>" +
"queueB_user,common_user " + fsConf.set(FairSchedulerConfiguration.ALLOCATION_FILE, allocFile);
"</aclSubmitReservations>");
out.println(" <aclAdministerReservations>" +
"queueB_admin " +
"</aclAdministerReservations>");
out.println(" <aclListReservations>common_user </aclListReservations>");
out.println(" <reservation> </reservation>");
out.println(" </queue>");
out.println(" <queue name=\"queueC\">");
out.println(" <reservation> </reservation>");
out.println(" </queue>");
out.println("</allocations>");
out.close();
fsConf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
fsConf.setBoolean(YarnConfiguration.RM_RESERVATION_SYSTEM_ENABLE, true); fsConf.setBoolean(YarnConfiguration.RM_RESERVATION_SYSTEM_ENABLE, true);
fsConf.setBoolean(YarnConfiguration.YARN_ACL_ENABLE, true); fsConf.setBoolean(YarnConfiguration.YARN_ACL_ENABLE, true);

View File

@ -25,9 +25,7 @@ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when; import static org.mockito.Mockito.when;
import java.io.File; import java.io.File;
import java.io.FileWriter;
import java.io.IOException; import java.io.IOException;
import java.io.PrintWriter;
import java.util.Collections; import java.util.Collections;
import org.apache.hadoop.security.AccessControlException; import org.apache.hadoop.security.AccessControlException;
@ -47,6 +45,12 @@ import org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairSchedulerConfiguration; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairSchedulerConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair
.allocationfile.AllocationFileQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair
.allocationfile.AllocationFileWriter;
import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM; import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager; import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
import org.apache.hadoop.yarn.util.Records; import org.apache.hadoop.yarn.util.Records;
@ -73,13 +77,9 @@ public class TestAppManagerWithFairScheduler extends AppManagerTestBase {
@Before @Before
public void setup() throws IOException { public void setup() throws IOException {
// Basic config with one queue (override in test if needed) // Basic config with one queue (override in test if needed)
PrintWriter out = new PrintWriter(new FileWriter(allocFileName)); AllocationFileWriter.create()
out.println("<?xml version=\"1.0\"?>"); .addQueue(new AllocationFileQueue.Builder("test").build())
out.println("<allocations>"); .writeToFile(allocFileName);
out.println(" <queue name=\"test\">");
out.println(" </queue>");
out.println("</allocations>");
out.close();
conf.setClass(YarnConfiguration.RM_SCHEDULER, FairScheduler.class, conf.setClass(YarnConfiguration.RM_SCHEDULER, FairScheduler.class,
ResourceScheduler.class); ResourceScheduler.class);
@ -111,19 +111,15 @@ public class TestAppManagerWithFairScheduler extends AppManagerTestBase {
YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB; YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB;
// scheduler config with a limited queue // scheduler config with a limited queue
PrintWriter out = new PrintWriter(new FileWriter(allocFileName)); AllocationFileWriter.create()
out.println("<?xml version=\"1.0\"?>"); .addQueue(new AllocationFileQueue.Builder("root")
out.println("<allocations>"); .subQueue(new AllocationFileQueue.Builder("limited")
out.println(" <queue name=\"root\">"); .maxContainerAllocation(maxAlloc + " mb 1 vcores")
out.println(" <queue name=\"limited\">"); .build())
out.println(" <maxContainerAllocation>" + maxAlloc + " mb 1 vcores"); .subQueue(new AllocationFileQueue.Builder("unlimited")
out.println(" </maxContainerAllocation>"); .build())
out.println(" </queue>"); .build())
out.println(" <queue name=\"unlimited\">"); .writeToFile(allocFileName);
out.println(" </queue>");
out.println(" </queue>");
out.println("</allocations>");
out.close();
rmContext.getScheduler().reinitialize(conf, rmContext); rmContext.getScheduler().reinitialize(conf, rmContext);
ApplicationId appId = MockApps.newAppID(1); ApplicationId appId = MockApps.newAppID(1);
@ -153,25 +149,22 @@ public class TestAppManagerWithFairScheduler extends AppManagerTestBase {
conf.set(YarnConfiguration.YARN_ACL_ENABLE, "true"); conf.set(YarnConfiguration.YARN_ACL_ENABLE, "true");
PrintWriter out = new PrintWriter(new FileWriter(allocFileName)); AllocationFileWriter.create()
out.println("<?xml version=\"1.0\"?>"); .addQueue(new AllocationFileQueue.Builder("root")
out.println("<allocations>"); .aclSubmitApps(" ")
out.println(" <queue name=\"root\">"); .aclAdministerApps(" ")
out.println(" <aclSubmitApps> </aclSubmitApps>"); .subQueue(new AllocationFileQueue.Builder("noaccess")
out.println(" <aclAdministerApps> </aclAdministerApps>"); .build())
out.println(" <queue name=\"noaccess\">"); .subQueue(new AllocationFileQueue.Builder("submitonly")
out.println(" </queue>"); .aclSubmitApps("test ")
out.println(" <queue name=\"submitonly\">"); .aclAdministerApps(" ")
out.println(" <aclSubmitApps>test </aclSubmitApps>"); .build())
out.println(" <aclAdministerApps> </aclAdministerApps>"); .subQueue(new AllocationFileQueue.Builder("adminonly")
out.println(" </queue>"); .aclSubmitApps(" ")
out.println(" <queue name=\"adminonly\">"); .aclAdministerApps("test ")
out.println(" <aclSubmitApps> </aclSubmitApps>"); .build())
out.println(" <aclAdministerApps>test </aclAdministerApps>"); .build())
out.println(" </queue>"); .writeToFile(allocFileName);
out.println(" </queue>");
out.println("</allocations>");
out.close();
rmContext.getScheduler().reinitialize(conf, rmContext); rmContext.getScheduler().reinitialize(conf, rmContext);
ApplicationId appId = MockApps.newAppID(1); ApplicationId appId = MockApps.newAppID(1);
@ -206,17 +199,14 @@ public class TestAppManagerWithFairScheduler extends AppManagerTestBase {
conf.set(YarnConfiguration.YARN_ACL_ENABLE, "true"); conf.set(YarnConfiguration.YARN_ACL_ENABLE, "true");
PrintWriter out = new PrintWriter(new FileWriter(allocFileName)); AllocationFileWriter.create()
out.println("<?xml version=\"1.0\"?>"); .addQueue(new AllocationFileQueue.Builder("root")
out.println("<allocations>"); .subQueue(new AllocationFileQueue.Builder("noaccess")
out.println(" <queue name=\"root\">"); .aclSubmitApps(" ")
out.println(" <queue name=\"noaccess\">"); .aclAdministerApps(" ")
out.println(" <aclSubmitApps> </aclSubmitApps>"); .build())
out.println(" <aclAdministerApps> </aclAdministerApps>"); .build())
out.println(" </queue>"); .writeToFile(allocFileName);
out.println(" </queue>");
out.println("</allocations>");
out.close();
rmContext.getScheduler().reinitialize(conf, rmContext); rmContext.getScheduler().reinitialize(conf, rmContext);
ApplicationId appId = MockApps.newAppID(1); ApplicationId appId = MockApps.newAppID(1);
@ -235,20 +225,19 @@ public class TestAppManagerWithFairScheduler extends AppManagerTestBase {
conf.set(YarnConfiguration.YARN_ACL_ENABLE, "true"); conf.set(YarnConfiguration.YARN_ACL_ENABLE, "true");
PrintWriter out = new PrintWriter(new FileWriter(allocFileName)); AllocationFileWriter.create()
out.println("<?xml version=\"1.0\"?>"); .addQueue(new AllocationFileQueue.Builder("root")
out.println("<allocations>"); .aclSubmitApps(" ")
out.println(" <queue name=\"root\">"); .aclAdministerApps(" ")
out.println(" <aclSubmitApps> </aclSubmitApps>"); .subQueue(new AllocationFileQueue.Builder("noaccess")
out.println(" <aclAdministerApps> </aclAdministerApps>"); .parent(true)
out.println(" <queue name=\"noaccess\" type=\"parent\">"); .build())
out.println(" </queue>"); .subQueue(new AllocationFileQueue.Builder("submitonly")
out.println(" <queue name=\"submitonly\" type=\"parent\">"); .parent(true)
out.println(" <aclSubmitApps>test </aclSubmitApps>"); .aclSubmitApps("test ")
out.println(" </queue>"); .build())
out.println(" </queue>"); .build())
out.println("</allocations>"); .writeToFile(allocFileName);
out.close();
rmContext.getScheduler().reinitialize(conf, rmContext); rmContext.getScheduler().reinitialize(conf, rmContext);
ApplicationId appId = MockApps.newAppID(1); ApplicationId appId = MockApps.newAppID(1);

View File

@ -23,9 +23,7 @@ import static org.mockito.Mockito.when;
import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.any;
import java.io.File; import java.io.File;
import java.io.FileWriter;
import java.io.IOException; import java.io.IOException;
import java.io.PrintWriter;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.security.PrivilegedExceptionAction; import java.security.PrivilegedExceptionAction;
import java.util.HashMap; import java.util.HashMap;
@ -35,6 +33,16 @@ import java.util.Map;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairSchedulerConfiguration; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairSchedulerConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair
.allocationfile.AllocationFileQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair
.allocationfile.AllocationFileQueuePlacementPolicy;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair
.allocationfile.AllocationFileQueuePlacementRule;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair
.allocationfile.AllocationFileWriter;
import org.junit.After; import org.junit.After;
import org.junit.Assert; import org.junit.Assert;
import org.slf4j.Logger; import org.slf4j.Logger;
@ -181,26 +189,25 @@ public class TestApplicationACLs extends ParameterizedSchedulerTestBase {
} }
@Override @Override
protected void configureFairScheduler(YarnConfiguration conf) protected void configureFairScheduler(YarnConfiguration configuration) {
throws IOException {
final String testDir = new File(System.getProperty("test.build.data", final String testDir = new File(System.getProperty("test.build.data",
"/tmp")).getAbsolutePath(); "/tmp")).getAbsolutePath();
final String allocFile = new File(testDir, "test-queues.xml") final String allocFile = new File(testDir, "test-queues.xml")
.getAbsolutePath(); .getAbsolutePath();
PrintWriter out = new PrintWriter(new FileWriter(allocFile));
out.println("<?xml version=\"1.0\"?>"); AllocationFileWriter.create()
out.println("<allocations>"); .addQueue(new AllocationFileQueue.Builder("root")
out.println("<queue name=\"root\" >"); .subQueue(new AllocationFileQueue.Builder("default").build())
out.println(" <queue name=\"default\">"); .build())
out.println(" </queue>"); .queuePlacementPolicy(new AllocationFileQueuePlacementPolicy()
out.println("</queue>"); .addRule(new AllocationFileQueuePlacementRule(
out.println("<queuePlacementPolicy>"); AllocationFileQueuePlacementRule.RuleName.SPECIFIED)
out.println(" <rule name=\"specified\" create=\"false\" />"); .create(false))
out.println(" <rule name=\"reject\" />"); .addRule(new AllocationFileQueuePlacementRule(
out.println("</queuePlacementPolicy>"); AllocationFileQueuePlacementRule.RuleName.REJECT)))
out.println("</allocations>"); .writeToFile(allocFile);
out.close();
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, allocFile); configuration.set(FairSchedulerConfiguration.ALLOCATION_FILE, allocFile);
} }
@Test @Test

View File

@ -24,9 +24,7 @@ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy; import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when; import static org.mockito.Mockito.when;
import java.io.FileWriter;
import java.io.IOException; import java.io.IOException;
import java.io.PrintWriter;
import java.util.Collections; import java.util.Collections;
import java.util.Map; import java.util.Map;
import java.util.Random; import java.util.Random;
@ -56,6 +54,11 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.Capacity
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair
.allocationfile.AllocationFileQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair
.allocationfile.AllocationFileWriter;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.MultiNodeSortingManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.MultiNodeSortingManager;
import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM; import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM; import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
@ -104,62 +107,46 @@ public class ReservationSystemTestUtil {
.assertTrue(plan.getSharingPolicy() instanceof CapacityOverTimePolicy); .assertTrue(plan.getSharingPolicy() instanceof CapacityOverTimePolicy);
} }
public static void setupFSAllocationFile(String allocationFile) public static void setupFSAllocationFile(String allocationFile) {
throws IOException { AllocationFileWriter.create()
PrintWriter out = new PrintWriter(new FileWriter(allocationFile)); .drfDefaultQueueSchedulingPolicy()
out.println("<?xml version=\"1.0\"?>"); .addQueue(new AllocationFileQueue.Builder("default")
out.println("<allocations>"); .weight(1).build())
out.println("<queue name=\"default\">"); .addQueue(new AllocationFileQueue.Builder("a")
out.println("<weight>1</weight>"); .weight(1)
out.println("</queue>"); .subQueue(new AllocationFileQueue.Builder("a1")
out.println("<queue name=\"a\">"); .weight(3).build())
out.println("<weight>1</weight>"); .subQueue(new AllocationFileQueue.Builder("a2")
out.println("<queue name=\"a1\">"); .weight(7).build())
out.println("<weight>3</weight>"); .build())
out.println("</queue>"); .addQueue(new AllocationFileQueue.Builder("dedicated")
out.println("<queue name=\"a2\">"); .weight(8)
out.println("<weight>7</weight>"); .reservation()
out.println("</queue>"); .build())
out.println("</queue>"); .writeToFile(allocationFile);
out.println("<queue name=\"dedicated\">");
out.println("<reservation></reservation>");
out.println("<weight>8</weight>");
out.println("</queue>");
out.println(
"<defaultQueueSchedulingPolicy>drf</defaultQueueSchedulingPolicy>");
out.println("</allocations>");
out.close();
} }
public static void updateFSAllocationFile(String allocationFile) public static void updateFSAllocationFile(String allocationFile) {
throws IOException { AllocationFileWriter.create()
PrintWriter out = new PrintWriter(new FileWriter(allocationFile)); .drfDefaultQueueSchedulingPolicy()
out.println("<?xml version=\"1.0\"?>"); .addQueue(new AllocationFileQueue.Builder("default")
out.println("<allocations>"); .weight(5).build())
out.println("<queue name=\"default\">"); .addQueue(new AllocationFileQueue.Builder("a")
out.println("<weight>5</weight>"); .weight(5)
out.println("</queue>"); .subQueue(new AllocationFileQueue.Builder("a1")
out.println("<queue name=\"a\">"); .weight(3).build())
out.println("<weight>5</weight>"); .subQueue(new AllocationFileQueue.Builder("a2")
out.println("<queue name=\"a1\">"); .weight(7).build())
out.println("<weight>3</weight>"); .build())
out.println("</queue>"); .addQueue(new AllocationFileQueue.Builder("dedicated")
out.println("<queue name=\"a2\">"); .weight(10)
out.println("<weight>7</weight>"); .reservation()
out.println("</queue>"); .build())
out.println("</queue>"); .addQueue(new AllocationFileQueue.Builder("reservation")
out.println("<queue name=\"dedicated\">"); .weight(80)
out.println("<reservation></reservation>"); .reservation()
out.println("<weight>10</weight>"); .build())
out.println("</queue>"); .writeToFile(allocationFile);
out.println("<queue name=\"reservation\">");
out.println("<reservation></reservation>");
out.println("<weight>80</weight>");
out.println("</queue>");
out.println(
"<defaultQueueSchedulingPolicy>drf</defaultQueueSchedulingPolicy>");
out.println("</allocations>");
out.close();
} }
public static FairScheduler setupFairScheduler(RMContext rmContext, public static FairScheduler setupFairScheduler(RMContext rmContext,

View File

@ -34,24 +34,26 @@ import org.apache.hadoop.yarn.server.resourcemanager.placement.PrimaryGroupPlace
import org.apache.hadoop.yarn.server.resourcemanager.placement.SpecifiedPlacementRule; import org.apache.hadoop.yarn.server.resourcemanager.placement.SpecifiedPlacementRule;
import org.apache.hadoop.yarn.server.resourcemanager.placement.UserPlacementRule; import org.apache.hadoop.yarn.server.resourcemanager.placement.UserPlacementRule;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSchedulerConfiguration; import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSchedulerConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.allocationfile.AllocationFileQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.allocationfile.AllocationFileQueuePlacementPolicy;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.allocationfile.AllocationFileQueuePlacementRule;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.allocationfile.AllocationFileWriter; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.allocationfile.AllocationFileWriter;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.allocationfile.UserSettings;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.DominantResourceFairnessPolicy; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.DominantResourceFairnessPolicy;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FairSharePolicy; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FairSharePolicy;
import org.apache.hadoop.yarn.util.ControlledClock; import org.apache.hadoop.yarn.util.ControlledClock;
import org.apache.hadoop.yarn.util.SystemClock; import org.apache.hadoop.yarn.util.SystemClock;
import org.apache.hadoop.yarn.util.resource.CustomResourceTypesConfigurationProvider; import org.apache.hadoop.yarn.util.resource.CustomResourceTypesConfigurationProvider;
import org.apache.hadoop.yarn.util.resource.Resources; import org.apache.hadoop.yarn.util.resource.Resources;
import org.junit.After;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import java.io.File; import java.io.File;
import java.io.FileOutputStream;
import java.io.FileWriter;
import java.io.IOException; import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.net.URISyntaxException; import java.net.URISyntaxException;
import java.net.URL; import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
@ -97,6 +99,11 @@ public class TestAllocationFileLoaderService {
when(scheduler.getRMContext()).thenReturn(rmContext); when(scheduler.getRMContext()).thenReturn(rmContext);
} }
@After
public void teardown() {
new File(ALLOC_FILE).delete();
}
@Test @Test
public void testGetAllocationFileFromFileSystem() public void testGetAllocationFileFromFileSystem()
throws IOException, URISyntaxException { throws IOException, URISyntaxException {
@ -152,18 +159,14 @@ public class TestAllocationFileLoaderService {
@Test (timeout = 10000) @Test (timeout = 10000)
public void testReload() throws Exception { public void testReload() throws Exception {
PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE)); AllocationFileWriter.create()
out.println("<?xml version=\"1.0\"?>"); .addQueue(new AllocationFileQueue.Builder("queueA")
out.println("<allocations>"); .maxRunningApps(1).build())
out.println(" <queue name=\"queueA\">"); .addQueue(new AllocationFileQueue.Builder("queueB").build())
out.println(" <maxRunningApps>1</maxRunningApps>"); .queuePlacementPolicy(new AllocationFileQueuePlacementPolicy()
out.println(" </queue>"); .addRule(new AllocationFileQueuePlacementRule(
out.println(" <queue name=\"queueB\" />"); AllocationFileQueuePlacementRule.RuleName.DEFAULT)))
out.println(" <queuePlacementPolicy>"); .writeToFile(ALLOC_FILE);
out.println(" <rule name='default' />");
out.println(" </queuePlacementPolicy>");
out.println("</allocations>");
out.close();
ControlledClock clock = new ControlledClock(); ControlledClock clock = new ControlledClock();
clock.setTime(0); clock.setTime(0);
@ -195,20 +198,17 @@ public class TestAllocationFileLoaderService {
confHolder.allocConf = null; confHolder.allocConf = null;
// Modify file and advance the clock // Modify file and advance the clock
out = new PrintWriter(new FileWriter(ALLOC_FILE)); AllocationFileWriter.create()
out.println("<?xml version=\"1.0\"?>"); .addQueue(new AllocationFileQueue.Builder("queueB")
out.println("<allocations>"); .maxRunningApps(3).build())
out.println(" <queue name=\"queueB\">"); .queuePlacementPolicy(new AllocationFileQueuePlacementPolicy()
out.println(" <maxRunningApps>3</maxRunningApps>"); .addRule(new AllocationFileQueuePlacementRule(
out.println(" </queue>"); AllocationFileQueuePlacementRule.RuleName.SPECIFIED))
out.println(" <queuePlacementPolicy>"); .addRule(new AllocationFileQueuePlacementRule(
out.println(" <rule name='specified' />"); AllocationFileQueuePlacementRule.RuleName.NESTED)
out.println(" <rule name='nestedUserQueue' >"); .addNestedRule(new AllocationFileQueuePlacementRule(
out.println(" <rule name='primaryGroup' />"); AllocationFileQueuePlacementRule.RuleName.PRIMARY_GROUP))))
out.println(" </rule>"); .writeToFile(ALLOC_FILE);
out.println(" </queuePlacementPolicy>");
out.println("</allocations>");
out.close();
clock.tickMsec(System.currentTimeMillis() clock.tickMsec(System.currentTimeMillis()
+ AllocationFileLoaderService.ALLOC_RELOAD_WAIT_MS + 10000); + AllocationFileLoaderService.ALLOC_RELOAD_WAIT_MS + 10000);
@ -242,57 +242,56 @@ public class TestAllocationFileLoaderService {
AllocationFileLoaderService allocLoader = AllocationFileLoaderService allocLoader =
new AllocationFileLoaderService(scheduler); new AllocationFileLoaderService(scheduler);
AllocationFileWriter AllocationFileWriter.create()
.create()
// Give queue A a minimum of 1024 M // Give queue A a minimum of 1024 M
.queue("queueA") .addQueue(new AllocationFileQueue.Builder("queueA")
.minResources("1024mb,0vcores") .minResources("1024mb,0vcores")
.maxResources("2048mb,10vcores") .maxResources("2048mb,10vcores")
.buildQueue() .build())
// Give queue B a minimum of 2048 M // Give queue B a minimum of 2048 M
.queue("queueB") .addQueue(new AllocationFileQueue.Builder("queueB")
.minResources("2048mb,0vcores") .minResources("2048mb,0vcores")
.maxResources("5120mb,110vcores") .maxResources("5120mb,110vcores")
.aclAdministerApps("alice,bob admins") .aclAdministerApps("alice,bob admins")
.schedulingPolicy("fair") .schedulingPolicy("fair")
.buildQueue() .build())
// Give queue C no minimum // Give queue C no minimum
.queue("queueC") .addQueue(new AllocationFileQueue.Builder("queueC")
.minResources("5120mb,0vcores") .minResources("5120mb,0vcores")
.aclSubmitApps("alice,bob admins") .aclSubmitApps("alice,bob admins")
.buildQueue() .build())
// Give queue D a limit of 3 running apps and 0.4f maxAMShare // Give queue D a limit of 3 running apps and 0.4f maxAMShare
.queue("queueD") .addQueue(new AllocationFileQueue.Builder("queueD")
.maxRunningApps(3) .maxRunningApps(3)
.maxAMShare(0.4) .maxAMShare(0.4)
.buildQueue() .build())
// Give queue E a preemption timeout of one minute // Give queue E a preemption timeout of one minute
.queue("queueE") .addQueue(new AllocationFileQueue.Builder("queueE")
.minSharePreemptionTimeout(60) .minSharePreemptionTimeout(60)
.buildQueue() .build())
// Make queue F a parent queue without configured leaf queues // Make queue F a parent queue without configured leaf queues
// using the 'type' attribute // using the 'type' attribute
.queue("queueF") .addQueue(new AllocationFileQueue.Builder("queueF")
.parent(true) .parent(true)
.maxChildResources("2048mb,64vcores") .maxChildResources("2048mb,64vcores")
.buildQueue() .build())
.queue("queueG") .addQueue(new AllocationFileQueue.Builder("queueG")
.maxChildResources("2048mb,64vcores") .maxChildResources("2048mb,64vcores")
.fairSharePreemptionTimeout(120) .fairSharePreemptionTimeout(120)
.minSharePreemptionTimeout(50) .minSharePreemptionTimeout(50)
.fairSharePreemptionThreshold(0.6) .fairSharePreemptionThreshold(0.6)
.maxContainerAllocation( .maxContainerAllocation(
"vcores=16, memory-mb=512, " + A_CUSTOM_RESOURCE + "=10") "vcores=16, memory-mb=512, " + A_CUSTOM_RESOURCE + "=10")
// Create hierarchical queues G,H, with different min/fair // Create hierarchical queues G,H, with different min/fair
// share preemption timeouts and preemption thresholds. // share preemption timeouts and preemption thresholds.
// Also add a child default to make sure it doesn't impact queue H. // Also add a child default to make sure it doesn't impact queue H.
.subQueue("queueH") .subQueue(new AllocationFileQueue.Builder("queueH")
.fairSharePreemptionTimeout(180) .fairSharePreemptionTimeout(180)
.minSharePreemptionTimeout(40) .minSharePreemptionTimeout(40)
.fairSharePreemptionThreshold(0.7) .fairSharePreemptionThreshold(0.7)
.maxContainerAllocation("1024mb,8vcores") .maxContainerAllocation("1024mb,8vcores")
.buildSubQueue() .build())
.buildQueue() .build())
// Set default limit of apps per queue to 15 // Set default limit of apps per queue to 15
.queueMaxAppsDefault(15) .queueMaxAppsDefault(15)
// Set default limit of max resource per queue to 4G and 100 cores // Set default limit of max resource per queue to 4G and 100 cores
@ -308,11 +307,11 @@ public class TestAllocationFileLoaderService {
// Set default fair share preemption threshold to 0.4 // Set default fair share preemption threshold to 0.4
.defaultFairSharePreemptionThreshold(0.4) .defaultFairSharePreemptionThreshold(0.4)
// Set default scheduling policy to DRF // Set default scheduling policy to DRF
.defaultQueueSchedulingPolicy("drf") .drfDefaultQueueSchedulingPolicy()
// Give user1 a limit of 10 jobs // Give user1 a limit of 10 jobs
.userSettings("user1") .userSettings(new UserSettings.Builder("user1")
.maxRunningApps(10) .maxRunningApps(10)
.build() .build())
.writeToFile(ALLOC_FILE); .writeToFile(ALLOC_FILE);
allocLoader.init(conf); allocLoader.init(conf);
@ -488,48 +487,51 @@ public class TestAllocationFileLoaderService {
AllocationFileLoaderService allocLoader = AllocationFileLoaderService allocLoader =
new AllocationFileLoaderService(scheduler); new AllocationFileLoaderService(scheduler);
PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE)); AllocationFileWriter.create()
out.println("<?xml version=\"1.0\"?>"); .useLegacyTagNameForQueues()
out.println("<allocations>"); // Give queue A a minimum of 1024 M
// Give queue A a minimum of 1024 M .addQueue(new AllocationFileQueue.Builder("queueA")
out.println("<pool name=\"queueA\">"); .minResources("1024mb,0vcores")
out.println("<minResources>1024mb,0vcores</minResources>"); .build())
out.println("</pool>"); // Give queue B a minimum of 2048 M
// Give queue B a minimum of 2048 M .addQueue(new AllocationFileQueue.Builder("queueB")
out.println("<pool name=\"queueB\">"); .minResources("2048mb,0vcores")
out.println("<minResources>2048mb,0vcores</minResources>"); .aclAdministerApps("alice,bob admins")
out.println("<aclAdministerApps>alice,bob admins</aclAdministerApps>"); .build())
out.println("</pool>"); // Give queue C no minimum
// Give queue C no minimum .addQueue(new AllocationFileQueue.Builder("queueC")
out.println("<pool name=\"queueC\">"); .aclAdministerApps("alice,bob admins")
out.println("<aclSubmitApps>alice,bob admins</aclSubmitApps>"); .build())
out.println("</pool>"); // Give queue D a limit of 3 running apps
// Give queue D a limit of 3 running apps .addQueue(new AllocationFileQueue.Builder("queueD")
out.println("<pool name=\"queueD\">"); .maxRunningApps(3)
out.println("<maxRunningApps>3</maxRunningApps>"); .build())
out.println("</pool>"); // Give queue E a preemption timeout of one minute and 0.3f threshold
// Give queue E a preemption timeout of one minute and 0.3f threshold .addQueue(new AllocationFileQueue.Builder("queueE")
out.println("<pool name=\"queueE\">"); .minSharePreemptionTimeout(60)
out.println("<minSharePreemptionTimeout>60</minSharePreemptionTimeout>"); .fairSharePreemptionThreshold(0.3)
out.println("<fairSharePreemptionThreshold>0.3</fairSharePreemptionThreshold>"); .build())
out.println("</pool>"); // Set default limit of apps per queue to 15
// Set default limit of apps per queue to 15 .queueMaxAppsDefault(15)
out.println("<queueMaxAppsDefault>15</queueMaxAppsDefault>"); // Set default limit of apps per user to 5
// Set default limit of apps per user to 5 .userMaxAppsDefault(5)
out.println("<userMaxAppsDefault>5</userMaxAppsDefault>"); // Set default limit of max resource per queue to 4G and 100 cores
// Give user1 a limit of 10 jobs .queueMaxResourcesDefault("4096mb,100vcores")
out.println("<user name=\"user1\">"); // Set default limit of AMResourceShare to 0.5f
out.println("<maxRunningApps>10</maxRunningApps>"); .queueMaxAMShareDefault(0.5)
out.println("</user>"); // Set default min share preemption timeout to 2 minutes
// Set default min share preemption timeout to 2 minutes .defaultMinSharePreemptionTimeout(120)
out.println("<defaultMinSharePreemptionTimeout>120" // Set default fair share preemption timeout to 5 minutes
+ "</defaultMinSharePreemptionTimeout>"); .defaultFairSharePreemptionTimeout(300)
// Set fair share preemption timeout to 5 minutes // Set default fair share preemption threshold to 0.6
out.println("<fairSharePreemptionTimeout>300</fairSharePreemptionTimeout>"); .defaultFairSharePreemptionThreshold(0.6)
// Set default fair share preemption threshold to 0.6f // Set default scheduling policy to DRF
out.println("<defaultFairSharePreemptionThreshold>0.6</defaultFairSharePreemptionThreshold>"); .drfDefaultQueueSchedulingPolicy()
out.println("</allocations>"); // Give user1 a limit of 10 jobs
out.close(); .userSettings(new UserSettings.Builder("user1")
.maxRunningApps(10)
.build())
.writeToFile(ALLOC_FILE);
allocLoader.init(conf); allocLoader.init(conf);
ReloadListener confHolder = new ReloadListener(); ReloadListener confHolder = new ReloadListener();
@ -602,11 +604,7 @@ public class TestAllocationFileLoaderService {
conf.setBoolean(FairSchedulerConfiguration.ALLOW_UNDECLARED_POOLS, false); conf.setBoolean(FairSchedulerConfiguration.ALLOW_UNDECLARED_POOLS, false);
conf.setBoolean(FairSchedulerConfiguration.USER_AS_DEFAULT_QUEUE, false); conf.setBoolean(FairSchedulerConfiguration.USER_AS_DEFAULT_QUEUE, false);
PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE)); AllocationFileWriter.create().writeToFile(ALLOC_FILE);
out.println("<?xml version=\"1.0\"?>");
out.println("<allocations>");
out.println("</allocations>");
out.close();
AllocationFileLoaderService allocLoader = AllocationFileLoaderService allocLoader =
new AllocationFileLoaderService(scheduler); new AllocationFileLoaderService(scheduler);
@ -632,15 +630,10 @@ public class TestAllocationFileLoaderService {
public void testQueueAlongsideRoot() throws Exception { public void testQueueAlongsideRoot() throws Exception {
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE); conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE)); AllocationFileWriter.create()
out.println("<?xml version=\"1.0\"?>"); .addQueue(new AllocationFileQueue.Builder("root").build())
out.println("<allocations>"); .addQueue(new AllocationFileQueue.Builder("other").build())
out.println("<queue name=\"root\">"); .writeToFile(ALLOC_FILE);
out.println("</queue>");
out.println("<queue name=\"other\">");
out.println("</queue>");
out.println("</allocations>");
out.close();
AllocationFileLoaderService allocLoader = AllocationFileLoaderService allocLoader =
new AllocationFileLoaderService(scheduler); new AllocationFileLoaderService(scheduler);
@ -658,13 +651,9 @@ public class TestAllocationFileLoaderService {
public void testQueueNameContainingPeriods() throws Exception { public void testQueueNameContainingPeriods() throws Exception {
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE); conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE)); AllocationFileWriter.create()
out.println("<?xml version=\"1.0\"?>"); .addQueue(new AllocationFileQueue.Builder("parent1.child").build())
out.println("<allocations>"); .writeToFile(ALLOC_FILE);
out.println("<queue name=\"parent1.child1\">");
out.println("</queue>");
out.println("</allocations>");
out.close();
AllocationFileLoaderService allocLoader = AllocationFileLoaderService allocLoader =
new AllocationFileLoaderService(scheduler); new AllocationFileLoaderService(scheduler);
@ -682,13 +671,9 @@ public class TestAllocationFileLoaderService {
public void testQueueNameContainingOnlyWhitespace() throws Exception { public void testQueueNameContainingOnlyWhitespace() throws Exception {
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE); conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE)); AllocationFileWriter.create()
out.println("<?xml version=\"1.0\"?>"); .addQueue(new AllocationFileQueue.Builder(" ").build())
out.println("<allocations>"); .writeToFile(ALLOC_FILE);
out.println("<queue name=\" \">");
out.println("</queue>");
out.println("</allocations>");
out.close();
AllocationFileLoaderService allocLoader = AllocationFileLoaderService allocLoader =
new AllocationFileLoaderService(scheduler); new AllocationFileLoaderService(scheduler);
@ -702,15 +687,12 @@ public class TestAllocationFileLoaderService {
public void testParentTagWithReservation() throws Exception { public void testParentTagWithReservation() throws Exception {
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE); conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE)); AllocationFileWriter.create()
out.println("<?xml version=\"1.0\"?>"); .addQueue(new AllocationFileQueue.Builder("parent")
out.println("<allocations>"); .parent(true)
out.println("<queue name=\"parent\" type=\"parent\">"); .reservation()
out.println("<reservation>"); .build())
out.println("</reservation>"); .writeToFile(ALLOC_FILE);
out.println("</queue>");
out.println("</allocations>");
out.close();
AllocationFileLoaderService allocLoader = AllocationFileLoaderService allocLoader =
new AllocationFileLoaderService(scheduler); new AllocationFileLoaderService(scheduler);
@ -731,17 +713,13 @@ public class TestAllocationFileLoaderService {
public void testParentWithReservation() throws Exception { public void testParentWithReservation() throws Exception {
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE); conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE)); AllocationFileWriter.create()
out.println("<?xml version=\"1.0\"?>"); .addQueue(new AllocationFileQueue.Builder("parent")
out.println("<allocations>"); .parent(true)
out.println("<queue name=\"parent\">"); .subQueue(new AllocationFileQueue.Builder("child").build())
out.println("<reservation>"); .reservation()
out.println("</reservation>"); .build())
out.println(" <queue name=\"child\">"); .writeToFile(ALLOC_FILE);
out.println(" </queue>");
out.println("</queue>");
out.println("</allocations>");
out.close();
AllocationFileLoaderService allocLoader = AllocationFileLoaderService allocLoader =
new AllocationFileLoaderService(scheduler); new AllocationFileLoaderService(scheduler);
@ -762,15 +740,12 @@ public class TestAllocationFileLoaderService {
public void testParentTagWithChild() throws Exception { public void testParentTagWithChild() throws Exception {
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE); conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE)); AllocationFileWriter.create()
out.println("<?xml version=\"1.0\"?>"); .addQueue(new AllocationFileQueue.Builder("parent")
out.println("<allocations>"); .parent(true)
out.println("<queue name=\"parent\" type=\"parent\">"); .subQueue(new AllocationFileQueue.Builder("child").build())
out.println(" <queue name=\"child\">"); .build())
out.println(" </queue>"); .writeToFile(ALLOC_FILE);
out.println("</queue>");
out.println("</allocations>");
out.close();
AllocationFileLoaderService allocLoader = AllocationFileLoaderService allocLoader =
new AllocationFileLoaderService(scheduler); new AllocationFileLoaderService(scheduler);
@ -794,14 +769,9 @@ public class TestAllocationFileLoaderService {
public void testQueueNameContainingNBWhitespace() throws Exception { public void testQueueNameContainingNBWhitespace() throws Exception {
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE); conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
PrintWriter out = new PrintWriter(new OutputStreamWriter( AllocationFileWriter.create()
new FileOutputStream(ALLOC_FILE), StandardCharsets.UTF_8)); .addQueue(new AllocationFileQueue.Builder("\u00a0").build())
out.println("<?xml version=\"1.0\" encoding=\"UTF-8\"?>"); .writeToFile(ALLOC_FILE);
out.println("<allocations>");
out.println("<queue name=\"\u00a0\">");
out.println("</queue>");
out.println("</allocations>");
out.close();
AllocationFileLoaderService allocLoader = AllocationFileLoaderService allocLoader =
new AllocationFileLoaderService(scheduler); new AllocationFileLoaderService(scheduler);
@ -818,13 +788,9 @@ public class TestAllocationFileLoaderService {
public void testDefaultQueueSchedulingModeIsFIFO() throws Exception { public void testDefaultQueueSchedulingModeIsFIFO() throws Exception {
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE); conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE)); AllocationFileWriter.create()
out.println("<?xml version=\"1.0\"?>"); .fifoDefaultQueueSchedulingPolicy()
out.println("<allocations>"); .writeToFile(ALLOC_FILE);
out.println("<defaultQueueSchedulingPolicy>fifo" +
"</defaultQueueSchedulingPolicy>");
out.println("</allocations>");
out.close();
AllocationFileLoaderService allocLoader = AllocationFileLoaderService allocLoader =
new AllocationFileLoaderService(scheduler); new AllocationFileLoaderService(scheduler);
@ -838,19 +804,14 @@ public class TestAllocationFileLoaderService {
public void testReservableQueue() throws Exception { public void testReservableQueue() throws Exception {
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE); conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE)); AllocationFileWriter.create()
out.println("<?xml version=\"1.0\"?>"); .addQueue(new AllocationFileQueue.Builder("reservable")
out.println("<allocations>"); .reservation()
out.println("<queue name=\"reservable\">"); .build())
out.println("<reservation>"); .addQueue(new AllocationFileQueue.Builder("other").build())
out.println("</reservation>"); .reservationAgent("DummyAgentName")
out.println("</queue>"); .reservationPolicy("AnyAdmissionPolicy")
out.println("<queue name=\"other\">"); .writeToFile(ALLOC_FILE);
out.println("</queue>");
out.println("<reservation-agent>DummyAgentName</reservation-agent>");
out.println("<reservation-policy>AnyAdmissionPolicy</reservation-policy>");
out.println("</allocations>");
out.close();
AllocationFileLoaderService allocLoader = AllocationFileLoaderService allocLoader =
new AllocationFileLoaderService(scheduler); new AllocationFileLoaderService(scheduler);
@ -900,15 +861,12 @@ public class TestAllocationFileLoaderService {
throws Exception { throws Exception {
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE); conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE)); AllocationFileWriter.create()
out.println("<?xml version=\"1.0\"?>"); .addQueue(new AllocationFileQueue.Builder("notboth")
out.println("<allocations>"); .parent(true)
out.println("<queue name=\"notboth\" type=\"parent\" >"); .reservation()
out.println("<reservation>"); .build())
out.println("</reservation>"); .writeToFile(ALLOC_FILE);
out.println("</queue>");
out.println("</allocations>");
out.close();
AllocationFileLoaderService allocLoader = AllocationFileLoaderService allocLoader =
new AllocationFileLoaderService(scheduler); new AllocationFileLoaderService(scheduler);

View File

@ -24,9 +24,7 @@ import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import java.io.File; import java.io.File;
import java.io.FileWriter;
import java.io.IOException; import java.io.IOException;
import java.io.PrintWriter;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
@ -43,6 +41,12 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptA
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair
.allocationfile.AllocationFileQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair
.allocationfile.AllocationFileWriter;
import org.apache.hadoop.yarn.util.resource.Resources; import org.apache.hadoop.yarn.util.resource.Resources;
import org.junit.After; import org.junit.After;
import org.junit.Before; import org.junit.Before;
@ -164,17 +168,14 @@ public class TestAppRunnability extends FairSchedulerTestBase {
} }
@Test @Test
public void testDontAllowUndeclaredPools() throws Exception { public void testDontAllowUndeclaredPools() {
conf.setBoolean(FairSchedulerConfiguration.ALLOW_UNDECLARED_POOLS, false); conf.setBoolean(FairSchedulerConfiguration.ALLOW_UNDECLARED_POOLS, false);
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE); conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE)); AllocationFileWriter.create()
out.println("<?xml version=\"1.0\"?>"); .addQueue(new AllocationFileQueue.Builder("jerry").build())
out.println("<allocations>"); .writeToFile(ALLOC_FILE);
out.println("<queue name=\"jerry\">");
out.println("</queue>");
out.println("</allocations>");
out.close();
// Restarting resource manager since the file location and content is // Restarting resource manager since the file location and content is
// changed. // changed.
resourceManager.stop(); resourceManager.stop();

View File

@ -19,9 +19,6 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair; package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
import java.io.File; import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.io.PrintWriter;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -38,6 +35,12 @@ import org.apache.hadoop.yarn.server.resourcemanager.MockRMAppSubmitter;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair
.allocationfile.AllocationFileQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair
.allocationfile.AllocationFileWriter;
import org.junit.AfterClass; import org.junit.AfterClass;
import org.junit.Assert; import org.junit.Assert;
import org.junit.BeforeClass; import org.junit.BeforeClass;
@ -58,7 +61,7 @@ public class TestApplicationMasterServiceWithFS {
private static YarnConfiguration configuration; private static YarnConfiguration configuration;
@BeforeClass @BeforeClass
public static void setup() throws IOException { public static void setup() {
String allocFile = String allocFile =
GenericTestUtils.getTestDir(TEST_FOLDER).getAbsolutePath(); GenericTestUtils.getTestDir(TEST_FOLDER).getAbsolutePath();
@ -67,21 +70,15 @@ public class TestApplicationMasterServiceWithFS {
ResourceScheduler.class); ResourceScheduler.class);
configuration.set(FairSchedulerConfiguration.ALLOCATION_FILE, allocFile); configuration.set(FairSchedulerConfiguration.ALLOCATION_FILE, allocFile);
PrintWriter out = new PrintWriter(new FileWriter(allocFile)); AllocationFileWriter.create()
out.println("<?xml version=\"1.0\"?>"); .addQueue(new AllocationFileQueue.Builder("queueA")
out.println("<allocations>"); .maxContainerAllocation("2048 mb 1 vcores")
out.println(" <queue name=\"queueA\">"); .build())
out.println( .addQueue(new AllocationFileQueue.Builder("queueB")
" <maxContainerAllocation>2048 mb 1 vcores</maxContainerAllocation>"); .maxContainerAllocation("3072 mb 1 vcores")
out.println(" </queue>"); .build())
out.println(" <queue name=\"queueB\">"); .addQueue(new AllocationFileQueue.Builder("queueC").build())
out.println( .writeToFile(allocFile);
" <maxContainerAllocation>3072 mb 1 vcores</maxContainerAllocation>");
out.println(" </queue>");
out.println(" <queue name=\"queueC\">");
out.println(" </queue>");
out.println("</allocations>");
out.close();
} }
@AfterClass @AfterClass

View File

@ -22,6 +22,11 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM; import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair
.allocationfile.AllocationFileQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair
.allocationfile.AllocationFileWriter;
import org.apache.hadoop.yarn.util.ControlledClock; import org.apache.hadoop.yarn.util.ControlledClock;
import org.junit.After; import org.junit.After;
@ -33,12 +38,10 @@ import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import java.io.File; import java.io.File;
import java.io.FileWriter;
import java.io.IOException; import java.io.IOException;
import java.io.PrintWriter;
/** /**
* Test class to verify identification of app starvation * Test class to verify identification of app starvation.
*/ */
public class TestFSAppStarvation extends FairSchedulerTestBase { public class TestFSAppStarvation extends FairSchedulerTestBase {
@ -186,53 +189,41 @@ public class TestFSAppStarvation extends FairSchedulerTestBase {
* 3. Add two nodes to the cluster * 3. Add two nodes to the cluster
* 4. Submit an app that uses up all resources on the cluster * 4. Submit an app that uses up all resources on the cluster
*/ */
private void setupStarvedCluster() throws IOException { private void setupStarvedCluster() {
PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE)); AllocationFileWriter.create()
out.println("<?xml version=\"1.0\"?>"); .drfDefaultQueueSchedulingPolicy()
out.println("<allocations>"); // Default queue
.addQueue(new AllocationFileQueue.Builder("default").build())
// Default queue // Queue with preemption disabled
out.println("<queue name=\"default\">"); .addQueue(new AllocationFileQueue.Builder("no-preemption")
out.println("</queue>"); .fairSharePreemptionThreshold(0).build())
// Queue with minshare preemption enabled
// Queue with preemption disabled .addQueue(new AllocationFileQueue.Builder("minshare")
out.println("<queue name=\"no-preemption\">"); .fairSharePreemptionThreshold(0)
out.println("<fairSharePreemptionThreshold>0" + .minSharePreemptionTimeout(0)
"</fairSharePreemptionThreshold>"); .minResources("2048mb,2vcores")
out.println("</queue>"); .build())
// FAIR queue with fairshare preemption enabled
// Queue with minshare preemption enabled .addQueue(new AllocationFileQueue.Builder("fairshare")
out.println("<queue name=\"minshare\">"); .fairSharePreemptionThreshold(1)
out.println("<fairSharePreemptionThreshold>0" + .fairSharePreemptionTimeout(0)
"</fairSharePreemptionThreshold>"); .schedulingPolicy("fair")
out.println("<minSharePreemptionTimeout>0" + .subQueue(new AllocationFileQueue.Builder("child")
"</minSharePreemptionTimeout>"); .fairSharePreemptionThreshold(1)
out.println("<minResources>2048mb,2vcores</minResources>"); .fairSharePreemptionTimeout(0)
out.println("</queue>"); .schedulingPolicy("fair").build())
.build())
// FAIR queue with fairshare preemption enabled // DRF queue with fairshare preemption enabled
out.println("<queue name=\"fairshare\">"); .addQueue(new AllocationFileQueue.Builder("drf")
out.println("<fairSharePreemptionThreshold>1" + .fairSharePreemptionThreshold(1)
"</fairSharePreemptionThreshold>"); .fairSharePreemptionTimeout(0)
out.println("<fairSharePreemptionTimeout>0" + .schedulingPolicy("drf")
"</fairSharePreemptionTimeout>"); .subQueue(new AllocationFileQueue.Builder("child")
out.println("<schedulingPolicy>fair</schedulingPolicy>"); .fairSharePreemptionThreshold(1)
addChildQueue(out, "fair"); .fairSharePreemptionTimeout(0)
out.println("</queue>"); .schedulingPolicy("drf").build())
.build())
// DRF queue with fairshare preemption enabled .writeToFile(ALLOC_FILE.getAbsolutePath());
out.println("<queue name=\"drf\">");
out.println("<fairSharePreemptionThreshold>1" +
"</fairSharePreemptionThreshold>");
out.println("<fairSharePreemptionTimeout>0" +
"</fairSharePreemptionTimeout>");
out.println("<schedulingPolicy>drf</schedulingPolicy>");
addChildQueue(out, "drf");
out.println("</queue>");
out.println("<defaultQueueSchedulingPolicy>drf" +
"</defaultQueueSchedulingPolicy>");
out.println("</allocations>");
out.close();
assertTrue("Allocation file does not exist, not running the test", assertTrue("Allocation file does not exist, not running the test",
ALLOC_FILE.exists()); ALLOC_FILE.exists());
@ -258,17 +249,6 @@ public class TestFSAppStarvation extends FairSchedulerTestBase {
assertEquals(8, scheduler.getSchedulerApp(app).getLiveContainers().size()); assertEquals(8, scheduler.getSchedulerApp(app).getLiveContainers().size());
} }
private void addChildQueue(PrintWriter out, String policy) {
// Child queue under fairshare with same settings
out.println("<queue name=\"child\">");
out.println("<fairSharePreemptionThreshold>1" +
"</fairSharePreemptionThreshold>");
out.println("<fairSharePreemptionTimeout>0" +
"</fairSharePreemptionTimeout>");
out.println("<schedulingPolicy>" + policy + "</schedulingPolicy>");
out.println("</queue>");
}
private void submitAppsToEachLeafQueue() { private void submitAppsToEachLeafQueue() {
for (String queue : QUEUES) { for (String queue : QUEUES) {
createSchedulingRequest(1024, 1, "root." + queue, "user", 1); createSchedulingRequest(1024, 1, "root." + queue, "user", 1);

View File

@ -24,9 +24,7 @@ import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
import java.io.File; import java.io.File;
import java.io.FileWriter;
import java.io.IOException; import java.io.IOException;
import java.io.PrintWriter;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.Collections; import java.util.Collections;
@ -49,6 +47,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
import org.apache.hadoop.yarn.util.resource.ResourceUtils; import org.apache.hadoop.yarn.util.resource.ResourceUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.allocationfile.AllocationFileQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.allocationfile.AllocationFileWriter;
import org.apache.hadoop.yarn.util.resource.Resources; import org.apache.hadoop.yarn.util.resource.Resources;
import org.junit.After; import org.junit.After;
import org.junit.Before; import org.junit.Before;
@ -110,17 +110,14 @@ public class TestFSLeafQueue extends FairSchedulerTestBase {
} }
@Test (timeout = 5000) @Test (timeout = 5000)
public void test() throws Exception { public void test() {
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE); conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
out.println("<?xml version=\"1.0\"?>"); AllocationFileWriter.create()
out.println("<allocations>"); .queueMaxAMShareDefault(MAX_AM_SHARE)
out.println("<queueMaxAMShareDefault>" + MAX_AM_SHARE + .addQueue(new AllocationFileQueue.Builder("queueA").build())
"</queueMaxAMShareDefault>"); .addQueue(new AllocationFileQueue.Builder("queueB").build())
out.println("<queue name=\"queueA\"></queue>"); .writeToFile(ALLOC_FILE);
out.println("<queue name=\"queueB\"></queue>");
out.println("</allocations>");
out.close();
resourceManager = new MockRM(conf); resourceManager = new MockRM(conf);
resourceManager.start(); resourceManager.start();

View File

@ -21,9 +21,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import java.io.File; import java.io.File;
import java.io.FileWriter;
import java.io.IOException; import java.io.IOException;
import java.io.PrintWriter;
import java.util.Collection; import java.util.Collection;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
@ -33,6 +31,11 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptS
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair
.allocationfile.AllocationFileQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair
.allocationfile.AllocationFileWriter;
import org.apache.hadoop.yarn.util.resource.Resources; import org.apache.hadoop.yarn.util.resource.Resources;
import org.junit.After; import org.junit.After;
import org.junit.Before; import org.junit.Before;
@ -57,34 +60,33 @@ public class TestFairSchedulerFairShare extends FairSchedulerTestBase {
conf = null; conf = null;
} }
private void createClusterWithQueuesAndOneNode(int mem, String policy) private void createClusterWithQueuesAndOneNode(int mem) {
throws IOException { createClusterWithQueuesAndOneNode(mem, 0, "fair");
createClusterWithQueuesAndOneNode(mem, 0, policy);
} }
private void createClusterWithQueuesAndOneNode(int mem, int vCores, private void createClusterWithQueuesAndOneNode(int mem, int vCores,
String policy) throws IOException { String policy) {
PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE)); AllocationFileWriter allocationFileWriter = AllocationFileWriter.create()
out.println("<?xml version=\"1.0\"?>"); .addQueue(new AllocationFileQueue.Builder("root")
out.println("<allocations>"); .subQueue(new AllocationFileQueue.Builder("parentA")
out.println("<queue name=\"root\" >"); .weight(8)
out.println(" <queue name=\"parentA\" >"); .subQueue(new AllocationFileQueue.Builder("childA1").build())
out.println(" <weight>8</weight>"); .subQueue(new AllocationFileQueue.Builder("childA2").build())
out.println(" <queue name=\"childA1\" />"); .subQueue(new AllocationFileQueue.Builder("childA3").build())
out.println(" <queue name=\"childA2\" />"); .subQueue(new AllocationFileQueue.Builder("childA4").build())
out.println(" <queue name=\"childA3\" />"); .build())
out.println(" <queue name=\"childA4\" />"); .subQueue(new AllocationFileQueue.Builder("parentB")
out.println(" </queue>"); .weight(1)
out.println(" <queue name=\"parentB\" >"); .subQueue(new AllocationFileQueue.Builder("childB1").build())
out.println(" <weight>1</weight>"); .subQueue(new AllocationFileQueue.Builder("childB2").build())
out.println(" <queue name=\"childB1\" />"); .build())
out.println(" <queue name=\"childB2\" />"); .build());
out.println(" </queue>"); if (policy.equals("fair")) {
out.println("</queue>"); allocationFileWriter.fairDefaultQueueSchedulingPolicy();
out.println("<defaultQueueSchedulingPolicy>" + policy } else if (policy.equals("drf")) {
+ "</defaultQueueSchedulingPolicy>"); allocationFileWriter.drfDefaultQueueSchedulingPolicy();
out.println("</allocations>"); }
out.close(); allocationFileWriter.writeToFile(ALLOC_FILE);
resourceManager = new MockRM(conf); resourceManager = new MockRM(conf);
resourceManager.start(); resourceManager.start();
@ -97,9 +99,9 @@ public class TestFairSchedulerFairShare extends FairSchedulerTestBase {
} }
@Test @Test
public void testFairShareNoAppsRunning() throws IOException { public void testFairShareNoAppsRunning() {
int nodeCapacity = 16 * 1024; int nodeCapacity = 16 * 1024;
createClusterWithQueuesAndOneNode(nodeCapacity, "fair"); createClusterWithQueuesAndOneNode(nodeCapacity);
scheduler.update(); scheduler.update();
// No apps are running in the cluster,verify if fair share is zero // No apps are running in the cluster,verify if fair share is zero
@ -121,9 +123,9 @@ public class TestFairSchedulerFairShare extends FairSchedulerTestBase {
} }
@Test @Test
public void testFairShareOneAppRunning() throws IOException { public void testFairShareOneAppRunning() {
int nodeCapacity = 16 * 1024; int nodeCapacity = 16 * 1024;
createClusterWithQueuesAndOneNode(nodeCapacity, "fair"); createClusterWithQueuesAndOneNode(nodeCapacity);
// Run a app in a childA1. Verify whether fair share is 100% in childA1, // Run a app in a childA1. Verify whether fair share is 100% in childA1,
// since it is the only active queue. // since it is the only active queue.
@ -149,10 +151,9 @@ public class TestFairSchedulerFairShare extends FairSchedulerTestBase {
} }
@Test @Test
public void testFairShareMultipleActiveQueuesUnderSameParent() public void testFairShareMultipleActiveQueuesUnderSameParent() {
throws IOException {
int nodeCapacity = 16 * 1024; int nodeCapacity = 16 * 1024;
createClusterWithQueuesAndOneNode(nodeCapacity, "fair"); createClusterWithQueuesAndOneNode(nodeCapacity);
// Run apps in childA1,childA2,childA3 // Run apps in childA1,childA2,childA3
createSchedulingRequest(2 * 1024, "root.parentA.childA1", "user1"); createSchedulingRequest(2 * 1024, "root.parentA.childA1", "user1");
@ -179,7 +180,7 @@ public class TestFairSchedulerFairShare extends FairSchedulerTestBase {
public void testFairShareMultipleActiveQueuesUnderDifferentParent() public void testFairShareMultipleActiveQueuesUnderDifferentParent()
throws IOException { throws IOException {
int nodeCapacity = 16 * 1024; int nodeCapacity = 16 * 1024;
createClusterWithQueuesAndOneNode(nodeCapacity, "fair"); createClusterWithQueuesAndOneNode(nodeCapacity);
// Run apps in childA1,childA2 which are under parentA // Run apps in childA1,childA2 which are under parentA
createSchedulingRequest(2 * 1024, "root.parentA.childA1", "user1"); createSchedulingRequest(2 * 1024, "root.parentA.childA1", "user1");
@ -218,9 +219,9 @@ public class TestFairSchedulerFairShare extends FairSchedulerTestBase {
} }
@Test @Test
public void testFairShareResetsToZeroWhenAppsComplete() throws IOException { public void testFairShareResetsToZeroWhenAppsComplete() {
int nodeCapacity = 16 * 1024; int nodeCapacity = 16 * 1024;
createClusterWithQueuesAndOneNode(nodeCapacity, "fair"); createClusterWithQueuesAndOneNode(nodeCapacity);
// Run apps in childA1,childA2 which are under parentA // Run apps in childA1,childA2 which are under parentA
ApplicationAttemptId app1 = createSchedulingRequest(2 * 1024, ApplicationAttemptId app1 = createSchedulingRequest(2 * 1024,
@ -268,8 +269,7 @@ public class TestFairSchedulerFairShare extends FairSchedulerTestBase {
} }
@Test @Test
public void testFairShareWithDRFMultipleActiveQueuesUnderDifferentParent() public void testFairShareWithDRFMultipleActiveQueuesUnderDifferentParent() {
throws IOException {
int nodeMem = 16 * 1024; int nodeMem = 16 * 1024;
int nodeVCores = 10; int nodeVCores = 10;
createClusterWithQueuesAndOneNode(nodeMem, nodeVCores, "drf"); createClusterWithQueuesAndOneNode(nodeMem, nodeVCores, "drf");

View File

@ -26,6 +26,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.allocationfile.AllocationFileQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.allocationfile.AllocationFileWriter;
import org.apache.hadoop.yarn.util.ControlledClock; import org.apache.hadoop.yarn.util.ControlledClock;
import org.apache.hadoop.yarn.util.SystemClock; import org.apache.hadoop.yarn.util.SystemClock;
import org.junit.After; import org.junit.After;
@ -38,9 +40,7 @@ import org.junit.runner.RunWith;
import org.junit.runners.Parameterized; import org.junit.runners.Parameterized;
import java.io.File; import java.io.File;
import java.io.FileWriter;
import java.io.IOException; import java.io.IOException;
import java.io.PrintWriter;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collection; import java.util.Collection;
@ -107,7 +107,7 @@ public class TestFairSchedulerPreemption extends FairSchedulerTestBase {
} }
} }
private void writeAllocFile() throws IOException { private void writeAllocFile() {
/* /*
* Queue hierarchy: * Queue hierarchy:
* root * root
@ -115,80 +115,73 @@ public class TestFairSchedulerPreemption extends FairSchedulerTestBase {
* |--- child-1 * |--- child-1
* |--- child-2 * |--- child-2
* |--- preemptable-sibling * |--- preemptable-sibling
* |--- nonpreemptible * |--- nonpreemptable
* |--- child-1 * |--- child-1
* |--- child-2 * |--- child-2
*/ */
PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE)); AllocationFileWriter allocationFileWriter;
out.println("<?xml version=\"1.0\"?>"); if (fairsharePreemption) {
out.println("<allocations>"); allocationFileWriter = AllocationFileWriter.create()
.addQueue(new AllocationFileQueue.Builder("root")
out.println("<queue name=\"preemptable\">"); .subQueue(new AllocationFileQueue.Builder("preemptable")
writePreemptionParams(out); .fairSharePreemptionThreshold(1)
.fairSharePreemptionTimeout(0)
// Child-1 .subQueue(new AllocationFileQueue.Builder("child-1")
out.println("<queue name=\"child-1\">"); .build())
writeResourceParams(out); .subQueue(new AllocationFileQueue.Builder("child-2")
out.println("</queue>"); .build())
.build())
// Child-2 .subQueue(new AllocationFileQueue.Builder("preemptable-sibling")
out.println("<queue name=\"child-2\">"); .fairSharePreemptionThreshold(1)
writeResourceParams(out); .fairSharePreemptionTimeout(0)
out.println("</queue>"); .build())
.subQueue(new AllocationFileQueue.Builder("nonpreemptable")
out.println("</queue>"); // end of preemptable queue .allowPreemptionFrom(false)
.fairSharePreemptionThreshold(1)
out.println("<queue name=\"preemptable-sibling\">"); .fairSharePreemptionTimeout(0)
writePreemptionParams(out); .subQueue(new AllocationFileQueue.Builder("child-1")
out.println("</queue>"); .build())
.subQueue(new AllocationFileQueue.Builder("child-2")
// Queue with preemption disallowed .build())
out.println("<queue name=\"nonpreemptable\">"); .build())
out.println("<allowPreemptionFrom>false" + .build());
"</allowPreemptionFrom>"); } else {
writePreemptionParams(out); allocationFileWriter = AllocationFileWriter.create()
.addQueue(new AllocationFileQueue.Builder("root")
// Child-1 .subQueue(new AllocationFileQueue.Builder("preemptable")
out.println("<queue name=\"child-1\">"); .minSharePreemptionTimeout(0)
writeResourceParams(out); .subQueue(new AllocationFileQueue.Builder("child-1")
out.println("</queue>"); .minResources("4096mb,4vcores")
.build())
// Child-2 .subQueue(new AllocationFileQueue.Builder("child-2")
out.println("<queue name=\"child-2\">"); .minResources("4096mb,4vcores")
writeResourceParams(out); .build())
out.println("</queue>"); .build())
.subQueue(new AllocationFileQueue.Builder("preemptable-sibling")
out.println("</queue>"); // end of nonpreemptable queue .minSharePreemptionTimeout(0)
.build())
.subQueue(new AllocationFileQueue.Builder("nonpreemptable")
.allowPreemptionFrom(false)
.minSharePreemptionTimeout(0)
.subQueue(new AllocationFileQueue.Builder("child-1")
.minResources("4096mb,4vcores")
.build())
.subQueue(new AllocationFileQueue.Builder("child-2")
.minResources("4096mb,4vcores")
.build())
.build())
.build());
}
if (drf) { if (drf) {
out.println("<defaultQueueSchedulingPolicy>drf" + allocationFileWriter.drfDefaultQueueSchedulingPolicy();
"</defaultQueueSchedulingPolicy>");
} }
out.println("</allocations>"); allocationFileWriter.writeToFile(ALLOC_FILE.getAbsolutePath());
out.close();
assertTrue("Allocation file does not exist, not running the test", assertTrue("Allocation file does not exist, not running the test",
ALLOC_FILE.exists()); ALLOC_FILE.exists());
} }
private void writePreemptionParams(PrintWriter out) {
if (fairsharePreemption) {
out.println("<fairSharePreemptionThreshold>1" +
"</fairSharePreemptionThreshold>");
out.println("<fairSharePreemptionTimeout>0" +
"</fairSharePreemptionTimeout>");
} else {
out.println("<minSharePreemptionTimeout>0" +
"</minSharePreemptionTimeout>");
}
}
private void writeResourceParams(PrintWriter out) {
if (!fairsharePreemption) {
out.println("<minResources>4096mb,4vcores</minResources>");
}
}
private void setupCluster() throws IOException { private void setupCluster() throws IOException {
resourceManager = new MockRM(conf); resourceManager = new MockRM(conf);
scheduler = (FairScheduler) resourceManager.getResourceScheduler(); scheduler = (FairScheduler) resourceManager.getResourceScheduler();

View File

@ -18,41 +18,39 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair; package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
import java.io.File; import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.io.PrintWriter;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.QueueACLsTestBase; import org.apache.hadoop.yarn.server.resourcemanager.QueueACLsTestBase;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair
.allocationfile.AllocationFileQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair
.allocationfile.AllocationFileWriter;
public class TestFairSchedulerQueueACLs extends QueueACLsTestBase { public class TestFairSchedulerQueueACLs extends QueueACLsTestBase {
@Override @Override
protected Configuration createConfiguration() throws IOException { protected Configuration createConfiguration() {
FairSchedulerConfiguration fsConf = new FairSchedulerConfiguration(); FairSchedulerConfiguration fsConf = new FairSchedulerConfiguration();
final String TEST_DIR = new File(System.getProperty("test.build.data", final String testDir = new File(System.getProperty("test.build.data",
"/tmp")).getAbsolutePath(); "/tmp")).getAbsolutePath();
final String ALLOC_FILE = new File(TEST_DIR, "test-queues.xml") final String allocFile = new File(testDir, "test-queues.xml")
.getAbsolutePath(); .getAbsolutePath();
PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
out.println("<?xml version=\"1.0\"?>"); AllocationFileWriter.create()
out.println("<allocations>"); .addQueue(new AllocationFileQueue.Builder("root")
out.println("<queue name=\"root\">"); .aclSubmitApps(" ")
out.println(" <aclSubmitApps> </aclSubmitApps>"); .aclAdministerApps("root_admin ")
out.println(" <aclAdministerApps>root_admin </aclAdministerApps>"); .subQueue(new AllocationFileQueue.Builder("queueA")
out.println(" <queue name=\"queueA\">"); .aclSubmitApps("queueA_user,common_user ")
out.println(" <aclSubmitApps>queueA_user,common_user </aclSubmitApps>"); .aclAdministerApps("queueA_admin ").build())
out.println(" <aclAdministerApps>queueA_admin </aclAdministerApps>"); .subQueue(new AllocationFileQueue.Builder("queueB")
out.println(" </queue>"); .aclSubmitApps("queueB_user,common_user ")
out.println(" <queue name=\"queueB\">"); .aclAdministerApps("queueB_admin ").build())
out.println(" <aclSubmitApps>queueB_user,common_user </aclSubmitApps>"); .build())
out.println(" <aclAdministerApps>queueB_admin </aclAdministerApps>"); .writeToFile(allocFile);
out.println(" </queue>");
out.println("</queue>"); fsConf.set(FairSchedulerConfiguration.ALLOCATION_FILE, allocFile);
out.println("</allocations>");
out.close();
fsConf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
fsConf.setBoolean(YarnConfiguration.YARN_ACL_ENABLE, true); fsConf.setBoolean(YarnConfiguration.YARN_ACL_ENABLE, true);
fsConf.set(YarnConfiguration.RM_SCHEDULER, FairScheduler.class.getName()); fsConf.set(YarnConfiguration.RM_SCHEDULER, FairScheduler.class.getName());

View File

@ -18,14 +18,16 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair; package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM; import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair
.allocationfile.AllocationFileQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair
.allocationfile.AllocationFileWriter;
import org.junit.After; import org.junit.After;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import java.io.File; import java.io.File;
import java.io.FileWriter;
import java.io.IOException; import java.io.IOException;
import java.io.PrintWriter;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
@ -38,7 +40,7 @@ public class TestQueueManagerRealScheduler extends FairSchedulerTestBase {
@Before @Before
public void setup() throws IOException { public void setup() throws IOException {
createConfiguration(); createConfiguration();
writeAllocFile(30, 40); writeAllocFile(30);
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, conf.set(FairSchedulerConfiguration.ALLOCATION_FILE,
ALLOC_FILE.getAbsolutePath()); ALLOC_FILE.getAbsolutePath());
@ -56,32 +58,20 @@ public class TestQueueManagerRealScheduler extends FairSchedulerTestBase {
} }
} }
private void writeAllocFile(int defaultFairShareTimeout, private void writeAllocFile(int defaultFairShareTimeout) {
int fairShareTimeout) throws IOException { AllocationFileWriter.create()
PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE)); .addQueue(new AllocationFileQueue.Builder("default")
out.println("<?xml version=\"1.0\"?>"); .build())
out.println("<allocations>"); .addQueue(new AllocationFileQueue.Builder("queueA").build())
out.println("<queue name=\"default\">"); .addQueue(new AllocationFileQueue.Builder("queueB")
out.println("</queue>"); .subQueue(new AllocationFileQueue.Builder("queueB1")
out.println("<queue name=\"queueA\">"); .minSharePreemptionTimeout(5).build())
out.println("</queue>"); .subQueue(new AllocationFileQueue.Builder("queueB2").build())
out.println("<queue name=\"queueB\">"); .build())
out.println("<queue name=\"queueB1\">"); .addQueue(new AllocationFileQueue.Builder("queueC").build())
out.println("<minSharePreemptionTimeout>5</minSharePreemptionTimeout>"); .defaultMinSharePreemptionTimeout(15)
out.println("</queue>"); .defaultFairSharePreemptionTimeout(defaultFairShareTimeout)
out.println("<queue name=\"queueB2\">"); .writeToFile(ALLOC_FILE.getAbsolutePath());
out.println("</queue>");
out.println("</queue>");
out.println("<queue name=\"queueC\">");
out.println("</queue>");
out.println("<defaultMinSharePreemptionTimeout>15"
+ "</defaultMinSharePreemptionTimeout>");
out.println("<defaultFairSharePreemptionTimeout>" +
+ defaultFairShareTimeout + "</defaultFairSharePreemptionTimeout>");
out.println("<fairSharePreemptionTimeout>"
+ fairShareTimeout + "</fairSharePreemptionTimeout>");
out.println("</allocations>");
out.close();
} }
@Test @Test
@ -120,7 +110,7 @@ public class TestQueueManagerRealScheduler extends FairSchedulerTestBase {
// Lower the fairshare preemption timeouts and verify it is picked // Lower the fairshare preemption timeouts and verify it is picked
// correctly. // correctly.
writeAllocFile(25, 30); writeAllocFile(25);
scheduler.reinitialize(conf, resourceManager.getRMContext()); scheduler.reinitialize(conf, resourceManager.getRMContext());
assertEquals(25000, queueMgr.getQueue("root") assertEquals(25000, queueMgr.getQueue("root")
.getFairSharePreemptionTimeout()); .getFairSharePreemptionTimeout());

View File

@ -19,9 +19,7 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair; package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
import java.io.File; import java.io.File;
import java.io.FileWriter;
import java.io.IOException; import java.io.IOException;
import java.io.PrintWriter;
import java.util.Collection; import java.util.Collection;
import java.util.Comparator; import java.util.Comparator;
import java.util.Stack; import java.util.Stack;
@ -32,6 +30,10 @@ import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementManager; import org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair
.allocationfile.AllocationFileQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair
.allocationfile.AllocationFileWriter;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.DominantResourceFairnessPolicy; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.DominantResourceFairnessPolicy;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FairSharePolicy; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FairSharePolicy;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FifoPolicy; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FifoPolicy;
@ -319,22 +321,16 @@ public class TestSchedulingPolicy {
@Test @Test
public void testSchedulingPolicyViolation() throws IOException { public void testSchedulingPolicyViolation() throws IOException {
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE); conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE)); AllocationFileWriter.create()
out.println("<?xml version=\"1.0\"?>"); .drfDefaultQueueSchedulingPolicy()
out.println("<allocations>"); .addQueue(new AllocationFileQueue.Builder("root")
out.println("<queue name=\"root\">"); .schedulingPolicy("fair")
out.println("<schedulingPolicy>fair</schedulingPolicy>"); .subQueue(new AllocationFileQueue.Builder("child1")
out.println(" <queue name=\"child1\">"); .schedulingPolicy("drf").build())
out.println(" <schedulingPolicy>drf</schedulingPolicy>"); .subQueue(new AllocationFileQueue.Builder("child2")
out.println(" </queue>"); .schedulingPolicy("fair").build())
out.println(" <queue name=\"child2\">"); .build())
out.println(" <schedulingPolicy>fair</schedulingPolicy>"); .writeToFile(ALLOC_FILE);
out.println(" </queue>");
out.println("</queue>");
out.println("<defaultQueueSchedulingPolicy>drf" +
"</defaultQueueSchedulingPolicy>");
out.println("</allocations>");
out.close();
scheduler.init(conf); scheduler.init(conf);
@ -349,22 +345,16 @@ public class TestSchedulingPolicy {
+ " policy if its parent policy is 'fair'.", dynamicQueue); + " policy if its parent policy is 'fair'.", dynamicQueue);
// Set child1 to 'fair' and child2 to 'drf', the reload the allocation file. // Set child1 to 'fair' and child2 to 'drf', the reload the allocation file.
out = new PrintWriter(new FileWriter(ALLOC_FILE)); AllocationFileWriter.create()
out.println("<?xml version=\"1.0\"?>"); .drfDefaultQueueSchedulingPolicy()
out.println("<allocations>"); .addQueue(new AllocationFileQueue.Builder("root")
out.println("<queue name=\"root\">"); .schedulingPolicy("fair")
out.println("<schedulingPolicy>fair</schedulingPolicy>"); .subQueue(new AllocationFileQueue.Builder("child1")
out.println(" <queue name=\"child1\">"); .schedulingPolicy("fair").build())
out.println(" <schedulingPolicy>fair</schedulingPolicy>"); .subQueue(new AllocationFileQueue.Builder("child2")
out.println(" </queue>"); .schedulingPolicy("drf").build())
out.println(" <queue name=\"child2\">"); .build())
out.println(" <schedulingPolicy>drf</schedulingPolicy>"); .writeToFile(ALLOC_FILE);
out.println(" </queue>");
out.println("</queue>");
out.println("<defaultQueueSchedulingPolicy>drf" +
"</defaultQueueSchedulingPolicy>");
out.println("</allocations>");
out.close();
scheduler.reinitialize(conf, null); scheduler.reinitialize(conf, null);
child1 = scheduler.getQueueManager().getQueue("child1"); child1 = scheduler.getQueueManager().getQueue("child1");
@ -379,26 +369,21 @@ public class TestSchedulingPolicy {
} }
@Test @Test
public void testSchedulingPolicyViolationInTheMiddleLevel() public void testSchedulingPolicyViolationInTheMiddleLevel() {
throws IOException {
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE); conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE)); AllocationFileWriter.create()
out.println("<?xml version=\"1.0\"?>"); .addQueue(new AllocationFileQueue.Builder("root")
out.println("<allocations>"); .schedulingPolicy("fair")
out.println("<queue name=\"root\">"); .subQueue(new AllocationFileQueue.Builder("level2")
out.println("<schedulingPolicy>fair</schedulingPolicy>"); .schedulingPolicy("fair")
out.println(" <queue name=\"level2\">"); .subQueue(new AllocationFileQueue.Builder("level3")
out.println(" <schedulingPolicy>fair</schedulingPolicy>"); .schedulingPolicy("drf")
out.println(" <queue name=\"level3\">"); .subQueue(new AllocationFileQueue.Builder("leaf")
out.println(" <schedulingPolicy>drf</schedulingPolicy>"); .schedulingPolicy("fair").build())
out.println(" <queue name=\"leaf\">"); .build())
out.println(" <schedulingPolicy>fair</schedulingPolicy>"); .build())
out.println(" </queue>"); .build())
out.println(" </queue>"); .writeToFile(ALLOC_FILE);
out.println(" </queue>");
out.println("</queue>");
out.println("</allocations>");
out.close();
scheduler.init(conf); scheduler.init(conf);
@ -417,19 +402,16 @@ public class TestSchedulingPolicy {
public void testFIFOPolicyOnlyForLeafQueues() public void testFIFOPolicyOnlyForLeafQueues()
throws IOException { throws IOException {
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE); conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
out.println("<?xml version=\"1.0\"?>"); AllocationFileWriter.create()
out.println("<allocations>"); .addQueue(new AllocationFileQueue.Builder("root")
out.println("<queue name=\"root\">"); .subQueue(new AllocationFileQueue.Builder("intermediate")
out.println(" <queue name=\"intermediate\">"); .schedulingPolicy("fifo")
out.println(" <schedulingPolicy>fifo</schedulingPolicy>"); .subQueue(new AllocationFileQueue.Builder("leaf")
out.println(" <queue name=\"leaf\">"); .schedulingPolicy("fair").build())
out.println(" <schedulingPolicy>fair</schedulingPolicy>"); .build())
out.println(" </queue>"); .build())
out.println(" </queue>"); .writeToFile(ALLOC_FILE);
out.println("</queue>");
out.println("</allocations>");
out.close();
scheduler.init(conf); scheduler.init(conf);
@ -437,19 +419,15 @@ public class TestSchedulingPolicy {
assertNull("Queue 'intermediate' should be null since 'fifo' is only for " assertNull("Queue 'intermediate' should be null since 'fifo' is only for "
+ "leaf queue.", intermediate); + "leaf queue.", intermediate);
out = new PrintWriter(new FileWriter(ALLOC_FILE)); AllocationFileWriter.create()
out.println("<?xml version=\"1.0\"?>"); .addQueue(new AllocationFileQueue.Builder("root")
out.println("<allocations>"); .subQueue(new AllocationFileQueue.Builder("intermediate")
out.println("<queue name=\"root\">"); .schedulingPolicy("fair")
out.println(" <queue name=\"intermediate\">"); .subQueue(new AllocationFileQueue.Builder("leaf")
out.println(" <schedulingPolicy>fair</schedulingPolicy>"); .schedulingPolicy("fifo").build())
out.println(" <queue name=\"leaf\">"); .build())
out.println(" <schedulingPolicy>fifo</schedulingPolicy>"); .build())
out.println(" </queue>"); .writeToFile(ALLOC_FILE);
out.println(" </queue>");
out.println("</queue>");
out.println("</allocations>");
out.close();
scheduler.reinitialize(conf, null); scheduler.reinitialize(conf, null);
@ -461,41 +439,30 @@ public class TestSchedulingPolicy {
} }
@Test @Test
public void testPolicyReinitilization() throws IOException { public void testPolicyReinitialization() throws IOException {
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE); conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE)); AllocationFileWriter.create()
out.println("<?xml version=\"1.0\"?>"); .addQueue(new AllocationFileQueue.Builder("root")
out.println("<allocations>"); .schedulingPolicy("fair")
out.println("<queue name=\"root\">"); .subQueue(new AllocationFileQueue.Builder("child1")
out.println("<schedulingPolicy>fair</schedulingPolicy>"); .schedulingPolicy("fair").build())
out.println(" <queue name=\"child1\">"); .subQueue(new AllocationFileQueue.Builder("child2")
out.println(" <schedulingPolicy>fair</schedulingPolicy>"); .schedulingPolicy("fair").build())
out.println(" </queue>"); .build())
out.println(" <queue name=\"child2\">"); .writeToFile(ALLOC_FILE);
out.println(" <schedulingPolicy>fair</schedulingPolicy>");
out.println(" </queue>");
out.println("</queue>");
out.println("</allocations>");
out.close();
scheduler.init(conf); scheduler.init(conf);
// Set child1 to 'drf' which is not allowed, then reload the allocation file // Set child1 to 'drf' which is not allowed, then reload the allocation file
out = new PrintWriter(new FileWriter(ALLOC_FILE)); AllocationFileWriter.create()
out.println("<?xml version=\"1.0\"?>"); .addQueue(new AllocationFileQueue.Builder("root")
out.println("<allocations>"); .schedulingPolicy("fair")
out.println("<queue name=\"root\">"); .subQueue(new AllocationFileQueue.Builder("child1")
out.println("<schedulingPolicy>fair</schedulingPolicy>"); .schedulingPolicy("drf").build())
out.println(" <queue name=\"child1\">"); .subQueue(new AllocationFileQueue.Builder("child2")
out.println(" <schedulingPolicy>drf</schedulingPolicy>"); .schedulingPolicy("fifo").build())
out.println(" </queue>"); .build())
out.println(" <queue name=\"child2\">"); .writeToFile(ALLOC_FILE);
out.println(" <schedulingPolicy>fifo</schedulingPolicy>");
out.println(" </queue>");
out.println("</queue>");
out.println("</allocations>");
out.close();
scheduler.reinitialize(conf, null); scheduler.reinitialize(conf, null);
FSQueue child1 = scheduler.getQueueManager().getQueue("child1"); FSQueue child1 = scheduler.getQueueManager().getQueue("child1");
@ -508,20 +475,15 @@ public class TestSchedulingPolicy {
child2.getPolicy() instanceof FairSharePolicy); child2.getPolicy() instanceof FairSharePolicy);
// Set both child1 and root to 'drf', then reload the allocation file // Set both child1 and root to 'drf', then reload the allocation file
out = new PrintWriter(new FileWriter(ALLOC_FILE)); AllocationFileWriter.create()
out.println("<?xml version=\"1.0\"?>"); .addQueue(new AllocationFileQueue.Builder("root")
out.println("<allocations>"); .schedulingPolicy("drf")
out.println("<queue name=\"root\">"); .subQueue(new AllocationFileQueue.Builder("child1")
out.println("<schedulingPolicy>drf</schedulingPolicy>"); .schedulingPolicy("drf").build())
out.println(" <queue name=\"child1\">"); .subQueue(new AllocationFileQueue.Builder("child2")
out.println(" <schedulingPolicy>drf</schedulingPolicy>"); .schedulingPolicy("fifo").build())
out.println(" </queue>"); .build())
out.println(" <queue name=\"child2\">"); .writeToFile(ALLOC_FILE);
out.println(" <schedulingPolicy>fifo</schedulingPolicy>");
out.println(" </queue>");
out.println("</queue>");
out.println("</allocations>");
out.close();
scheduler.reinitialize(conf, null); scheduler.reinitialize(conf, null);

View File

@ -16,69 +16,261 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.allocationfile; package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.allocationfile;
import com.google.common.collect.Lists;
import java.io.PrintWriter; import java.io.PrintWriter;
import java.io.StringWriter; import java.io.StringWriter;
import java.util.List; import java.util.List;
class AllocationFileQueue { import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair
private final AllocationFileQueueProperties properties; .allocationfile.AllocationFileWriter.addIfPresent;
private final List<AllocationFileQueue> subQueues;
AllocationFileQueue(AllocationFileQueueProperties properties, /**
List<AllocationFileQueue> subQueues) { * DAO for Allocation File Queue.
this.properties = properties; */
this.subQueues = subQueues; final public class AllocationFileQueue {
private static final String DEFAULT_TAG_NAME = "queue";
private static final String LEGACY_TAG_NAME = "pool";
private final String queueName;
private final String minResources;
private final String maxResources;
private final String aclAdministerApps;
private final String aclSubmitApps;
private final String aclSubmitReservations;
private final String aclAdministerReservations;
private final String aclListReservations;
private final String schedulingPolicy;
private final Integer maxRunningApps;
private final Double maxAMShare;
private final Boolean allowPreemptionFrom;
private final Integer minSharePreemptionTimeout;
private final String maxChildResources;
private final Integer fairSharePreemptionTimeout;
private final Double fairSharePreemptionThreshold;
private final String maxContainerAllocation;
private final List<AllocationFileQueue> subQueues;
private final Float weight;
private String tagName;
private final boolean parent;
private final boolean reservation;
private AllocationFileQueue(Builder builder) {
this.queueName = builder.name;
this.parent = builder.parent;
this.minResources = builder.minResources;
this.maxResources = builder.maxResources;
this.aclAdministerApps = builder.aclAdministerApps;
this.aclSubmitApps = builder.aclSubmitApps;
this.aclSubmitReservations = builder.aclSubmitReservations;
this.aclAdministerReservations = builder.aclAdministerReservations;
this.aclListReservations = builder.aclListReservations;
this.schedulingPolicy = builder.schedulingPolicy;
this.maxRunningApps = builder.maxRunningApps;
this.maxAMShare = builder.maxAMShare;
this.allowPreemptionFrom = builder.allowPreemptionFrom;
this.minSharePreemptionTimeout = builder.minSharePreemptionTimeout;
this.maxChildResources = builder.maxChildResources;
this.fairSharePreemptionTimeout = builder.fairSharePreemptionTimeout;
this.fairSharePreemptionThreshold = builder.fairSharePreemptionThreshold;
this.maxContainerAllocation = builder.maxContainerAllocation;
this.weight = builder.weight;
this.reservation = builder.reservation;
this.subQueues = builder.subQueues;
this.tagName = DEFAULT_TAG_NAME;
} }
String render() { String render() {
StringWriter sw = new StringWriter(); StringWriter sw = new StringWriter();
PrintWriter pw = new PrintWriter(sw); PrintWriter pw = new PrintWriter(sw);
printStartTag(pw); printStartTag(pw);
AllocationFileWriter.printQueues(pw, subQueues); AllocationFileWriter.printQueues(pw, subQueues,
AllocationFileWriter.addIfPresent(pw, "minResources", tagName.equals(LEGACY_TAG_NAME));
properties::getMinResources); addIfPresent(pw, "minResources", minResources);
AllocationFileWriter.addIfPresent(pw, "maxResources", addIfPresent(pw, "maxResources", maxResources);
properties::getMaxResources); addIfPresent(pw, "aclAdministerApps", aclAdministerApps);
AllocationFileWriter.addIfPresent(pw, "aclAdministerApps", addIfPresent(pw, "aclSubmitApps", aclSubmitApps);
properties::getAclAdministerApps); addIfPresent(pw, "aclSubmitReservations", aclSubmitReservations);
AllocationFileWriter.addIfPresent(pw, "aclSubmitApps", addIfPresent(pw, "aclAdministerReservations", aclAdministerReservations);
properties::getAclSubmitApps); addIfPresent(pw, "aclListReservations", aclListReservations);
AllocationFileWriter.addIfPresent(pw, "schedulingPolicy", addIfPresent(pw, "schedulingPolicy", schedulingPolicy);
properties::getSchedulingPolicy); addIfPresent(pw, "maxRunningApps", maxRunningApps);
AllocationFileWriter.addIfPresent(pw, "maxRunningApps", addIfPresent(pw, "maxAMShare", maxAMShare);
() -> AllocationFileWriter addIfPresent(pw, "allowPreemptionFrom", allowPreemptionFrom);
.createNumberSupplier(properties.getMaxRunningApps())); addIfPresent(pw, "minSharePreemptionTimeout", minSharePreemptionTimeout);
AllocationFileWriter.addIfPresent(pw, "maxAMShare", addIfPresent(pw, "maxChildResources", maxChildResources);
() -> AllocationFileWriter.createNumberSupplier(properties addIfPresent(pw, "fairSharePreemptionTimeout", fairSharePreemptionTimeout);
.getMaxAMShare())); addIfPresent(pw, "fairSharePreemptionThreshold",
AllocationFileWriter.addIfPresent(pw, "minSharePreemptionTimeout", fairSharePreemptionThreshold);
() -> AllocationFileWriter addIfPresent(pw, "maxContainerAllocation", maxContainerAllocation);
.createNumberSupplier(properties.getMinSharePreemptionTimeout())); addIfPresent(pw, "weight", weight);
AllocationFileWriter.addIfPresent(pw, "maxChildResources", if (reservation) {
properties::getMaxChildResources); pw.println("<reservation></reservation>");
AllocationFileWriter.addIfPresent(pw, "fairSharePreemptionTimeout", }
() -> AllocationFileWriter
.createNumberSupplier(properties.getFairSharePreemptionTimeout()));
AllocationFileWriter.addIfPresent(pw, "fairSharePreemptionThreshold",
() -> AllocationFileWriter.createNumberSupplier(
properties.getFairSharePreemptionThreshold()));
AllocationFileWriter.addIfPresent(pw, "maxContainerAllocation",
() -> AllocationFileWriter
.createNumberSupplier(properties.getMaxContainerAllocation()));
printEndTag(pw); printEndTag(pw);
pw.close(); pw.close();
return sw.toString(); return sw.toString();
} }
String renderWithLegacyTag() {
this.tagName = LEGACY_TAG_NAME;
return render();
}
private void printStartTag(PrintWriter pw) { private void printStartTag(PrintWriter pw) {
pw.print("<queue name=\"" + properties.getQueueName() + "\" "); String queueWithName = String.format("<%s name=\"%s\"", tagName, queueName);
if (properties.getParent()) { pw.print(queueWithName);
pw.print("type=\"parent\""); if (parent) {
pw.print(" type=\"parent\"");
} }
pw.println(">"); pw.println(">");
} }
private void printEndTag(PrintWriter pw) { private void printEndTag(PrintWriter pw) {
pw.println("</queue>"); pw.println("</" + tagName + ">");
}
/**
* Class that can build queues (with subqueues) for testcases.
* The intention of having this class to group the common properties of
* simple queues and subqueues by methods delegating calls to a
* queuePropertiesBuilder instance.
*/
public static class Builder {
private String name;
private String minResources;
private String maxResources;
private String aclAdministerApps;
private String aclSubmitApps;
private String aclSubmitReservations;
private String aclAdministerReservations;
private String aclListReservations;
private String schedulingPolicy;
private Integer maxRunningApps;
private Double maxAMShare;
private Boolean allowPreemptionFrom;
private Integer minSharePreemptionTimeout;
private boolean parent;
private String maxChildResources;
private Integer fairSharePreemptionTimeout;
private Double fairSharePreemptionThreshold;
private String maxContainerAllocation;
private boolean reservation;
private final List<AllocationFileQueue> subQueues = Lists.newArrayList();
private Float weight;
public Builder(String name) {
this.name = name;
}
public Builder parent(boolean value) {
this.parent = value;
return this;
}
public Builder minResources(String value) {
this.minResources = value;
return this;
}
public Builder maxResources(String value) {
this.maxResources = value;
return this;
}
public Builder aclAdministerApps(String value) {
this.aclAdministerApps = value;
return this;
}
public Builder aclSubmitApps(String value) {
this.aclSubmitApps = value;
return this;
}
public Builder aclSubmitReservations(String value) {
this.aclSubmitReservations = value;
return this;
}
public Builder aclAdministerReservations(String value) {
this.aclAdministerReservations = value;
return this;
}
public Builder aclListReservations(String value) {
this.aclListReservations = value;
return this;
}
public Builder schedulingPolicy(String value) {
this.schedulingPolicy = value;
return this;
}
public Builder maxRunningApps(int value) {
this.maxRunningApps = value;
return this;
}
public Builder maxAMShare(double value) {
this.maxAMShare = value;
return this;
}
public Builder allowPreemptionFrom(boolean value) {
this.allowPreemptionFrom = value;
return this;
}
public Builder minSharePreemptionTimeout(int value) {
this.minSharePreemptionTimeout = value;
return this;
}
public Builder maxChildResources(String value) {
this.maxChildResources = value;
return this;
}
public Builder fairSharePreemptionTimeout(Integer value) {
this.fairSharePreemptionTimeout = value;
return this;
}
public Builder fairSharePreemptionThreshold(
double value) {
this.fairSharePreemptionThreshold = value;
return this;
}
public Builder maxContainerAllocation(String value) {
this.maxContainerAllocation = value;
return this;
}
public Builder weight(float value) {
this.weight = value;
return this;
}
public Builder reservation() {
this.reservation = true;
return this;
}
public Builder subQueue(AllocationFileQueue queue) {
if (queue == null) {
throw new IllegalArgumentException("Subqueue cannot be null!");
}
subQueues.add(queue);
return this;
}
public AllocationFileQueue build() {
return new AllocationFileQueue(this);
}
} }
} }

View File

@ -1,121 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.allocationfile;
/**
* Abstract base class for building simple queues and subqueues for testcases.
* Currently there are two concrete types subclassed from this class:
* {@link AllocationFileSimpleQueueBuilder} and
* {@link AllocationFileSubQueueBuilder}.
* The intention of having this class to group the common properties of
* simple queues and subqueues by methods delegating calls to a
* queuePropertiesBuilder instance.
*/
public abstract class AllocationFileQueueBuilder {
final AllocationFileQueueProperties.Builder queuePropertiesBuilder;
AllocationFileQueueBuilder() {
this.queuePropertiesBuilder =
AllocationFileQueueProperties.Builder.create();
}
public AllocationFileQueueBuilder parent(boolean parent) {
this.queuePropertiesBuilder.parent(parent);
return this;
}
public AllocationFileQueueBuilder minResources(String value) {
this.queuePropertiesBuilder.minResources(value);
return this;
}
public AllocationFileQueueBuilder maxResources(String value) {
this.queuePropertiesBuilder.maxResources(value);
return this;
}
public AllocationFileQueueBuilder aclAdministerApps(String value) {
this.queuePropertiesBuilder.aclAdministerApps(value);
return this;
}
public AllocationFileQueueBuilder aclSubmitApps(String value) {
this.queuePropertiesBuilder.aclSubmitApps(value);
return this;
}
public AllocationFileQueueBuilder schedulingPolicy(String value) {
this.queuePropertiesBuilder.schedulingPolicy(value);
return this;
}
public AllocationFileQueueBuilder maxRunningApps(int value) {
this.queuePropertiesBuilder.maxRunningApps(value);
return this;
}
public AllocationFileQueueBuilder maxAMShare(double value) {
this.queuePropertiesBuilder.maxAMShare(value);
return this;
}
public AllocationFileQueueBuilder minSharePreemptionTimeout(int value) {
this.queuePropertiesBuilder.minSharePreemptionTimeout(value);
return this;
}
public AllocationFileQueueBuilder maxChildResources(String value) {
this.queuePropertiesBuilder.maxChildResources(value);
return this;
}
public AllocationFileQueueBuilder fairSharePreemptionTimeout(Integer value) {
this.queuePropertiesBuilder.fairSharePreemptionTimeout(value);
return this;
}
public AllocationFileQueueBuilder fairSharePreemptionThreshold(
double value) {
this.queuePropertiesBuilder.fairSharePreemptionThreshold(value);
return this;
}
public AllocationFileQueueBuilder maxContainerAllocation(
String maxContainerAllocation) {
this.queuePropertiesBuilder.maxContainerAllocation(maxContainerAllocation);
return this;
}
public AllocationFileQueueBuilder subQueue(String queueName) {
if (this instanceof AllocationFileSimpleQueueBuilder) {
return new AllocationFileSubQueueBuilder(
(AllocationFileSimpleQueueBuilder) this, queueName);
} else {
throw new IllegalStateException(
"subQueue can only be invoked on instances of "
+ AllocationFileSimpleQueueBuilder.class);
}
}
public abstract AllocationFileWriter buildQueue();
public abstract AllocationFileSimpleQueueBuilder buildSubQueue();
AllocationFileQueueProperties.Builder getqueuePropertiesBuilder() {
return queuePropertiesBuilder;
}
}

View File

@ -0,0 +1,63 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.allocationfile;
import com.google.common.collect.Lists;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.List;
/**
* Helper class to manage {@link AllocationFileQueuePlacementRule}
* instances for {@link AllocationFileWriter}.
*/
public class AllocationFileQueuePlacementPolicy {
private List<AllocationFileQueuePlacementRule> rules = Lists.newArrayList();
public AllocationFileQueuePlacementPolicy addRule(
AllocationFileQueuePlacementRule rule) {
this.rules.add(rule);
return this;
}
public String render() {
StringWriter sw = new StringWriter();
PrintWriter pw = new PrintWriter(sw);
addStartTag(pw);
addRules(pw);
addEndTag(pw);
pw.close();
return sw.toString();
}
private void addStartTag(PrintWriter pw) {
pw.println("<queuePlacementPolicy>");
}
private void addRules(PrintWriter pw) {
for (AllocationFileQueuePlacementRule rule : rules) {
pw.println(rule.render());
}
}
private void addEndTag(PrintWriter pw) {
pw.println("</queuePlacementPolicy>");
}
}

View File

@ -0,0 +1,107 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.allocationfile;
import com.google.common.collect.Lists;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.List;
/**
* Helper class for {@link AllocationFileWriter} to manage
* queue placement rules.
*/
public class AllocationFileQueuePlacementRule {
public enum RuleName {
DEFAULT("default"),
SPECIFIED("specified"),
REJECT("reject"),
NESTED("nestedUserQueue"),
PRIMARY_GROUP("primaryGroup");
private String name;
RuleName(String name) {
this.name = name;
}
@Override
public String toString() {
return name;
}
}
private RuleName name;
private boolean create = true;
private String queue;
private List<AllocationFileQueuePlacementRule> nestedRules = Lists
.newArrayList();
public AllocationFileQueuePlacementRule(RuleName name) {
this.name = name;
}
public AllocationFileQueuePlacementRule create(boolean shouldCreate) {
this.create = shouldCreate;
return this;
}
public AllocationFileQueuePlacementRule queue(String selectedQueue) {
this.queue = selectedQueue;
return this;
}
public AllocationFileQueuePlacementRule addNestedRule(
AllocationFileQueuePlacementRule rule) {
this.nestedRules.add(rule);
return this;
}
public String render() {
StringWriter sw = new StringWriter();
PrintWriter pw = new PrintWriter(sw);
addStartTag(pw);
addNestedRules(pw);
addEndTag(pw);
pw.close();
return sw.toString();
}
private void addStartTag(PrintWriter pw) {
pw.print("<rule name=\"" + name.toString() + "\" create=\"" +
String.valueOf(create) + "\"");
if (queue != null) {
pw.print("queue=\"" + queue + "\"");
}
pw.println(">");
}
private void addNestedRules(PrintWriter pw) {
if (nestedRules != null && !nestedRules.isEmpty()) {
for (AllocationFileQueuePlacementRule rule : nestedRules) {
pw.println(rule.render());
}
}
}
private void addEndTag(PrintWriter pw) {
pw.println("</rule>");
}
}

View File

@ -1,214 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.allocationfile;
/**
* The purpose of this class is to store all properties of a queue.
*/
public class AllocationFileQueueProperties {
private final String queueName;
private final String minResources;
private final String maxResources;
private final String aclAdministerApps;
private final String aclSubmitApps;
private final String schedulingPolicy;
private final Integer maxRunningApps;
private final Double maxAMShare;
private final Integer minSharePreemptionTimeout;
private final Boolean parent;
private final String maxChildResources;
private final Integer fairSharePreemptionTimeout;
private final Double fairSharePreemptionThreshold;
private final String maxContainerAllocation;
AllocationFileQueueProperties(Builder builder) {
this.queueName = builder.queueName;
this.parent = builder.parent;
this.minResources = builder.minResources;
this.maxResources = builder.maxResources;
this.aclAdministerApps = builder.aclAdministerApps;
this.aclSubmitApps = builder.aclSubmitApps;
this.schedulingPolicy = builder.schedulingPolicy;
this.maxRunningApps = builder.maxRunningApps;
this.maxAMShare = builder.maxAMShare;
this.minSharePreemptionTimeout = builder.minSharePreemptionTimeout;
this.maxChildResources = builder.maxChildResources;
this.fairSharePreemptionTimeout = builder.fairSharePreemptionTimeout;
this.fairSharePreemptionThreshold = builder.fairSharePreemptionThreshold;
this.maxContainerAllocation = builder.maxContainerAllocation;
}
public String getQueueName() {
return queueName;
}
public String getMinResources() {
return minResources;
}
public String getMaxResources() {
return maxResources;
}
public String getAclAdministerApps() {
return aclAdministerApps;
}
public String getAclSubmitApps() {
return aclSubmitApps;
}
public String getSchedulingPolicy() {
return schedulingPolicy;
}
public Integer getMaxRunningApps() {
return maxRunningApps;
}
public Double getMaxAMShare() {
return maxAMShare;
}
public Integer getMinSharePreemptionTimeout() {
return minSharePreemptionTimeout;
}
public Boolean getParent() {
return parent;
}
public String getMaxChildResources() {
return maxChildResources;
}
public Integer getFairSharePreemptionTimeout() {
return fairSharePreemptionTimeout;
}
public Double getFairSharePreemptionThreshold() {
return fairSharePreemptionThreshold;
}
public String getMaxContainerAllocation() {
return maxContainerAllocation;
}
/**
* Builder class for {@link AllocationFileQueueProperties}.
*/
public static final class Builder {
private String queueName;
private Boolean parent = false;
private String minResources;
private String maxResources;
private String aclAdministerApps;
private String aclSubmitApps;
private String schedulingPolicy;
private Integer maxRunningApps;
private Double maxAMShare;
private Integer minSharePreemptionTimeout;
private String maxChildResources;
private Integer fairSharePreemptionTimeout;
private Double fairSharePreemptionThreshold;
private String maxContainerAllocation;
Builder() {
}
public static Builder create() {
return new Builder();
}
public Builder queueName(String queueName) {
this.queueName = queueName;
return this;
}
public Builder minResources(String minResources) {
this.minResources = minResources;
return this;
}
public Builder maxResources(String maxResources) {
this.maxResources = maxResources;
return this;
}
public Builder aclAdministerApps(String aclAdministerApps) {
this.aclAdministerApps = aclAdministerApps;
return this;
}
public Builder aclSubmitApps(String aclSubmitApps) {
this.aclSubmitApps = aclSubmitApps;
return this;
}
public Builder schedulingPolicy(String schedulingPolicy) {
this.schedulingPolicy = schedulingPolicy;
return this;
}
public Builder maxRunningApps(Integer maxRunningApps) {
this.maxRunningApps = maxRunningApps;
return this;
}
public Builder maxAMShare(Double maxAMShare) {
this.maxAMShare = maxAMShare;
return this;
}
public Builder maxContainerAllocation(String maxContainerAllocation) {
this.maxContainerAllocation = maxContainerAllocation;
return this;
}
public Builder minSharePreemptionTimeout(
Integer minSharePreemptionTimeout) {
this.minSharePreemptionTimeout = minSharePreemptionTimeout;
return this;
}
public Builder parent(Boolean parent) {
this.parent = parent;
return this;
}
public Builder maxChildResources(String maxChildResources) {
this.maxChildResources = maxChildResources;
return this;
}
public Builder fairSharePreemptionTimeout(
Integer fairSharePreemptionTimeout) {
this.fairSharePreemptionTimeout = fairSharePreemptionTimeout;
return this;
}
public Builder fairSharePreemptionThreshold(
Double fairSharePreemptionThreshold) {
this.fairSharePreemptionThreshold = fairSharePreemptionThreshold;
return this;
}
public AllocationFileQueueProperties build() {
return new AllocationFileQueueProperties(this);
}
}
}

View File

@ -1,64 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.allocationfile;
import java.util.ArrayList;
import java.util.List;
/**
* Queue builder that can build a simple queue with its properties.
* Subqueues can be added with {@link #addSubQueue(AllocationFileQueue)}.
*/
public class AllocationFileSimpleQueueBuilder
extends AllocationFileQueueBuilder {
private final AllocationFileWriter allocationFileWriter;
private final List<AllocationFileQueue> subQueues = new ArrayList<>();
AllocationFileSimpleQueueBuilder(AllocationFileWriter allocationFileWriter,
String queueName) {
this.allocationFileWriter = allocationFileWriter;
getqueuePropertiesBuilder().queueName(queueName);
}
void addSubQueue(AllocationFileQueue queue) {
subQueues.add(queue);
}
@Override
public AllocationFileWriter buildQueue() {
AllocationFileQueueProperties queueProperties =
getqueuePropertiesBuilder().build();
AllocationFileQueue queue =
new AllocationFileQueue(queueProperties, subQueues);
if (allocationFileWriter != null) {
allocationFileWriter.addQueue(queue);
} else {
throw new IllegalStateException(
"allocationFileWriter field has to be set on a " + getClass());
}
return allocationFileWriter;
}
@Override
public AllocationFileSimpleQueueBuilder buildSubQueue() {
throw new IllegalStateException(
"buildSubQueue is not supported in " + getClass());
}
}

View File

@ -1,54 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.allocationfile;
import com.google.common.collect.Lists;
/**
* Queue builder that can build a subqueue with its properties.
*/
public class AllocationFileSubQueueBuilder extends AllocationFileQueueBuilder {
private AllocationFileSimpleQueueBuilder parentQueueBuilder;
AllocationFileSubQueueBuilder(
AllocationFileSimpleQueueBuilder parentQueueBuilder, String queueName) {
getqueuePropertiesBuilder().queueName(queueName);
this.parentQueueBuilder = parentQueueBuilder;
}
@Override
public AllocationFileWriter buildQueue() {
throw new IllegalStateException(
"BuildQueue is not supported in " + getClass());
}
public AllocationFileSimpleQueueBuilder buildSubQueue() {
AllocationFileQueueProperties queueProperties =
getqueuePropertiesBuilder().build();
AllocationFileQueue queue =
new AllocationFileQueue(queueProperties, Lists.newArrayList());
if (parentQueueBuilder != null) {
parentQueueBuilder.addSubQueue(queue);
return parentQueueBuilder;
} else {
throw new IllegalStateException(
"parentQueueBuilder field has to be set on a " + getClass());
}
}
}

View File

@ -16,12 +16,10 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.allocationfile; package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.allocationfile;
import java.io.FileWriter;
import java.io.IOException; import java.io.IOException;
import java.io.PrintWriter; import java.io.PrintWriter;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.function.Supplier;
/** /**
* This class is capable of serializing allocation file data to a file * This class is capable of serializing allocation file data to a file
@ -29,6 +27,10 @@ import java.util.function.Supplier;
* See {@link #writeToFile(String)} method for the implementation. * See {@link #writeToFile(String)} method for the implementation.
*/ */
public final class AllocationFileWriter { public final class AllocationFileWriter {
private static final String DRF = "drf";
private static final String FAIR = "fair";
private static final String FIFO = "fifo";
private Integer queueMaxAppsDefault; private Integer queueMaxAppsDefault;
private String queueMaxResourcesDefault; private String queueMaxResourcesDefault;
private Integer userMaxAppsDefault; private Integer userMaxAppsDefault;
@ -39,6 +41,10 @@ public final class AllocationFileWriter {
private String defaultQueueSchedulingPolicy; private String defaultQueueSchedulingPolicy;
private List<AllocationFileQueue> queues = new ArrayList<>(); private List<AllocationFileQueue> queues = new ArrayList<>();
private UserSettings userSettings; private UserSettings userSettings;
private boolean useLegacyTagNameForQueues = false;
private String reservationAgent;
private String reservationPolicy;
private AllocationFileQueuePlacementPolicy queuePlacementPolicy;
private AllocationFileWriter() { private AllocationFileWriter() {
} }
@ -47,8 +53,9 @@ public final class AllocationFileWriter {
return new AllocationFileWriter(); return new AllocationFileWriter();
} }
public AllocationFileSimpleQueueBuilder queue(String queueName) { public AllocationFileWriter addQueue(AllocationFileQueue queue) {
return new AllocationFileSimpleQueueBuilder(this, queueName); queues.add(queue);
return this;
} }
public AllocationFileWriter queueMaxAppsDefault(int value) { public AllocationFileWriter queueMaxAppsDefault(int value) {
@ -71,6 +78,11 @@ public final class AllocationFileWriter {
return this; return this;
} }
public AllocationFileWriter disableQueueMaxAMShareDefault() {
this.queueMaxAMShareDefault = -1.0d;
return this;
}
public AllocationFileWriter defaultMinSharePreemptionTimeout(int value) { public AllocationFileWriter defaultMinSharePreemptionTimeout(int value) {
this.defaultMinSharePreemptionTimeout = value; this.defaultMinSharePreemptionTimeout = value;
return this; return this;
@ -87,26 +99,57 @@ public final class AllocationFileWriter {
return this; return this;
} }
public AllocationFileWriter defaultQueueSchedulingPolicy(String value) { public AllocationFileWriter drfDefaultQueueSchedulingPolicy() {
this.defaultQueueSchedulingPolicy = value; this.defaultQueueSchedulingPolicy = DRF;
return this; return this;
} }
public UserSettings.Builder userSettings(String username) { public AllocationFileWriter fairDefaultQueueSchedulingPolicy() {
return new UserSettings.Builder(this, username); this.defaultQueueSchedulingPolicy = FAIR;
return this;
} }
void addQueue(AllocationFileQueue queue) { public AllocationFileWriter fifoDefaultQueueSchedulingPolicy() {
this.queues.add(queue); this.defaultQueueSchedulingPolicy = FIFO;
return this;
} }
void setUserSettings(UserSettings userSettings) { public AllocationFileWriter useLegacyTagNameForQueues() {
this.userSettings = userSettings; this.useLegacyTagNameForQueues = true;
return this;
} }
static void printQueues(PrintWriter pw, List<AllocationFileQueue> queues) { public AllocationFileWriter reservationAgent(String value) {
this.reservationAgent = value;
return this;
}
public AllocationFileWriter reservationPolicy(String value) {
this.reservationPolicy = value;
return this;
}
public AllocationFileWriter userSettings(UserSettings settings) {
this.userSettings = settings;
return this;
}
public AllocationFileWriter queuePlacementPolicy(
AllocationFileQueuePlacementPolicy policy) {
this.queuePlacementPolicy = policy;
return this;
}
static void printQueues(PrintWriter pw, List<AllocationFileQueue> queues,
boolean useLegacyTagName) {
for (AllocationFileQueue queue : queues) { for (AllocationFileQueue queue : queues) {
pw.println(queue.render()); final String queueStr;
if (useLegacyTagName) {
queueStr = queue.renderWithLegacyTag();
} else {
queueStr = queue.render();
}
pw.println(queueStr);
} }
} }
@ -114,22 +157,18 @@ public final class AllocationFileWriter {
pw.println(userSettings.render()); pw.println(userSettings.render());
} }
static void addIfPresent(PrintWriter pw, String tag, private void printQueuePlacementPolicy(PrintWriter pw) {
Supplier<String> supplier) { pw.println(queuePlacementPolicy.render());
if (supplier.get() != null) {
pw.println("<" + tag + ">" + supplier.get() + "</" + tag + ">");
}
} }
static String createNumberSupplier(Object number) { static void addIfPresent(PrintWriter pw, String tag, Object obj) {
if (number != null) { if (obj != null) {
return number.toString(); pw.println("<" + tag + ">" + obj.toString() + "</" + tag + ">");
} }
return null;
} }
private void writeHeader(PrintWriter pw) { private void writeHeader(PrintWriter pw) {
pw.println("<?xml version=\"1.0\"?>"); pw.println("<?xml version=\"1.0\" encoding=\"UTF-8\"?>");
pw.println("<allocations>"); pw.println("<allocations>");
} }
@ -140,34 +179,37 @@ public final class AllocationFileWriter {
public void writeToFile(String filename) { public void writeToFile(String filename) {
PrintWriter pw; PrintWriter pw;
try { try {
pw = new PrintWriter(new FileWriter(filename)); pw = new PrintWriter(filename, "UTF-8");
} catch (IOException e) { } catch (IOException e) {
throw new RuntimeException(e); throw new RuntimeException(e);
} }
writeHeader(pw); writeHeader(pw);
if (!queues.isEmpty()) { if (!queues.isEmpty()) {
printQueues(pw, queues); printQueues(pw, queues, useLegacyTagNameForQueues);
} }
if (userSettings != null) { if (userSettings != null) {
printUserSettings(pw); printUserSettings(pw);
} }
addIfPresent(pw, "queueMaxAppsDefault", if (queuePlacementPolicy != null) {
() -> createNumberSupplier(queueMaxAppsDefault)); printQueuePlacementPolicy(pw);
addIfPresent(pw, "queueMaxResourcesDefault", }
() -> queueMaxResourcesDefault);
addIfPresent(pw, "userMaxAppsDefault", addIfPresent(pw, "queueMaxAppsDefault", queueMaxAppsDefault);
() -> createNumberSupplier(userMaxAppsDefault)); addIfPresent(pw, "queueMaxResourcesDefault", queueMaxResourcesDefault);
addIfPresent(pw, "queueMaxAMShareDefault", addIfPresent(pw, "userMaxAppsDefault", userMaxAppsDefault);
() -> createNumberSupplier(queueMaxAMShareDefault)); addIfPresent(pw, "queueMaxAMShareDefault", queueMaxAMShareDefault);
addIfPresent(pw, "defaultMinSharePreemptionTimeout", addIfPresent(pw, "defaultMinSharePreemptionTimeout",
() -> createNumberSupplier(defaultMinSharePreemptionTimeout)); defaultMinSharePreemptionTimeout);
addIfPresent(pw, "defaultFairSharePreemptionTimeout", addIfPresent(pw, "defaultFairSharePreemptionTimeout",
() -> createNumberSupplier(defaultFairSharePreemptionTimeout)); defaultFairSharePreemptionTimeout);
addIfPresent(pw, "defaultFairSharePreemptionThreshold", addIfPresent(pw, "defaultFairSharePreemptionThreshold",
() -> createNumberSupplier(defaultFairSharePreemptionThreshold)); defaultFairSharePreemptionThreshold);
addIfPresent(pw, "defaultQueueSchedulingPolicy", addIfPresent(pw, "defaultQueueSchedulingPolicy",
() -> defaultQueueSchedulingPolicy); defaultQueueSchedulingPolicy);
addIfPresent(pw, "reservation-agent", reservationAgent);
addIfPresent(pw, "reservation-policy", reservationPolicy);
writeFooter(pw); writeFooter(pw);
pw.close(); pw.close();
} }

View File

@ -23,7 +23,7 @@ import java.io.StringWriter;
* Value class that stores user settings and can render data in XML format, * Value class that stores user settings and can render data in XML format,
* see {@link #render()}. * see {@link #render()}.
*/ */
class UserSettings { public class UserSettings {
private final String username; private final String username;
private final Integer maxRunningApps; private final Integer maxRunningApps;
@ -36,8 +36,7 @@ class UserSettings {
StringWriter sw = new StringWriter(); StringWriter sw = new StringWriter();
PrintWriter pw = new PrintWriter(sw); PrintWriter pw = new PrintWriter(sw);
addStartTag(pw); addStartTag(pw);
AllocationFileWriter.addIfPresent(pw, "maxRunningApps", AllocationFileWriter.addIfPresent(pw, "maxRunningApps", maxRunningApps);
() -> AllocationFileWriter.createNumberSupplier(maxRunningApps));
addEndTag(pw); addEndTag(pw);
pw.close(); pw.close();
@ -56,12 +55,10 @@ class UserSettings {
* Builder class for {@link UserSettings} * Builder class for {@link UserSettings}
*/ */
public static class Builder { public static class Builder {
private final AllocationFileWriter allocationFileWriter;
private final String username; private final String username;
private Integer maxRunningApps; private Integer maxRunningApps;
Builder(AllocationFileWriter allocationFileWriter, String username) { public Builder(String username) {
this.allocationFileWriter = allocationFileWriter;
this.username = username; this.username = username;
} }
@ -70,11 +67,8 @@ class UserSettings {
return this; return this;
} }
public AllocationFileWriter build() { public UserSettings build() {
UserSettings userSettings = new UserSettings(this); return new UserSettings(this);
allocationFileWriter.setUserSettings(userSettings);
return allocationFileWriter;
} }
} }
} }

View File

@ -17,8 +17,9 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.converter; package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.converter;
import org.apache.commons.io.FileUtils; import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairSchedulerConfiguration; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairSchedulerConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.allocationfile.AllocationFileQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.allocationfile.AllocationFileWriter;
import java.io.ByteArrayOutputStream; import java.io.ByteArrayOutputStream;
import java.io.File; import java.io.File;
@ -113,38 +114,23 @@ public class FSConfigConverterTestCommons {
configureDummyConversionRulesFile(); configureDummyConversionRulesFile();
} }
@SuppressWarnings("checkstyle:linelength") public static void configureFairSchedulerXml() {
public static void configureFairSchedulerXml() throws IOException { AllocationFileWriter.create()
PrintWriter out = new PrintWriter(new FileWriter(FS_ALLOC_FILE)); .disableQueueMaxAMShareDefault()
out.println("<?xml version=\"1.0\"?>"); .fairDefaultQueueSchedulingPolicy()
out.println("<allocations>"); .addQueue(new AllocationFileQueue.Builder("root")
out.println("<queueMaxAMShareDefault>-1.0</queueMaxAMShareDefault>"); .schedulingPolicy("fair")
out.println("<defaultQueueSchedulingPolicy>fair</defaultQueueSchedulingPolicy>"); .weight(1.0f)
addQueue(out, ""); .fairSharePreemptionTimeout(100)
out.println("</allocations>"); .minSharePreemptionTimeout(120)
out.close(); .fairSharePreemptionThreshold(0.5f)
.build())
.writeToFile(FS_ALLOC_FILE);
} }
@SuppressWarnings("checkstyle:linelength") public static void configureEmptyFairSchedulerXml() {
private static void addQueue(PrintWriter out, String additionalConfig) { AllocationFileWriter.create()
out.println("<queue name=\"root\">"); .writeToFile(FS_ALLOC_FILE);
out.println(" <schedulingPolicy>fair</schedulingPolicy>");
out.println(" <weight>1.0</weight>");
out.println(" <fairSharePreemptionTimeout>100</fairSharePreemptionTimeout>");
out.println(" <minSharePreemptionTimeout>120</minSharePreemptionTimeout>");
out.println(" <fairSharePreemptionThreshold>.5</fairSharePreemptionThreshold>");
if (StringUtils.isNotEmpty(additionalConfig)) {
out.println(additionalConfig);
}
out.println("</queue>");
}
public static void configureEmptyFairSchedulerXml() throws IOException {
PrintWriter out = new PrintWriter(new FileWriter(FS_ALLOC_FILE));
out.println("<?xml version=\"1.0\"?>");
out.println("<allocations></allocations>");
out.close();
} }
public static void configureYarnSiteXmlWithFsAllocFileDefined() public static void configureYarnSiteXmlWithFsAllocFileDefined()

View File

@ -26,9 +26,7 @@ import static org.junit.Assume.assumeTrue;
import java.io.ByteArrayInputStream; import java.io.ByteArrayInputStream;
import java.io.DataInputStream; import java.io.DataInputStream;
import java.io.File; import java.io.File;
import java.io.FileWriter;
import java.io.IOException; import java.io.IOException;
import java.io.PrintWriter;
import java.io.StringReader; import java.io.StringReader;
import java.io.StringWriter; import java.io.StringWriter;
import java.net.URI; import java.net.URI;
@ -83,6 +81,11 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.Capacity
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairSchedulerConfiguration; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairSchedulerConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair
.allocationfile.AllocationFileQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair
.allocationfile.AllocationFileWriter;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppPriority; import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppPriority;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppQueue; import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppQueue;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppState; import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppState;
@ -204,23 +207,15 @@ public class TestRMWebServicesAppsModification extends JerseyTestBase {
private class FairTestServletModule extends TestServletModule { private class FairTestServletModule extends TestServletModule {
@Override @Override
public void configureScheduler() { public void configureScheduler() {
try { AllocationFileWriter.create()
PrintWriter out = new PrintWriter(new FileWriter(FS_ALLOC_FILE)); .addQueue(new AllocationFileQueue.Builder("root")
out.println("<?xml version=\"1.0\"?>"); .aclAdministerApps("someuser ")
out.println("<allocations>"); .subQueue(new AllocationFileQueue.Builder("default")
out.println("<queue name=\"root\">"); .aclAdministerApps("someuser ").build())
out.println(" <aclAdministerApps>someuser </aclAdministerApps>"); .subQueue(new AllocationFileQueue.Builder("test")
out.println(" <queue name=\"default\">"); .aclAdministerApps("someuser ").build())
out.println(" <aclAdministerApps>someuser </aclAdministerApps>"); .build())
out.println(" </queue>"); .writeToFile(FS_ALLOC_FILE);
out.println(" <queue name=\"test\">");
out.println(" <aclAdministerApps>someuser </aclAdministerApps>");
out.println(" </queue>");
out.println("</queue>");
out.println("</allocations>");
out.close();
} catch(IOException e) {
}
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, FS_ALLOC_FILE); conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, FS_ALLOC_FILE);
conf.set(YarnConfiguration.RM_SCHEDULER, FairScheduler.class.getName()); conf.set(YarnConfiguration.RM_SCHEDULER, FairScheduler.class.getName());
} }

View File

@ -24,9 +24,7 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import java.io.File; import java.io.File;
import java.io.FileWriter;
import java.io.IOException; import java.io.IOException;
import java.io.PrintWriter;
import java.io.StringReader; import java.io.StringReader;
import java.net.URL; import java.net.URL;
import java.util.Arrays; import java.util.Arrays;
@ -53,6 +51,11 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.Capacity
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairSchedulerConfiguration; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairSchedulerConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair
.allocationfile.AllocationFileQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair
.allocationfile.AllocationFileWriter;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationDeleteRequestInfo; import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationDeleteRequestInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationSubmissionRequestInfo; import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationSubmissionRequestInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationUpdateRequestInfo; import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationUpdateRequestInfo;
@ -181,27 +184,17 @@ public class TestRMWebServicesReservation extends JerseyTestBase {
private static class FairTestServletModule extends TestServletModule { private static class FairTestServletModule extends TestServletModule {
@Override @Override
public void configureScheduler() { public void configureScheduler() {
try { AllocationFileWriter.create()
PrintWriter out = new PrintWriter(new FileWriter(FS_ALLOC_FILE)); .drfDefaultQueueSchedulingPolicy()
out.println("<?xml version=\"1.0\"?>"); .addQueue(new AllocationFileQueue.Builder("root")
out.println("<allocations>"); .aclAdministerApps("someuser ")
out.println("<queue name=\"root\">"); .subQueue(new AllocationFileQueue.Builder("default")
out.println(" <aclAdministerApps>someuser </aclAdministerApps>"); .aclAdministerApps("someuser ").build())
out.println(" <queue name=\"default\">"); .subQueue(new AllocationFileQueue.Builder("dedicated")
out.println(" <aclAdministerApps>someuser </aclAdministerApps>"); .reservation()
out.println(" </queue>"); .aclAdministerApps("someuser ").build())
out.println(" <queue name=\"dedicated\">"); .build())
out.println(" <reservation>"); .writeToFile(FS_ALLOC_FILE);
out.println(" </reservation>");
out.println(" <aclAdministerApps>someuser </aclAdministerApps>");
out.println(" </queue>");
out.println("</queue>");
out.println("<defaultQueueSchedulingPolicy>drf" +
"</defaultQueueSchedulingPolicy>");
out.println("</allocations>");
out.close();
} catch (IOException e) {
}
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, FS_ALLOC_FILE); conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, FS_ALLOC_FILE);
conf.set(YarnConfiguration.RM_SCHEDULER, FairScheduler.class.getName()); conf.set(YarnConfiguration.RM_SCHEDULER, FairScheduler.class.getName());
} }