YARN-6124. Make SchedulingEditPolicy able to be enabled / disabled / updated with RMAdmin -refreshQueues. (Zian Chen via wangda)

Change-Id: Id93656f3af7dcd78cafa94e33663c78d410d43c2
This commit is contained in:
Wangda Tan 2017-11-30 15:56:53 -08:00
parent 0780fdb1eb
commit a63d19d365
13 changed files with 391 additions and 60 deletions

View File

@ -400,14 +400,31 @@ public class AdminService extends CompositeService implements
} }
} }
/**
 * Loads a fresh Configuration from yarn-site.xml so that a queue refresh
 * also picks up updated scheduling monitor properties.
 *
 * @return the freshly loaded configuration with all properties resolved
 * @throws IOException if the configuration file cannot be read
 * @throws YarnException on YARN-level configuration errors
 */
protected Configuration loadNewConfiguration()
    throws IOException, YarnException {
  // Retrieve yarn-site.xml in order to refresh scheduling monitor properties.
  Configuration conf = getConfiguration(new Configuration(false),
      YarnConfiguration.YARN_SITE_CONFIGURATION_FILE);
  // The reason we call Configuration#size() is that when getConfiguration
  // is called, it invokes Configuration#addResource, which invokes
  // Configuration#reloadConfiguration. That triggers the reload process in a
  // lazy way: the properties will only be reloaded when they are needed
  // rather than right after getConfiguration is called. So here we call
  // Configuration#size() to force Configuration#getProps to be called and
  // reload all the properties eagerly.
  conf.size();
  return conf;
}
@Private @Private
public void refreshQueues() throws IOException, YarnException { public void refreshQueues() throws IOException, YarnException {
rm.getRMContext().getScheduler().reinitialize(getConfig(), Configuration conf = loadNewConfiguration();
rm.getRMContext().getScheduler().reinitialize(conf,
this.rm.getRMContext()); this.rm.getRMContext());
// refresh the reservation system // refresh the reservation system
ReservationSystem rSystem = rm.getRMContext().getReservationSystem(); ReservationSystem rSystem = rm.getRMContext().getReservationSystem();
if (rSystem != null) { if (rSystem != null) {
rSystem.reinitialize(getConfig(), rm.getRMContext()); rSystem.reinitialize(conf, rm.getRMContext());
} }
} }

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.yarn.server.resourcemanager; package org.apache.hadoop.yarn.server.resourcemanager;
import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.curator.framework.AuthInfo; import org.apache.curator.framework.AuthInfo;
@ -67,8 +68,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.metrics.NoOpSystemMetricPub
import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher; import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher;
import org.apache.hadoop.yarn.server.resourcemanager.metrics.TimelineServiceV1Publisher; import org.apache.hadoop.yarn.server.resourcemanager.metrics.TimelineServiceV1Publisher;
import org.apache.hadoop.yarn.server.resourcemanager.metrics.TimelineServiceV2Publisher; import org.apache.hadoop.yarn.server.resourcemanager.metrics.TimelineServiceV2Publisher;
import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingEditPolicy;
import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingMonitor;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMDelegatedNodeLabelsUpdater; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMDelegatedNodeLabelsUpdater;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.NullRMStateStore; import org.apache.hadoop.yarn.server.resourcemanager.recovery.NullRMStateStore;
@ -113,8 +112,6 @@ import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
import org.apache.zookeeper.server.auth.DigestAuthenticationProvider; import org.apache.zookeeper.server.auth.DigestAuthenticationProvider;
import org.eclipse.jetty.webapp.WebAppContext; import org.eclipse.jetty.webapp.WebAppContext;
import com.google.common.annotations.VisibleForTesting;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.io.PrintStream; import java.io.PrintStream;
@ -711,8 +708,6 @@ public class ResourceManager extends CompositeService implements Recoverable {
} }
} }
createSchedulerMonitors();
masterService = createApplicationMasterService(); masterService = createApplicationMasterService();
addService(masterService) ; addService(masterService) ;
rmContext.setApplicationMasterService(masterService); rmContext.setApplicationMasterService(masterService);
@ -811,30 +806,6 @@ public class ResourceManager extends CompositeService implements Recoverable {
} }
} }
protected void createSchedulerMonitors() {
if (conf.getBoolean(YarnConfiguration.RM_SCHEDULER_ENABLE_MONITORS,
YarnConfiguration.DEFAULT_RM_SCHEDULER_ENABLE_MONITORS)) {
LOG.info("Loading policy monitors");
List<SchedulingEditPolicy> policies = conf.getInstances(
YarnConfiguration.RM_SCHEDULER_MONITOR_POLICIES,
SchedulingEditPolicy.class);
if (policies.size() > 0) {
for (SchedulingEditPolicy policy : policies) {
LOG.info("LOADING SchedulingEditPolicy:" + policy.getPolicyName());
// periodically check whether we need to take action to guarantee
// constraints
SchedulingMonitor mon = new SchedulingMonitor(rmContext, policy);
addService(mon);
}
} else {
LOG.warn("Policy monitors configured (" +
YarnConfiguration.RM_SCHEDULER_ENABLE_MONITORS +
") but none specified (" +
YarnConfiguration.RM_SCHEDULER_MONITOR_POLICIES + ")");
}
}
}
} }
@Private @Private

