YARN-7370: Preemption properties should be refreshable. Contrubted by Gergely Novák.

This commit is contained in:
Eric Payne 2017-11-02 12:37:33 -05:00
parent 04c604cf1d
commit e6ec02001f
5 changed files with 115 additions and 34 deletions

View File

@ -73,9 +73,13 @@ public class SchedulingMonitor extends AbstractService {
return t; return t;
} }
}); });
schedulePreemptionChecker();
super.serviceStart();
}
private void schedulePreemptionChecker() {
handler = ses.scheduleAtFixedRate(new PreemptionChecker(), handler = ses.scheduleAtFixedRate(new PreemptionChecker(),
0, monitorInterval, TimeUnit.MILLISECONDS); 0, monitorInterval, TimeUnit.MILLISECONDS);
super.serviceStart();
} }
@Override @Override
@ -98,8 +102,13 @@ public class SchedulingMonitor extends AbstractService {
@Override @Override
public void run() { public void run() {
try { try {
//invoke the preemption policy if (monitorInterval != scheduleEditPolicy.getMonitoringInterval()) {
handler.cancel(true);
monitorInterval = scheduleEditPolicy.getMonitoringInterval();
schedulePreemptionChecker();
} else {
invokePolicy(); invokePolicy();
}
} catch (Throwable t) { } catch (Throwable t) {
// The preemption monitor does not alter structures nor do structures // The preemption monitor does not alter structures nor do structures
// persist across invocations. Therefore, log, skip, and retry. // persist across invocations. Therefore, log, skip, and retry.

View File

@ -108,6 +108,9 @@ public class ProportionalCapacityPreemptionPolicy
private float minimumThresholdForIntraQueuePreemption; private float minimumThresholdForIntraQueuePreemption;
private IntraQueuePreemptionOrderPolicy intraQueuePreemptionOrderPolicy; private IntraQueuePreemptionOrderPolicy intraQueuePreemptionOrderPolicy;
// Current configuration
private CapacitySchedulerConfiguration csConfig;
// Pointer to other RM components // Pointer to other RM components
private RMContext rmContext; private RMContext rmContext;
private ResourceCalculator rc; private ResourceCalculator rc;
@ -121,8 +124,7 @@ public class ProportionalCapacityPreemptionPolicy
new HashMap<>(); new HashMap<>();
private Map<String, LinkedHashSet<String>> partitionToUnderServedQueues = private Map<String, LinkedHashSet<String>> partitionToUnderServedQueues =
new HashMap<String, LinkedHashSet<String>>(); new HashMap<String, LinkedHashSet<String>>();
private List<PreemptionCandidatesSelector> private List<PreemptionCandidatesSelector> candidatesSelectionPolicies;
candidatesSelectionPolicies = new ArrayList<>();
private Set<String> allPartitions; private Set<String> allPartitions;
private Set<String> leafQueueNames; private Set<String> leafQueueNames;
@ -160,68 +162,76 @@ public class ProportionalCapacityPreemptionPolicy
} }
rmContext = context; rmContext = context;
scheduler = (CapacityScheduler) sched; scheduler = (CapacityScheduler) sched;
CapacitySchedulerConfiguration csConfig = scheduler.getConfiguration(); rc = scheduler.getResourceCalculator();
nlm = scheduler.getRMContext().getNodeLabelManager();
updateConfigIfNeeded();
}
maxIgnoredOverCapacity = csConfig.getDouble( private void updateConfigIfNeeded() {
CapacitySchedulerConfiguration config = scheduler.getConfiguration();
if (config == csConfig) {
return;
}
maxIgnoredOverCapacity = config.getDouble(
CapacitySchedulerConfiguration.PREEMPTION_MAX_IGNORED_OVER_CAPACITY, CapacitySchedulerConfiguration.PREEMPTION_MAX_IGNORED_OVER_CAPACITY,
CapacitySchedulerConfiguration.DEFAULT_PREEMPTION_MAX_IGNORED_OVER_CAPACITY); CapacitySchedulerConfiguration.DEFAULT_PREEMPTION_MAX_IGNORED_OVER_CAPACITY);
naturalTerminationFactor = csConfig.getDouble( naturalTerminationFactor = config.getDouble(
CapacitySchedulerConfiguration.PREEMPTION_NATURAL_TERMINATION_FACTOR, CapacitySchedulerConfiguration.PREEMPTION_NATURAL_TERMINATION_FACTOR,
CapacitySchedulerConfiguration.DEFAULT_PREEMPTION_NATURAL_TERMINATION_FACTOR); CapacitySchedulerConfiguration.DEFAULT_PREEMPTION_NATURAL_TERMINATION_FACTOR);
maxWaitTime = csConfig.getLong( maxWaitTime = config.getLong(
CapacitySchedulerConfiguration.PREEMPTION_WAIT_TIME_BEFORE_KILL, CapacitySchedulerConfiguration.PREEMPTION_WAIT_TIME_BEFORE_KILL,
CapacitySchedulerConfiguration.DEFAULT_PREEMPTION_WAIT_TIME_BEFORE_KILL); CapacitySchedulerConfiguration.DEFAULT_PREEMPTION_WAIT_TIME_BEFORE_KILL);
monitoringInterval = csConfig.getLong( monitoringInterval = config.getLong(
CapacitySchedulerConfiguration.PREEMPTION_MONITORING_INTERVAL, CapacitySchedulerConfiguration.PREEMPTION_MONITORING_INTERVAL,
CapacitySchedulerConfiguration.DEFAULT_PREEMPTION_MONITORING_INTERVAL); CapacitySchedulerConfiguration.DEFAULT_PREEMPTION_MONITORING_INTERVAL);
percentageClusterPreemptionAllowed = csConfig.getFloat( percentageClusterPreemptionAllowed = config.getFloat(
CapacitySchedulerConfiguration.TOTAL_PREEMPTION_PER_ROUND, CapacitySchedulerConfiguration.TOTAL_PREEMPTION_PER_ROUND,
CapacitySchedulerConfiguration.DEFAULT_TOTAL_PREEMPTION_PER_ROUND); CapacitySchedulerConfiguration.DEFAULT_TOTAL_PREEMPTION_PER_ROUND);
observeOnly = csConfig.getBoolean( observeOnly = config.getBoolean(
CapacitySchedulerConfiguration.PREEMPTION_OBSERVE_ONLY, CapacitySchedulerConfiguration.PREEMPTION_OBSERVE_ONLY,
CapacitySchedulerConfiguration.DEFAULT_PREEMPTION_OBSERVE_ONLY); CapacitySchedulerConfiguration.DEFAULT_PREEMPTION_OBSERVE_ONLY);
lazyPreempionEnabled = csConfig.getBoolean( lazyPreempionEnabled = config.getBoolean(
CapacitySchedulerConfiguration.LAZY_PREEMPTION_ENALBED, CapacitySchedulerConfiguration.LAZY_PREEMPTION_ENABLED,
CapacitySchedulerConfiguration.DEFAULT_LAZY_PREEMPTION_ENABLED); CapacitySchedulerConfiguration.DEFAULT_LAZY_PREEMPTION_ENABLED);
maxAllowableLimitForIntraQueuePreemption = csConfig.getFloat( maxAllowableLimitForIntraQueuePreemption = config.getFloat(
CapacitySchedulerConfiguration. CapacitySchedulerConfiguration.
INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT, INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT,
CapacitySchedulerConfiguration. CapacitySchedulerConfiguration.
DEFAULT_INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT); DEFAULT_INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT);
minimumThresholdForIntraQueuePreemption = csConfig.getFloat( minimumThresholdForIntraQueuePreemption = config.getFloat(
CapacitySchedulerConfiguration. CapacitySchedulerConfiguration.
INTRAQUEUE_PREEMPTION_MINIMUM_THRESHOLD, INTRAQUEUE_PREEMPTION_MINIMUM_THRESHOLD,
CapacitySchedulerConfiguration. CapacitySchedulerConfiguration.
DEFAULT_INTRAQUEUE_PREEMPTION_MINIMUM_THRESHOLD); DEFAULT_INTRAQUEUE_PREEMPTION_MINIMUM_THRESHOLD);
intraQueuePreemptionOrderPolicy = IntraQueuePreemptionOrderPolicy intraQueuePreemptionOrderPolicy = IntraQueuePreemptionOrderPolicy
.valueOf(csConfig .valueOf(config
.get( .get(
CapacitySchedulerConfiguration.INTRAQUEUE_PREEMPTION_ORDER_POLICY, CapacitySchedulerConfiguration.INTRAQUEUE_PREEMPTION_ORDER_POLICY,
CapacitySchedulerConfiguration.DEFAULT_INTRAQUEUE_PREEMPTION_ORDER_POLICY) CapacitySchedulerConfiguration.DEFAULT_INTRAQUEUE_PREEMPTION_ORDER_POLICY)
.toUpperCase()); .toUpperCase());
rc = scheduler.getResourceCalculator(); candidatesSelectionPolicies = new ArrayList<>();
nlm = scheduler.getRMContext().getNodeLabelManager();
// Do we need white queue-priority preemption policy? // Do we need white queue-priority preemption policy?
boolean isQueuePriorityPreemptionEnabled = boolean isQueuePriorityPreemptionEnabled =
csConfig.getPUOrderingPolicyUnderUtilizedPreemptionEnabled(); config.getPUOrderingPolicyUnderUtilizedPreemptionEnabled();
if (isQueuePriorityPreemptionEnabled) { if (isQueuePriorityPreemptionEnabled) {
candidatesSelectionPolicies.add( candidatesSelectionPolicies.add(
new QueuePriorityContainerCandidateSelector(this)); new QueuePriorityContainerCandidateSelector(this));
} }
// Do we need to specially consider reserved containers? // Do we need to specially consider reserved containers?
boolean selectCandidatesForResevedContainers = csConfig.getBoolean( boolean selectCandidatesForResevedContainers = config.getBoolean(
CapacitySchedulerConfiguration. CapacitySchedulerConfiguration.
PREEMPTION_SELECT_CANDIDATES_FOR_RESERVED_CONTAINERS, PREEMPTION_SELECT_CANDIDATES_FOR_RESERVED_CONTAINERS,
CapacitySchedulerConfiguration. CapacitySchedulerConfiguration.
@ -231,7 +241,7 @@ public class ProportionalCapacityPreemptionPolicy
.add(new ReservedContainerCandidatesSelector(this)); .add(new ReservedContainerCandidatesSelector(this));
} }
boolean additionalPreemptionBasedOnReservedResource = csConfig.getBoolean( boolean additionalPreemptionBasedOnReservedResource = config.getBoolean(
CapacitySchedulerConfiguration.ADDITIONAL_RESOURCE_BALANCE_BASED_ON_RESERVED_CONTAINERS, CapacitySchedulerConfiguration.ADDITIONAL_RESOURCE_BALANCE_BASED_ON_RESERVED_CONTAINERS,
CapacitySchedulerConfiguration.DEFAULT_ADDITIONAL_RESOURCE_BALANCE_BASED_ON_RESERVED_CONTAINERS); CapacitySchedulerConfiguration.DEFAULT_ADDITIONAL_RESOURCE_BALANCE_BASED_ON_RESERVED_CONTAINERS);
@ -240,12 +250,39 @@ public class ProportionalCapacityPreemptionPolicy
additionalPreemptionBasedOnReservedResource)); additionalPreemptionBasedOnReservedResource));
// Do we need to specially consider intra queue // Do we need to specially consider intra queue
boolean isIntraQueuePreemptionEnabled = csConfig.getBoolean( boolean isIntraQueuePreemptionEnabled = config.getBoolean(
CapacitySchedulerConfiguration.INTRAQUEUE_PREEMPTION_ENABLED, CapacitySchedulerConfiguration.INTRAQUEUE_PREEMPTION_ENABLED,
CapacitySchedulerConfiguration.DEFAULT_INTRAQUEUE_PREEMPTION_ENABLED); CapacitySchedulerConfiguration.DEFAULT_INTRAQUEUE_PREEMPTION_ENABLED);
if (isIntraQueuePreemptionEnabled) { if (isIntraQueuePreemptionEnabled) {
candidatesSelectionPolicies.add(new IntraQueueCandidatesSelector(this)); candidatesSelectionPolicies.add(new IntraQueueCandidatesSelector(this));
} }
LOG.info("Capacity Scheduler configuration changed, updated preemption " +
"properties to:\n" +
"max_ignored_over_capacity = " + maxIgnoredOverCapacity + "\n" +
"natural_termination_factor = " + naturalTerminationFactor + "\n" +
"max_wait_before_kill = " + maxWaitTime + "\n" +
"monitoring_interval = " + monitoringInterval + "\n" +
"total_preemption_per_round = " + percentageClusterPreemptionAllowed +
"\n" +
"observe_only = " + observeOnly + "\n" +
"lazy-preemption-enabled = " + lazyPreempionEnabled + "\n" +
"intra-queue-preemption.enabled = " + isIntraQueuePreemptionEnabled +
"\n" +
"intra-queue-preemption.max-allowable-limit = " +
maxAllowableLimitForIntraQueuePreemption + "\n" +
"intra-queue-preemption.minimum-threshold = " +
minimumThresholdForIntraQueuePreemption + "\n" +
"intra-queue-preemption.preemption-order-policy = " +
intraQueuePreemptionOrderPolicy + "\n" +
"priority-utilization.underutilized-preemption.enabled = " +
isQueuePriorityPreemptionEnabled + "\n" +
"select_based_on_reserved_containers = " +
selectCandidatesForResevedContainers + "\n" +
"additional_res_balance_based_on_reserved_containers = " +
additionalPreemptionBasedOnReservedResource);
csConfig = config;
} }
@Override @Override
@ -255,6 +292,8 @@ public class ProportionalCapacityPreemptionPolicy
@Override @Override
public synchronized void editSchedule() { public synchronized void editSchedule() {
updateConfigIfNeeded();
long startTs = clock.getTime(); long startTs = clock.getTime();
CSQueue root = scheduler.getRootQueue(); CSQueue root = scheduler.getRootQueue();

View File

@ -296,7 +296,8 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
"reservation-enforcement-window"; "reservation-enforcement-window";
@Private @Private
public static final String LAZY_PREEMPTION_ENALBED = PREFIX + "lazy-preemption-enabled"; public static final String LAZY_PREEMPTION_ENABLED =
PREFIX + "lazy-preemption-enabled";
@Private @Private
public static final boolean DEFAULT_LAZY_PREEMPTION_ENABLED = false; public static final boolean DEFAULT_LAZY_PREEMPTION_ENABLED = false;
@ -1166,7 +1167,7 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
} }
public boolean getLazyPreemptionEnabled() { public boolean getLazyPreemptionEnabled() {
return getBoolean(LAZY_PREEMPTION_ENALBED, DEFAULT_LAZY_PREEMPTION_ENABLED); return getBoolean(LAZY_PREEMPTION_ENABLED, DEFAULT_LAZY_PREEMPTION_ENABLED);
} }
private static final String PREEMPTION_CONFIG_PREFIX = private static final String PREEMPTION_CONFIG_PREFIX =
@ -1207,7 +1208,7 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
* completions) it might prevent convergence to guaranteed capacity. */ * completions) it might prevent convergence to guaranteed capacity. */
public static final String PREEMPTION_MAX_IGNORED_OVER_CAPACITY = public static final String PREEMPTION_MAX_IGNORED_OVER_CAPACITY =
PREEMPTION_CONFIG_PREFIX + "max_ignored_over_capacity"; PREEMPTION_CONFIG_PREFIX + "max_ignored_over_capacity";
public static final float DEFAULT_PREEMPTION_MAX_IGNORED_OVER_CAPACITY = 0.1f; public static final double DEFAULT_PREEMPTION_MAX_IGNORED_OVER_CAPACITY = 0.1;
/** /**
* Given a computed preemption target, account for containers naturally * Given a computed preemption target, account for containers naturally
* expiring and preempt only this percentage of the delta. This determines * expiring and preempt only this percentage of the delta. This determines
@ -1217,8 +1218,8 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
* #PREEMPTION_WAIT_TIME_BEFORE_KILL}, even absent natural termination. */ * #PREEMPTION_WAIT_TIME_BEFORE_KILL}, even absent natural termination. */
public static final String PREEMPTION_NATURAL_TERMINATION_FACTOR = public static final String PREEMPTION_NATURAL_TERMINATION_FACTOR =
PREEMPTION_CONFIG_PREFIX + "natural_termination_factor"; PREEMPTION_CONFIG_PREFIX + "natural_termination_factor";
public static final float DEFAULT_PREEMPTION_NATURAL_TERMINATION_FACTOR = public static final double DEFAULT_PREEMPTION_NATURAL_TERMINATION_FACTOR =
0.2f; 0.2;
/** /**
* By default, reserved resource will be excluded while balancing capacities * By default, reserved resource will be excluded while balancing capacities

View File

@ -187,9 +187,7 @@ public class TestProportionalCapacityPreemptionPolicy {
appAlloc = 0; appAlloc = 0;
} }
@Test private static final int[][] Q_DATA_FOR_IGNORE = new int[][]{
public void testIgnore() {
int[][] qData = new int[][]{
// / A B C // / A B C
{ 100, 40, 40, 20 }, // abs { 100, 40, 40, 20 }, // abs
{ 100, 100, 100, 100 }, // maxCap { 100, 100, 100, 100 }, // maxCap
@ -200,7 +198,11 @@ public class TestProportionalCapacityPreemptionPolicy {
{ -1, 1, 1, 1 }, // req granularity { -1, 1, 1, 1 }, // req granularity
{ 3, 0, 0, 0 }, // subqueues { 3, 0, 0, 0 }, // subqueues
}; };
ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData);
@Test
public void testIgnore() {
ProportionalCapacityPreemptionPolicy policy =
buildPolicy(Q_DATA_FOR_IGNORE);
policy.editSchedule(); policy.editSchedule();
// don't correct imbalances without demand // don't correct imbalances without demand
verify(mDisp, never()).handle(isA(ContainerPreemptEvent.class)); verify(mDisp, never()).handle(isA(ContainerPreemptEvent.class));
@ -1033,6 +1035,36 @@ public class TestProportionalCapacityPreemptionPolicy {
verify(mDisp, never()).handle(argThat(new IsPreemptionRequestFor(appA))); verify(mDisp, never()).handle(argThat(new IsPreemptionRequestFor(appA)));
} }
@Test
public void testRefreshPreemptionProperties() throws Exception {
ProportionalCapacityPreemptionPolicy policy =
buildPolicy(Q_DATA_FOR_IGNORE);
assertEquals(
CapacitySchedulerConfiguration.DEFAULT_PREEMPTION_MONITORING_INTERVAL,
policy.getMonitoringInterval());
assertEquals(
CapacitySchedulerConfiguration.DEFAULT_PREEMPTION_OBSERVE_ONLY,
policy.isObserveOnly());
CapacitySchedulerConfiguration newConf =
new CapacitySchedulerConfiguration(conf);
long newMonitoringInterval = 5000;
boolean newObserveOnly = true;
newConf.setLong(
CapacitySchedulerConfiguration.PREEMPTION_MONITORING_INTERVAL,
newMonitoringInterval);
newConf.setBoolean(CapacitySchedulerConfiguration.PREEMPTION_OBSERVE_ONLY,
newObserveOnly);
when(mCS.getConfiguration()).thenReturn(newConf);
policy.editSchedule();
assertEquals(newMonitoringInterval, policy.getMonitoringInterval());
assertEquals(newObserveOnly, policy.isObserveOnly());
}
static class IsPreemptionRequestFor static class IsPreemptionRequestFor
extends ArgumentMatcher<ContainerPreemptEvent> { extends ArgumentMatcher<ContainerPreemptEvent> {
private final ApplicationAttemptId appAttId; private final ApplicationAttemptId appAttId;

View File

@ -54,7 +54,7 @@ public class TestCapacitySchedulerLazyPreemption
@Before @Before
public void setUp() throws Exception { public void setUp() throws Exception {
super.setUp(); super.setUp();
conf.setBoolean(CapacitySchedulerConfiguration.LAZY_PREEMPTION_ENALBED, conf.setBoolean(CapacitySchedulerConfiguration.LAZY_PREEMPTION_ENABLED,
true); true);
} }