diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/SchedulingMonitor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/SchedulingMonitor.java index 631d1a0f835..2a741ed83cb 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/SchedulingMonitor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/SchedulingMonitor.java @@ -73,9 +73,13 @@ public class SchedulingMonitor extends AbstractService { return t; } }); + schedulePreemptionChecker(); + super.serviceStart(); + } + + private void schedulePreemptionChecker() { handler = ses.scheduleAtFixedRate(new PreemptionChecker(), 0, monitorInterval, TimeUnit.MILLISECONDS); - super.serviceStart(); } @Override @@ -98,8 +102,13 @@ public class SchedulingMonitor extends AbstractService { @Override public void run() { try { - //invoke the preemption policy - invokePolicy(); + if (monitorInterval != scheduleEditPolicy.getMonitoringInterval()) { + handler.cancel(true); + monitorInterval = scheduleEditPolicy.getMonitoringInterval(); + schedulePreemptionChecker(); + } else { + invokePolicy(); + } } catch (Throwable t) { // The preemption monitor does not alter structures nor do structures // persist across invocations. Therefore, log, skip, and retry. diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java index 1326c52ae27..860b29794a7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java @@ -109,6 +109,9 @@ public class ProportionalCapacityPreemptionPolicy private float minimumThresholdForIntraQueuePreemption; private IntraQueuePreemptionOrderPolicy intraQueuePreemptionOrderPolicy; + // Current configuration + private CapacitySchedulerConfiguration csConfig; + // Pointer to other RM components private RMContext rmContext; private ResourceCalculator rc; @@ -122,8 +125,7 @@ public class ProportionalCapacityPreemptionPolicy new HashMap<>(); private Map> partitionToUnderServedQueues = new HashMap>(); - private List - candidatesSelectionPolicies = new ArrayList<>(); + private List candidatesSelectionPolicies; private Set allPartitions; private Set leafQueueNames; @@ -161,68 +163,76 @@ public class ProportionalCapacityPreemptionPolicy } rmContext = context; scheduler = (CapacityScheduler) sched; - CapacitySchedulerConfiguration csConfig = scheduler.getConfiguration(); + rc = scheduler.getResourceCalculator(); + nlm = scheduler.getRMContext().getNodeLabelManager(); + updateConfigIfNeeded(); + } - maxIgnoredOverCapacity = csConfig.getDouble( + private void updateConfigIfNeeded() { + CapacitySchedulerConfiguration config = scheduler.getConfiguration(); + if (config == csConfig) { + return; + } + + maxIgnoredOverCapacity = config.getDouble( CapacitySchedulerConfiguration.PREEMPTION_MAX_IGNORED_OVER_CAPACITY, CapacitySchedulerConfiguration.DEFAULT_PREEMPTION_MAX_IGNORED_OVER_CAPACITY); - naturalTerminationFactor = csConfig.getDouble( + naturalTerminationFactor = config.getDouble( CapacitySchedulerConfiguration.PREEMPTION_NATURAL_TERMINATION_FACTOR, CapacitySchedulerConfiguration.DEFAULT_PREEMPTION_NATURAL_TERMINATION_FACTOR); - maxWaitTime = csConfig.getLong( + maxWaitTime = config.getLong( CapacitySchedulerConfiguration.PREEMPTION_WAIT_TIME_BEFORE_KILL, CapacitySchedulerConfiguration.DEFAULT_PREEMPTION_WAIT_TIME_BEFORE_KILL); - monitoringInterval = csConfig.getLong( + monitoringInterval = config.getLong( CapacitySchedulerConfiguration.PREEMPTION_MONITORING_INTERVAL, CapacitySchedulerConfiguration.DEFAULT_PREEMPTION_MONITORING_INTERVAL); - percentageClusterPreemptionAllowed = csConfig.getFloat( + percentageClusterPreemptionAllowed = config.getFloat( CapacitySchedulerConfiguration.TOTAL_PREEMPTION_PER_ROUND, CapacitySchedulerConfiguration.DEFAULT_TOTAL_PREEMPTION_PER_ROUND); - observeOnly = csConfig.getBoolean( + observeOnly = config.getBoolean( CapacitySchedulerConfiguration.PREEMPTION_OBSERVE_ONLY, CapacitySchedulerConfiguration.DEFAULT_PREEMPTION_OBSERVE_ONLY); - lazyPreempionEnabled = csConfig.getBoolean( - CapacitySchedulerConfiguration.LAZY_PREEMPTION_ENALBED, + lazyPreempionEnabled = config.getBoolean( + CapacitySchedulerConfiguration.LAZY_PREEMPTION_ENABLED, CapacitySchedulerConfiguration.DEFAULT_LAZY_PREEMPTION_ENABLED); - maxAllowableLimitForIntraQueuePreemption = csConfig.getFloat( + maxAllowableLimitForIntraQueuePreemption = config.getFloat( CapacitySchedulerConfiguration. INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT, CapacitySchedulerConfiguration. DEFAULT_INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT); - minimumThresholdForIntraQueuePreemption = csConfig.getFloat( + minimumThresholdForIntraQueuePreemption = config.getFloat( CapacitySchedulerConfiguration. INTRAQUEUE_PREEMPTION_MINIMUM_THRESHOLD, CapacitySchedulerConfiguration. DEFAULT_INTRAQUEUE_PREEMPTION_MINIMUM_THRESHOLD); intraQueuePreemptionOrderPolicy = IntraQueuePreemptionOrderPolicy - .valueOf(csConfig + .valueOf(config .get( CapacitySchedulerConfiguration.INTRAQUEUE_PREEMPTION_ORDER_POLICY, CapacitySchedulerConfiguration.DEFAULT_INTRAQUEUE_PREEMPTION_ORDER_POLICY) .toUpperCase()); - rc = scheduler.getResourceCalculator(); - nlm = scheduler.getRMContext().getNodeLabelManager(); + candidatesSelectionPolicies = new ArrayList<>(); // Do we need white queue-priority preemption policy? boolean isQueuePriorityPreemptionEnabled = - csConfig.getPUOrderingPolicyUnderUtilizedPreemptionEnabled(); + config.getPUOrderingPolicyUnderUtilizedPreemptionEnabled(); if (isQueuePriorityPreemptionEnabled) { candidatesSelectionPolicies.add( new QueuePriorityContainerCandidateSelector(this)); } // Do we need to specially consider reserved containers? - boolean selectCandidatesForResevedContainers = csConfig.getBoolean( + boolean selectCandidatesForResevedContainers = config.getBoolean( CapacitySchedulerConfiguration. PREEMPTION_SELECT_CANDIDATES_FOR_RESERVED_CONTAINERS, CapacitySchedulerConfiguration. @@ -232,7 +242,7 @@ public class ProportionalCapacityPreemptionPolicy .add(new ReservedContainerCandidatesSelector(this)); } - boolean additionalPreemptionBasedOnReservedResource = csConfig.getBoolean( + boolean additionalPreemptionBasedOnReservedResource = config.getBoolean( CapacitySchedulerConfiguration.ADDITIONAL_RESOURCE_BALANCE_BASED_ON_RESERVED_CONTAINERS, CapacitySchedulerConfiguration.DEFAULT_ADDITIONAL_RESOURCE_BALANCE_BASED_ON_RESERVED_CONTAINERS); @@ -241,12 +251,39 @@ public class ProportionalCapacityPreemptionPolicy additionalPreemptionBasedOnReservedResource)); // Do we need to specially consider intra queue - boolean isIntraQueuePreemptionEnabled = csConfig.getBoolean( + boolean isIntraQueuePreemptionEnabled = config.getBoolean( CapacitySchedulerConfiguration.INTRAQUEUE_PREEMPTION_ENABLED, CapacitySchedulerConfiguration.DEFAULT_INTRAQUEUE_PREEMPTION_ENABLED); if (isIntraQueuePreemptionEnabled) { candidatesSelectionPolicies.add(new IntraQueueCandidatesSelector(this)); } + + LOG.info("Capacity Scheduler configuration changed, updated preemption " + + "properties to:\n" + + "max_ignored_over_capacity = " + maxIgnoredOverCapacity + "\n" + + "natural_termination_factor = " + naturalTerminationFactor + "\n" + + "max_wait_before_kill = " + maxWaitTime + "\n" + + "monitoring_interval = " + monitoringInterval + "\n" + + "total_preemption_per_round = " + percentageClusterPreemptionAllowed + + "\n" + + "observe_only = " + observeOnly + "\n" + + "lazy-preemption-enabled = " + lazyPreempionEnabled + "\n" + + "intra-queue-preemption.enabled = " + isIntraQueuePreemptionEnabled + + "\n" + + "intra-queue-preemption.max-allowable-limit = " + + maxAllowableLimitForIntraQueuePreemption + "\n" + + "intra-queue-preemption.minimum-threshold = " + + minimumThresholdForIntraQueuePreemption + "\n" + + "intra-queue-preemption.preemption-order-policy = " + + intraQueuePreemptionOrderPolicy + "\n" + + "priority-utilization.underutilized-preemption.enabled = " + + isQueuePriorityPreemptionEnabled + "\n" + + "select_based_on_reserved_containers = " + + selectCandidatesForResevedContainers + "\n" + + "additional_res_balance_based_on_reserved_containers = " + + additionalPreemptionBasedOnReservedResource); + + csConfig = config; } @Override @@ -256,6 +293,8 @@ public class ProportionalCapacityPreemptionPolicy @Override public synchronized void editSchedule() { + updateConfigIfNeeded(); + long startTs = clock.getTime(); CSQueue root = scheduler.getRootQueue(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java index 3a519ecf5f1..bfead359340 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java @@ -296,7 +296,8 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur "reservation-enforcement-window"; @Private - public static final String LAZY_PREEMPTION_ENALBED = PREFIX + "lazy-preemption-enabled"; + public static final String LAZY_PREEMPTION_ENABLED = + PREFIX + "lazy-preemption-enabled"; @Private public static final boolean DEFAULT_LAZY_PREEMPTION_ENABLED = false; @@ -1166,7 +1167,7 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur } public boolean getLazyPreemptionEnabled() { - return getBoolean(LAZY_PREEMPTION_ENALBED, DEFAULT_LAZY_PREEMPTION_ENABLED); + return getBoolean(LAZY_PREEMPTION_ENABLED, DEFAULT_LAZY_PREEMPTION_ENABLED); } private static final String PREEMPTION_CONFIG_PREFIX = @@ -1207,7 +1208,7 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur * completions) it might prevent convergence to guaranteed capacity. */ public static final String PREEMPTION_MAX_IGNORED_OVER_CAPACITY = PREEMPTION_CONFIG_PREFIX + "max_ignored_over_capacity"; - public static final float DEFAULT_PREEMPTION_MAX_IGNORED_OVER_CAPACITY = 0.1f; + public static final double DEFAULT_PREEMPTION_MAX_IGNORED_OVER_CAPACITY = 0.1; /** * Given a computed preemption target, account for containers naturally * expiring and preempt only this percentage of the delta. This determines @@ -1217,8 +1218,8 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur * #PREEMPTION_WAIT_TIME_BEFORE_KILL}, even absent natural termination. */ public static final String PREEMPTION_NATURAL_TERMINATION_FACTOR = PREEMPTION_CONFIG_PREFIX + "natural_termination_factor"; - public static final float DEFAULT_PREEMPTION_NATURAL_TERMINATION_FACTOR = - 0.2f; + public static final double DEFAULT_PREEMPTION_NATURAL_TERMINATION_FACTOR = + 0.2; /** * By default, reserved resource will be excluded while balancing capacities diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java index c777433d476..505614de2be 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java @@ -186,9 +186,7 @@ public class TestProportionalCapacityPreemptionPolicy { appAlloc = 0; } - @Test - public void testIgnore() { - int[][] qData = new int[][]{ + private static final int[][] Q_DATA_FOR_IGNORE = new int[][]{ // / A B C { 100, 40, 40, 20 }, // abs { 100, 100, 100, 100 }, // maxCap @@ -198,8 +196,12 @@ public class TestProportionalCapacityPreemptionPolicy { { 3, 1, 1, 1 }, // apps { -1, 1, 1, 1 }, // req granularity { 3, 0, 0, 0 }, // subqueues - }; - ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData); + }; + + @Test + public void testIgnore() { + ProportionalCapacityPreemptionPolicy policy = + buildPolicy(Q_DATA_FOR_IGNORE); policy.editSchedule(); // don't correct imbalances without demand verify(mDisp, never()).handle(isA(ContainerPreemptEvent.class)); @@ -1032,6 +1034,36 @@ public class TestProportionalCapacityPreemptionPolicy { verify(mDisp, never()).handle(argThat(new IsPreemptionRequestFor(appA))); } + + @Test + public void testRefreshPreemptionProperties() throws Exception { + ProportionalCapacityPreemptionPolicy policy = + buildPolicy(Q_DATA_FOR_IGNORE); + + assertEquals( + CapacitySchedulerConfiguration.DEFAULT_PREEMPTION_MONITORING_INTERVAL, + policy.getMonitoringInterval()); + assertEquals( + CapacitySchedulerConfiguration.DEFAULT_PREEMPTION_OBSERVE_ONLY, + policy.isObserveOnly()); + + CapacitySchedulerConfiguration newConf = + new CapacitySchedulerConfiguration(conf); + long newMonitoringInterval = 5000; + boolean newObserveOnly = true; + newConf.setLong( + CapacitySchedulerConfiguration.PREEMPTION_MONITORING_INTERVAL, + newMonitoringInterval); + newConf.setBoolean(CapacitySchedulerConfiguration.PREEMPTION_OBSERVE_ONLY, + newObserveOnly); + when(mCS.getConfiguration()).thenReturn(newConf); + + policy.editSchedule(); + + assertEquals(newMonitoringInterval, policy.getMonitoringInterval()); + assertEquals(newObserveOnly, policy.isObserveOnly()); + } + static class IsPreemptionRequestFor extends ArgumentMatcher { private final ApplicationAttemptId appAttId; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerLazyPreemption.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerLazyPreemption.java index e7157b86540..4e4e3c2064a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerLazyPreemption.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerLazyPreemption.java @@ -54,7 +54,7 @@ public class TestCapacitySchedulerLazyPreemption @Before public void setUp() throws Exception { super.setUp(); - conf.setBoolean(CapacitySchedulerConfiguration.LAZY_PREEMPTION_ENALBED, + conf.setBoolean(CapacitySchedulerConfiguration.LAZY_PREEMPTION_ENABLED, true); }