View File

@ -27,7 +27,6 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
@ -58,6 +57,7 @@ public class SchedulingMonitor extends AbstractService {
} }
public void serviceInit(Configuration conf) throws Exception { public void serviceInit(Configuration conf) throws Exception {
LOG.info("Initializing SchedulingMonitor=" + getName());
scheduleEditPolicy.init(conf, rmContext, rmContext.getScheduler()); scheduleEditPolicy.init(conf, rmContext, rmContext.getScheduler());
this.monitorInterval = scheduleEditPolicy.getMonitoringInterval(); this.monitorInterval = scheduleEditPolicy.getMonitoringInterval();
super.serviceInit(conf); super.serviceInit(conf);
@ -65,6 +65,7 @@ public class SchedulingMonitor extends AbstractService {
@Override @Override
public void serviceStart() throws Exception { public void serviceStart() throws Exception {
LOG.info("Starting SchedulingMonitor=" + getName());
assert !stopped : "starting when already stopped"; assert !stopped : "starting when already stopped";
ses = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() { ses = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
public Thread newThread(Runnable r) { public Thread newThread(Runnable r) {

View File

@ -0,0 +1,184 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.monitor;
import com.google.common.collect.Sets;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
/**
* Manages scheduling monitors.
*/
/**
 * Manages {@link SchedulingMonitor} services for the configured
 * {@link SchedulingEditPolicy} instances. Monitors can be enabled, disabled
 * or updated at runtime by reinitializing this manager with a new
 * configuration (e.g. via RMAdmin -refreshQueues).
 *
 * Lifecycle methods (initialize / reinitialize / startAll / stop) are
 * synchronized so concurrent refreshes do not corrupt the monitor registry.
 */
public class SchedulingMonitorManager {
  private static final Log LOG = LogFactory.getLog(
      SchedulingMonitorManager.class);

  // Maps policy class name -> running SchedulingMonitor for that policy.
  private Map<String, SchedulingMonitor> runningSchedulingMonitors =
      new HashMap<>();
  private RMContext rmContext;

  /**
   * Syncs the set of running monitors with the given configuration: stops
   * everything when monitors are disabled, creates monitors for newly
   * configured policies, and stops monitors whose policy was removed.
   *
   * @param conf configuration to read monitor settings from
   * @param startImmediately whether newly created monitors are started right
   *        away (reinitialize path) or left for a later startAll() call
   *        (initial service-init path)
   * @throws YarnException if a configured policy class cannot be found or
   *         does not implement SchedulingEditPolicy
   */
  private void updateSchedulingMonitors(Configuration conf,
      boolean startImmediately) throws YarnException {
    boolean monitorsEnabled = conf.getBoolean(
        YarnConfiguration.RM_SCHEDULER_ENABLE_MONITORS,
        YarnConfiguration.DEFAULT_RM_SCHEDULER_ENABLE_MONITORS);

    if (!monitorsEnabled) {
      if (!runningSchedulingMonitors.isEmpty()) {
        // Monitors were disabled while some are still running; stop them.
        LOG.info("Scheduling Monitor disabled, stopping all services");
        stopAndRemoveAll();
      }
      return;
    }

    // Monitors are enabled: load the configured policies.
    String[] configuredPolicies = conf.getStrings(
        YarnConfiguration.RM_SCHEDULER_MONITOR_POLICIES);
    if (configuredPolicies == null || configuredPolicies.length == 0) {
      // NOTE(review): already-running monitors are deliberately left
      // untouched when the policy list is empty — confirm this is the
      // intended semantics rather than stopping them all.
      return;
    }

    Set<String> configurePoliciesSet = new HashSet<>();
    for (String s : configuredPolicies) {
      configurePoliciesSet.add(s);
    }

    // Create (and possibly start) a monitor for each newly added policy.
    for (String s : configurePoliciesSet) {
      if (!runningSchedulingMonitors.containsKey(s)) {
        Class<?> policyClass;
        try {
          policyClass = Class.forName(s);
        } catch (ClassNotFoundException e) {
          String message = "Failed to find class of specified policy=" + s;
          LOG.warn(message);
          throw new YarnException(message);
        }

        if (SchedulingEditPolicy.class.isAssignableFrom(policyClass)) {
          SchedulingEditPolicy policyInstance =
              (SchedulingEditPolicy) ReflectionUtils.newInstance(policyClass,
                  null);
          SchedulingMonitor mon = new SchedulingMonitor(rmContext,
              policyInstance);
          mon.init(conf);
          if (startImmediately) {
            mon.start();
          }
          runningSchedulingMonitors.put(s, mon);
        } else {
          String message =
              "Specified policy=" + s + " is not a SchedulingEditPolicy class.";
          LOG.warn(message);
          throw new YarnException(message);
        }
      }
    }

    // Stop monitors whose policy was removed from the configuration.
    // Copy the difference into a fresh set before mutating the map:
    // Guava's Sets.difference returns a live view backed by the key set,
    // and removing map entries while iterating that view would throw
    // ConcurrentModificationException.
    Set<String> disabledPolicies = new HashSet<>(Sets.difference(
        runningSchedulingMonitors.keySet(), configurePoliciesSet));
    for (String disabledPolicy : disabledPolicies) {
      LOG.info("SchedulingEditPolicy=" + disabledPolicy
          + " removed, stopping it now ...");
      silentlyStopSchedulingMonitor(disabledPolicy);
      runningSchedulingMonitors.remove(disabledPolicy);
    }
  }

  /**
   * First-time initialization: clears any previous state, then creates
   * monitors for the configured policies without starting them
   * (startAll() starts them later).
   */
  public synchronized void initialize(RMContext rmContext,
      Configuration configuration) throws YarnException {
    this.rmContext = rmContext;
    stopAndRemoveAll();

    updateSchedulingMonitors(configuration, false);
  }

  /**
   * Reinitializes with a (possibly changed) configuration; monitors for
   * newly added policies are started immediately.
   */
  public synchronized void reinitialize(RMContext rmContext,
      Configuration configuration) throws YarnException {
    this.rmContext = rmContext;

    updateSchedulingMonitors(configuration, true);
  }

  /** Starts every monitor created by {@link #initialize}. */
  public synchronized void startAll() {
    for (SchedulingMonitor schedulingMonitor : runningSchedulingMonitors
        .values()) {
      schedulingMonitor.start();
    }
  }

  /**
   * Stops the named monitor, logging (not propagating) any failure so one
   * misbehaving monitor cannot prevent the others from being stopped.
   */
  private void silentlyStopSchedulingMonitor(String name) {
    SchedulingMonitor mon = runningSchedulingMonitors.get(name);
    try {
      mon.stop();
      LOG.info("Successfully stopped monitor=" + mon.getName());
    } catch (Exception e) {
      LOG.warn("Exception while stopping monitor=" + mon.getName(), e);
    }
  }

  /** Stops every running monitor and clears the registry. */
  private void stopAndRemoveAll() {
    if (!runningSchedulingMonitors.isEmpty()) {
      for (String schedulingMonitorName : runningSchedulingMonitors
          .keySet()) {
        silentlyStopSchedulingMonitor(schedulingMonitorName);
      }
      runningSchedulingMonitors.clear();
    }
  }

  /** @return true when no scheduling monitor is currently running. */
  public boolean isRSMEmpty() {
    return runningSchedulingMonitors.isEmpty();
  }

  /**
   * @return true when the set of running monitors matches exactly the
   *         given set of policy class names.
   */
  public boolean isSameConfiguredPolicies(Set<String> configurePoliciesSet) {
    return configurePoliciesSet.equals(runningSchedulingMonitors.keySet());
  }

  /**
   * @return the first running monitor whose policy is a
   *         ProportionalCapacityPreemptionPolicy, or null when none exists.
   */
  public SchedulingMonitor getAvailableSchedulingMonitor() {
    if (isRSMEmpty()) {
      return null;
    }
    for (SchedulingMonitor smon : runningSchedulingMonitors.values()) {
      if (smon.getSchedulingEditPolicy()
          instanceof ProportionalCapacityPreemptionPolicy) {
        return smon;
      }
    }
    return null;
  }

  /** Stops and removes all running monitors. */
  public synchronized void stop() throws YarnException {
    stopAndRemoveAll();
  }
}

View File

@ -68,6 +68,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.RMCriticalThreadUncaughtExceptionHandler; import org.apache.hadoop.yarn.server.resourcemanager.RMCriticalThreadUncaughtExceptionHandler;
import org.apache.hadoop.yarn.server.resourcemanager.RMServerUtils; import org.apache.hadoop.yarn.server.resourcemanager.RMServerUtils;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingMonitorManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
@ -168,6 +169,8 @@ public abstract class AbstractYarnScheduler
// the NM in the next heartbeat. // the NM in the next heartbeat.
private boolean autoUpdateContainers = false; private boolean autoUpdateContainers = false;
protected SchedulingMonitorManager schedulingMonitorManager;
/** /**
* Construct the service. * Construct the service.
* *
@ -207,8 +210,8 @@ public abstract class AbstractYarnScheduler
new RMCriticalThreadUncaughtExceptionHandler(rmContext)); new RMCriticalThreadUncaughtExceptionHandler(rmContext));
updateThread.setDaemon(true); updateThread.setDaemon(true);
} }
super.serviceInit(conf); super.serviceInit(conf);
} }
@Override @Override
@ -216,6 +219,7 @@ public abstract class AbstractYarnScheduler
if (updateThread != null) { if (updateThread != null) {
updateThread.start(); updateThread.start();
} }
schedulingMonitorManager.startAll();
super.serviceStart(); super.serviceStart();
} }
@ -225,6 +229,9 @@ public abstract class AbstractYarnScheduler
updateThread.interrupt(); updateThread.interrupt();
updateThread.join(THREAD_JOIN_TIMEOUT_MS); updateThread.join(THREAD_JOIN_TIMEOUT_MS);
} }
if (schedulingMonitorManager != null) {
schedulingMonitorManager.stop();
}
super.serviceStop(); super.serviceStop();
} }
@ -233,6 +240,11 @@ public abstract class AbstractYarnScheduler
return nodeTracker; return nodeTracker;
} }
@VisibleForTesting
public SchedulingMonitorManager getSchedulingMonitorManager() {
return schedulingMonitorManager;
}
/* /*
* YARN-3136 removed synchronized lock for this method for performance * YARN-3136 removed synchronized lock for this method for performance
* purposes * purposes
@ -1415,4 +1427,15 @@ public abstract class AbstractYarnScheduler
updateThreadMonitor.notify(); updateThreadMonitor.notify();
} }
} }
@Override
public void reinitialize(Configuration conf, RMContext rmContext)
throws IOException {
try {
LOG.info("Reinitializing SchedulingMonitorManager ...");
schedulingMonitorManager.reinitialize(rmContext, conf);
} catch (YarnException e) {
throw new IOException(e);
}
}
} }

View File

@ -62,6 +62,7 @@ import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.SchedulerResourceTypes; import org.apache.hadoop.yarn.proto.YarnServiceProtos.SchedulerResourceTypes;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingMonitorManager;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.placement.ApplicationPlacementContext; import org.apache.hadoop.yarn.server.resourcemanager.placement.ApplicationPlacementContext;
import org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementFactory; import org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementFactory;
@ -390,6 +391,9 @@ public class CapacityScheduler extends
Configuration configuration = new Configuration(conf); Configuration configuration = new Configuration(conf);
super.serviceInit(conf); super.serviceInit(conf);
initScheduler(configuration); initScheduler(configuration);
// Initialize SchedulingMonitorManager
schedulingMonitorManager = new SchedulingMonitorManager();
schedulingMonitorManager.initialize(rmContext, conf);
} }
@Override @Override
@ -444,6 +448,8 @@ public class CapacityScheduler extends
// Setup how many containers we can allocate for each round // Setup how many containers we can allocate for each round
offswitchPerHeartbeatLimit = this.conf.getOffSwitchPerHeartbeatLimit(); offswitchPerHeartbeatLimit = this.conf.getOffSwitchPerHeartbeatLimit();
super.reinitialize(newConf, rmContext);
} finally { } finally {
writeLock.unlock(); writeLock.unlock();
} }

View File

@ -52,6 +52,7 @@ import org.apache.hadoop.yarn.security.YarnAuthorizationProvider;
import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus; import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.RMCriticalThreadUncaughtExceptionHandler; import org.apache.hadoop.yarn.server.resourcemanager.RMCriticalThreadUncaughtExceptionHandler;
import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingMonitorManager;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationConstants; import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationConstants;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
@ -1352,6 +1353,10 @@ public class FairScheduler extends
public void serviceInit(Configuration conf) throws Exception { public void serviceInit(Configuration conf) throws Exception {
initScheduler(conf); initScheduler(conf);
super.serviceInit(conf); super.serviceInit(conf);
// Initialize SchedulingMonitorManager
schedulingMonitorManager = new SchedulingMonitorManager();
schedulingMonitorManager.initialize(rmContext, conf);
} }
@Override @Override
@ -1389,6 +1394,7 @@ public class FairScheduler extends
throws IOException { throws IOException {
try { try {
allocsLoader.reloadAllocations(); allocsLoader.reloadAllocations();
super.reinitialize(conf, rmContext);
} catch (Exception e) { } catch (Exception e) {
LOG.error("Failed to reload allocations file", e); LOG.error("Failed to reload allocations file", e);
} }

View File

@ -46,6 +46,7 @@ import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus; import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingMonitorManager;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
@ -255,6 +256,10 @@ public class FifoScheduler extends
public void serviceInit(Configuration conf) throws Exception { public void serviceInit(Configuration conf) throws Exception {
initScheduler(conf); initScheduler(conf);
super.serviceInit(conf); super.serviceInit(conf);
// Initialize SchedulingMonitorManager
schedulingMonitorManager = new SchedulingMonitorManager();
schedulingMonitorManager.initialize(rmContext, conf);
} }
@Override @Override
@ -312,6 +317,7 @@ public class FifoScheduler extends
reinitialize(Configuration conf, RMContext rmContext) throws IOException reinitialize(Configuration conf, RMContext rmContext) throws IOException
{ {
setConf(conf); setConf(conf);
super.reinitialize(conf, rmContext);
} }
@Override @Override

View File

@ -105,9 +105,35 @@ public abstract class RMHATestBase extends ClientBaseWithFixes{
return am; return am;
} }
/**
 * Builds a MockRM whose AdminService neither starts an RPC server nor
 * re-reads yarn-site.xml: loadNewConfiguration() is overridden to return
 * the configuration passed in, so refresh operations in tests keep using
 * the test-supplied configuration.
 *
 * @param confForRM1 configuration the mock RM (and any refresh) should use
 * @return the configured MockRM instance (not yet started)
 */
private MockRM initMockRMWithOldConf(Configuration confForRM1) {
  return new MockRM(confForRM1, null, false, false) {
    @Override
    protected AdminService createAdminService() {
      return new AdminService(this) {
        @Override
        protected void startServer() {
          // override to not start rpc handler
        }

        @Override
        protected void stopServer() {
          // don't do anything
        }

        @Override
        protected Configuration loadNewConfiguration()
            throws IOException, YarnException {
          // Return the test configuration instead of reloading
          // yarn-site.xml from disk.
          return confForRM1;
        }
      };
    }
  };
}
protected void startRMs() throws IOException { protected void startRMs() throws IOException {
rm1 = new MockRM(confForRM1, null, false, false); rm1 = initMockRMWithOldConf(confForRM1);
rm2 = new MockRM(confForRM2, null, false, false); rm2 = initMockRMWithOldConf(confForRM2);
startRMs(rm1, confForRM1, rm2, confForRM2); startRMs(rm1, confForRM1, rm2, confForRM2);
} }

View File

@ -23,8 +23,15 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM; import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy; import org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
import org.junit.Test; import org.junit.Test;
import java.util.HashSet;
import java.util.Set;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.timeout; import static org.mockito.Mockito.timeout;
import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verify;
@ -51,4 +58,38 @@ public class TestSchedulingMonitor {
monitor.close(); monitor.close();
rm.close(); rm.close();
} }
/**
 * Verifies that scheduling edit policies can be enabled and then disabled
 * at runtime through CapacityScheduler#reinitialize (the code path used by
 * RMAdmin -refreshQueues).
 */
// NOTE(review): rm is never closed/stopped at the end of this test —
// confirm whether a rm.close() should be added in a follow-up.
@Test(timeout = 10000)
public void testRMUpdateSchedulingEditPolicy() throws Exception {
  CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
  conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
      ResourceScheduler.class);
  conf.setBoolean(YarnConfiguration.RM_SCHEDULER_ENABLE_MONITORS, true);
  MockRM rm = new MockRM(conf);
  rm.start();

  CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
  SchedulingMonitorManager smm = cs.getSchedulingMonitorManager();

  // runningSchedulingMonitors should not be empty when initialize RM
  // scheduler monitor
  cs.reinitialize(conf, rm.getRMContext());
  assertFalse(smm.isRSMEmpty());

  // make sure runningSchedulingPolicies contains all the configured policy
  // in YARNConfiguration
  String[] configuredPolicies = conf.getStrings(
      YarnConfiguration.RM_SCHEDULER_MONITOR_POLICIES);
  Set<String> configurePoliciesSet = new HashSet<>();
  for (String s : configuredPolicies) {
    configurePoliciesSet.add(s);
  }
  assertTrue(smm.isSameConfiguredPolicies(configurePoliciesSet));

  // disable RM scheduler monitor
  conf.setBoolean(
      YarnConfiguration.RM_SCHEDULER_ENABLE_MONITORS,
      YarnConfiguration.DEFAULT_RM_SCHEDULER_ENABLE_MONITORS);
  cs.reinitialize(conf, rm.getRMContext());
  assertTrue(smm.isRSMEmpty());
}
} }

View File

@ -33,6 +33,7 @@ import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM; import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingMonitor; import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingMonitor;
import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingMonitorManager;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
@ -48,7 +49,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preempti
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerPreemptEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerPreemptEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.OrderingPolicy; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.OrderingPolicy;
import org.apache.hadoop.yarn.util.Clock; import org.apache.hadoop.yarn.util.Clock;
@ -799,14 +799,16 @@ public class TestProportionalCapacityPreemptionPolicy {
// 2) check if ResourceCalculator in policy is null or not. // 2) check if ResourceCalculator in policy is null or not.
// If it's not null, we can come to a conclusion that policy initialized // If it's not null, we can come to a conclusion that policy initialized
// after scheduler got initialized // after scheduler got initialized
for (Service service : rm.getRMActiveService().getServices()) { // Get SchedulingMonitor from SchedulingMonitorManager instead
if (service instanceof SchedulingMonitor) { CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
ProportionalCapacityPreemptionPolicy policy = SchedulingMonitorManager smm = cs.getSchedulingMonitorManager();
(ProportionalCapacityPreemptionPolicy) ((SchedulingMonitor) service) Service service = smm.getAvailableSchedulingMonitor();
.getSchedulingEditPolicy(); if (service instanceof SchedulingMonitor) {
assertNotNull(policy.getResourceCalculator()); ProportionalCapacityPreemptionPolicy policy =
return; (ProportionalCapacityPreemptionPolicy) ((SchedulingMonitor) service)
} .getSchedulingEditPolicy();
assertNotNull(policy.getResourceCalculator());
return;
} }
fail("Failed to find SchedulingMonitor service, please check what happened"); fail("Failed to find SchedulingMonitor service, please check what happened");

View File

@ -26,6 +26,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
import org.apache.hadoop.yarn.server.resourcemanager.MockNM; import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM; import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingEditPolicy; import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingEditPolicy;
import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingMonitor;
import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingMonitorManager;
import org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy; import org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
@ -126,7 +128,11 @@ public class TestCapacitySchedulerLazyPreemption
Resources.createResource(1 * GB), 1)), null); Resources.createResource(1 * GB), 1)), null);
// Get edit policy and do one update // Get edit policy and do one update
SchedulingEditPolicy editPolicy = getSchedulingEditPolicy(rm1); SchedulingMonitorManager smm = ((CapacityScheduler) rm1.
getResourceScheduler()).getSchedulingMonitorManager();
SchedulingMonitor smon = smm.getAvailableSchedulingMonitor();
ProportionalCapacityPreemptionPolicy editPolicy =
(ProportionalCapacityPreemptionPolicy) smon.getSchedulingEditPolicy();
// Call edit schedule twice, and check if one container from app1 marked // Call edit schedule twice, and check if one container from app1 marked
// to be "killable" // to be "killable"
@ -209,7 +215,11 @@ public class TestCapacitySchedulerLazyPreemption
Resources.createResource(1 * GB), 1)), null); Resources.createResource(1 * GB), 1)), null);
// Get edit policy and do one update // Get edit policy and do one update
SchedulingEditPolicy editPolicy = getSchedulingEditPolicy(rm1); SchedulingMonitorManager smm = ((CapacityScheduler) rm1.
getResourceScheduler()).getSchedulingMonitorManager();
SchedulingMonitor smon = smm.getAvailableSchedulingMonitor();
ProportionalCapacityPreemptionPolicy editPolicy =
(ProportionalCapacityPreemptionPolicy) smon.getSchedulingEditPolicy();
// Call edit schedule twice, and check if one container from app1 marked // Call edit schedule twice, and check if one container from app1 marked
// to be "killable" // to be "killable"
@ -301,7 +311,11 @@ public class TestCapacitySchedulerLazyPreemption
Resources.createResource(1 * GB), 1, false)), null); Resources.createResource(1 * GB), 1, false)), null);
// Get edit policy and do one update // Get edit policy and do one update
SchedulingEditPolicy editPolicy = getSchedulingEditPolicy(rm1); SchedulingMonitorManager smm = ((CapacityScheduler) rm1.
getResourceScheduler()).getSchedulingMonitorManager();
SchedulingMonitor smon = smm.getAvailableSchedulingMonitor();
ProportionalCapacityPreemptionPolicy editPolicy =
(ProportionalCapacityPreemptionPolicy) smon.getSchedulingEditPolicy();
// Call edit schedule twice, and check if one container from app1 marked // Call edit schedule twice, and check if one container from app1 marked
// to be "killable" // to be "killable"
@ -387,8 +401,11 @@ public class TestCapacitySchedulerLazyPreemption
am2.allocate("*", 1 * GB, 1, new ArrayList<ContainerId>()); am2.allocate("*", 1 * GB, 1, new ArrayList<ContainerId>());
// Get edit policy and do one update // Get edit policy and do one update
SchedulingMonitorManager smm = ((CapacityScheduler) rm1.
getResourceScheduler()).getSchedulingMonitorManager();
SchedulingMonitor smon = smm.getAvailableSchedulingMonitor();
ProportionalCapacityPreemptionPolicy editPolicy = ProportionalCapacityPreemptionPolicy editPolicy =
(ProportionalCapacityPreemptionPolicy) getSchedulingEditPolicy(rm1); (ProportionalCapacityPreemptionPolicy) smon.getSchedulingEditPolicy();
// Call edit schedule twice, and check if one container from app1 marked // Call edit schedule twice, and check if one container from app1 marked
// to be "killable" // to be "killable"
@ -487,8 +504,11 @@ public class TestCapacitySchedulerLazyPreemption
am2.allocate("*", 3 * GB, 1, new ArrayList<ContainerId>()); am2.allocate("*", 3 * GB, 1, new ArrayList<ContainerId>());
// Get edit policy and do one update // Get edit policy and do one update
SchedulingMonitorManager smm = ((CapacityScheduler) rm1.
getResourceScheduler()).getSchedulingMonitorManager();
SchedulingMonitor smon = smm.getAvailableSchedulingMonitor();
ProportionalCapacityPreemptionPolicy editPolicy = ProportionalCapacityPreemptionPolicy editPolicy =
(ProportionalCapacityPreemptionPolicy) getSchedulingEditPolicy(rm1); (ProportionalCapacityPreemptionPolicy) smon.getSchedulingEditPolicy();
// Call edit schedule twice, and check if 3 container from app1 marked // Call edit schedule twice, and check if 3 container from app1 marked
// to be "killable" // to be "killable"
@ -582,7 +602,11 @@ public class TestCapacitySchedulerLazyPreemption
Resources.createResource(1 * GB), 1)), null); Resources.createResource(1 * GB), 1)), null);
// Get edit policy and do one update // Get edit policy and do one update
SchedulingEditPolicy editPolicy = getSchedulingEditPolicy(rm1); SchedulingMonitorManager smm = ((CapacityScheduler) rm1.
getResourceScheduler()).getSchedulingMonitorManager();
SchedulingMonitor smon = smm.getAvailableSchedulingMonitor();
ProportionalCapacityPreemptionPolicy editPolicy =
(ProportionalCapacityPreemptionPolicy) smon.getSchedulingEditPolicy();
// Call edit schedule twice, and check if no container from app1 marked // Call edit schedule twice, and check if no container from app1 marked
// to be "killable" // to be "killable"

View File

@ -26,7 +26,8 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.MockAM; import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
import org.apache.hadoop.yarn.server.resourcemanager.MockNM; import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM; import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingEditPolicy; import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingMonitor;
import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingMonitorManager;
import org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy; import org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
@ -138,7 +139,11 @@ public class TestCapacitySchedulerSurgicalPreemption
Assert.assertNotNull(cs.getNode(nm1.getNodeId()).getReservedContainer()); Assert.assertNotNull(cs.getNode(nm1.getNodeId()).getReservedContainer());
// Get edit policy and do one update // Get edit policy and do one update
SchedulingEditPolicy editPolicy = getSchedulingEditPolicy(rm1); SchedulingMonitorManager smm = ((CapacityScheduler) rm1.
getResourceScheduler()).getSchedulingMonitorManager();
SchedulingMonitor smon = smm.getAvailableSchedulingMonitor();
ProportionalCapacityPreemptionPolicy editPolicy =
(ProportionalCapacityPreemptionPolicy) smon.getSchedulingEditPolicy();
// Call edit schedule twice, and check if 4 containers from app1 at n1 killed // Call edit schedule twice, and check if 4 containers from app1 at n1 killed
editPolicy.editSchedule(); editPolicy.editSchedule();
@ -217,8 +222,11 @@ public class TestCapacitySchedulerSurgicalPreemption
ApplicationAttemptId.newInstance(app2.getApplicationId(), 1)); ApplicationAttemptId.newInstance(app2.getApplicationId(), 1));
// Call editSchedule: containers are selected to be preemption candidate // Call editSchedule: containers are selected to be preemption candidate
SchedulingMonitorManager smm = ((CapacityScheduler) rm1.
getResourceScheduler()).getSchedulingMonitorManager();
SchedulingMonitor smon = smm.getAvailableSchedulingMonitor();
ProportionalCapacityPreemptionPolicy editPolicy = ProportionalCapacityPreemptionPolicy editPolicy =
(ProportionalCapacityPreemptionPolicy) getSchedulingEditPolicy(rm1); (ProportionalCapacityPreemptionPolicy) smon.getSchedulingEditPolicy();
editPolicy.editSchedule(); editPolicy.editSchedule();
Assert.assertEquals(3, editPolicy.getToPreemptContainers().size()); Assert.assertEquals(3, editPolicy.getToPreemptContainers().size());
@ -323,8 +331,11 @@ public class TestCapacitySchedulerSurgicalPreemption
} }
// Call editSchedule immediately: containers are not selected // Call editSchedule immediately: containers are not selected
SchedulingMonitorManager smm = ((CapacityScheduler) rm1.
getResourceScheduler()).getSchedulingMonitorManager();
SchedulingMonitor smon = smm.getAvailableSchedulingMonitor();
ProportionalCapacityPreemptionPolicy editPolicy = ProportionalCapacityPreemptionPolicy editPolicy =
(ProportionalCapacityPreemptionPolicy) getSchedulingEditPolicy(rm1); (ProportionalCapacityPreemptionPolicy) smon.getSchedulingEditPolicy();
editPolicy.editSchedule(); editPolicy.editSchedule();
Assert.assertEquals(0, editPolicy.getToPreemptContainers().size()); Assert.assertEquals(0, editPolicy.getToPreemptContainers().size());
@ -434,8 +445,11 @@ public class TestCapacitySchedulerSurgicalPreemption
cs.getNode(rmNode3.getNodeID()).getReservedContainer()); cs.getNode(rmNode3.getNodeID()).getReservedContainer());
// Call editSchedule immediately: nothing happens // Call editSchedule immediately: nothing happens
SchedulingMonitorManager smm = ((CapacityScheduler) rm1.
getResourceScheduler()).getSchedulingMonitorManager();
SchedulingMonitor smon = smm.getAvailableSchedulingMonitor();
ProportionalCapacityPreemptionPolicy editPolicy = ProportionalCapacityPreemptionPolicy editPolicy =
(ProportionalCapacityPreemptionPolicy) getSchedulingEditPolicy(rm1); (ProportionalCapacityPreemptionPolicy) smon.getSchedulingEditPolicy();
editPolicy.editSchedule(); editPolicy.editSchedule();
Assert.assertNotNull( Assert.assertNotNull(
cs.getNode(rmNode3.getNodeID()).getReservedContainer()); cs.getNode(rmNode3.getNodeID()).getReservedContainer());
@ -562,8 +576,11 @@ public class TestCapacitySchedulerSurgicalPreemption
// 6 (selected) + 1 (allocated) which makes target capacity to 70% // 6 (selected) + 1 (allocated) which makes target capacity to 70%
Thread.sleep(1000); Thread.sleep(1000);
SchedulingMonitorManager smm = ((CapacityScheduler) rm1.
getResourceScheduler()).getSchedulingMonitorManager();
SchedulingMonitor smon = smm.getAvailableSchedulingMonitor();
ProportionalCapacityPreemptionPolicy editPolicy = ProportionalCapacityPreemptionPolicy editPolicy =
(ProportionalCapacityPreemptionPolicy) getSchedulingEditPolicy(rm1); (ProportionalCapacityPreemptionPolicy) smon.getSchedulingEditPolicy();
editPolicy.editSchedule(); editPolicy.editSchedule();
checkNumberOfPreemptionCandidateFromApp(editPolicy, 6, checkNumberOfPreemptionCandidateFromApp(editPolicy, 6,
am1.getApplicationAttemptId()); am1.getApplicationAttemptId());
@ -715,8 +732,11 @@ public class TestCapacitySchedulerSurgicalPreemption
Thread.sleep(1000); Thread.sleep(1000);
/* 1st container preempted is on n2 */ /* 1st container preempted is on n2 */
SchedulingMonitorManager smm = ((CapacityScheduler) rm1.
getResourceScheduler()).getSchedulingMonitorManager();
SchedulingMonitor smon = smm.getAvailableSchedulingMonitor();
ProportionalCapacityPreemptionPolicy editPolicy = ProportionalCapacityPreemptionPolicy editPolicy =
(ProportionalCapacityPreemptionPolicy) getSchedulingEditPolicy(rm1); (ProportionalCapacityPreemptionPolicy) smon.getSchedulingEditPolicy();
editPolicy.editSchedule(); editPolicy.editSchedule();
// We should have one to-preempt container, on node[2] // We should have one to-preempt container, on node[2]
@ -887,7 +907,11 @@ public class TestCapacitySchedulerSurgicalPreemption
waitNumberOfReservedContainersFromApp(schedulerApp2, 1); waitNumberOfReservedContainersFromApp(schedulerApp2, 1);
// Call editSchedule twice and allocation once, container should get allocated // Call editSchedule twice and allocation once, container should get allocated
SchedulingEditPolicy editPolicy = getSchedulingEditPolicy(rm1); SchedulingMonitorManager smm = ((CapacityScheduler) rm1.
getResourceScheduler()).getSchedulingMonitorManager();
SchedulingMonitor smon = smm.getAvailableSchedulingMonitor();
ProportionalCapacityPreemptionPolicy editPolicy =
(ProportionalCapacityPreemptionPolicy) smon.getSchedulingEditPolicy();
editPolicy.editSchedule(); editPolicy.editSchedule();
editPolicy.editSchedule(); editPolicy.editSchedule();