diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ResourceSchedulerWrapper.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ResourceSchedulerWrapper.java index 43930cdf28c..664022f667f 100644 --- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ResourceSchedulerWrapper.java +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/ResourceSchedulerWrapper.java @@ -61,10 +61,13 @@ import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent; @@ -89,11 +92,13 @@ import com.codahale.metrics.MetricRegistry; import com.codahale.metrics.SlidingWindowReservoir; import com.codahale.metrics.Timer; -public class ResourceSchedulerWrapper implements ResourceScheduler, - Configurable { +public class ResourceSchedulerWrapper + extends AbstractYarnScheduler + implements ResourceScheduler, Configurable { private static final String EOL = System.getProperty("line.separator"); private static final int SAMPLING_SIZE = 60; private ScheduledExecutorService pool; + private RMContext rmContext; // counters for scheduler allocate/handle operations private Counter schedulerAllocateCounter; private Counter schedulerHandleCounter; @@ -146,6 +151,7 @@ public class ResourceSchedulerWrapper implements ResourceScheduler, public final Logger LOG = Logger.getLogger(ResourceSchedulerWrapper.class); public ResourceSchedulerWrapper() { + super(ResourceSchedulerWrapper.class.getName()); samplerLock = new ReentrantLock(); queueLock = new ReentrantLock(); } @@ -795,10 +801,39 @@ public class ResourceSchedulerWrapper implements ResourceScheduler, return conf; } + @SuppressWarnings("unchecked") @Override - public void reinitialize(Configuration entries, RMContext rmContext) - throws IOException { - scheduler.reinitialize(entries, rmContext); + public void serviceInit(Configuration conf) throws Exception { + ((AbstractYarnScheduler) + scheduler).init(conf); + super.serviceInit(conf); + } + + @SuppressWarnings("unchecked") + @Override + public void serviceStart() throws Exception { + ((AbstractYarnScheduler) + scheduler).start(); + super.serviceStart(); + } + + @SuppressWarnings("unchecked") + @Override + public void serviceStop() throws Exception { + ((AbstractYarnScheduler) + scheduler).stop(); + super.serviceStop(); + } + + @Override + public void setRMContext(RMContext rmContext) { + scheduler.setRMContext(rmContext); + } + + @Override + public void reinitialize(Configuration conf, RMContext rmContext) + throws IOException { + scheduler.reinitialize(conf, rmContext); } @Override diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 308d45760da..5753f9fe503 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -115,6 +115,8 @@ Release 2.5.0 - UNRELEASED NMContainerStatus which has more information that is needed for work-preserving RM-restart. (Jian He via vinodkv) + YARN-1474. Make sechedulers services. (Tsuyoshi Ozawa via kasha) + OPTIMIZATIONS BUG FIXES diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java index b914b1f723b..b62bd5f2b68 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java @@ -401,6 +401,8 @@ public class ResourceManager extends CompositeService implements Recoverable { // Initialize the scheduler scheduler = createScheduler(); + scheduler.setRMContext(rmContext); + addIfService(scheduler); rmContext.setScheduler(scheduler); schedulerDispatcher = createSchedulerEventDispatcher(); @@ -429,12 +431,6 @@ public class ResourceManager extends CompositeService implements Recoverable { DefaultMetricsSystem.initialize("ResourceManager"); JvmMetrics.initSingleton("ResourceManager", null); - try { - scheduler.reinitialize(conf, rmContext); - } catch (IOException ioe) { - throw new RuntimeException("Failed to initialize scheduler", ioe); - } - // creating monitors that handle preemption createPolicyMonitors(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java index 642cd313213..cc1cb478474 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java @@ -26,6 +26,7 @@ import java.util.concurrent.ConcurrentHashMap; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport; @@ -41,7 +42,7 @@ import org.apache.hadoop.yarn.util.resource.Resources; public abstract class AbstractYarnScheduler - implements ResourceScheduler { + extends AbstractService implements ResourceScheduler { private static final Log LOG = LogFactory.getLog(AbstractYarnScheduler.class); @@ -62,6 +63,15 @@ public abstract class AbstractYarnScheduler protected static final Allocation EMPTY_ALLOCATION = new Allocation( EMPTY_CONTAINER_LIST, Resources.createResource(0), null, null, null); + /** + * Construct the service. + * + * @param name service name + */ + public AbstractYarnScheduler(String name) { + super(name); + } + public synchronized List getTransferredContainers( ApplicationAttemptId currentAttempt) { ApplicationId appId = currentAttempt.getApplicationId(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceScheduler.java index 88408810ce6..5649ccf7dca 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceScheduler.java @@ -34,6 +34,15 @@ import org.apache.hadoop.yarn.server.resourcemanager.recovery.Recoverable; @LimitedPrivate("yarn") @Evolving public interface ResourceScheduler extends YarnScheduler, Recoverable { + + /** + * Set RMContext for ResourceScheduler. + * This method should be called immediately after instantiating + * a scheduler once. + * @param rmContext created by ResourceManager + */ + void setRMContext(RMContext rmContext); + /** * Re-initialize the ResourceScheduler. * @param conf configuration diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index 9eed61fb281..77674456534 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; +import com.google.common.base.Preconditions; import java.io.IOException; import java.io.InputStream; import java.util.ArrayList; @@ -102,6 +103,8 @@ public class CapacityScheduler extends private static final Log LOG = LogFactory.getLog(CapacityScheduler.class); private CSQueue root; + // timeout to join when we stop this service + protected final long THREAD_JOIN_TIMEOUT_MS = 1000; static final Comparator queueComparator = new Comparator() { @Override @@ -179,8 +182,6 @@ public class CapacityScheduler extends private int numNodeManagers = 0; - private boolean initialized = false; - private ResourceCalculator calculator; private boolean usePortForNodeName; @@ -196,7 +197,9 @@ public class CapacityScheduler extends + ".scheduling-interval-ms"; private static final long DEFAULT_ASYNC_SCHEDULER_INTERVAL = 5; - public CapacityScheduler() {} + public CapacityScheduler() { + super(CapacityScheduler.class.getName()); + } @Override public QueueMetrics getRootQueueMetrics() { @@ -238,55 +241,90 @@ public class CapacityScheduler extends } @Override - public RMContext getRMContext() { + public synchronized RMContext getRMContext() { return this.rmContext; } - + + @Override + public synchronized void setRMContext(RMContext rmContext) { + this.rmContext = rmContext; + } + + private synchronized void initScheduler(Configuration configuration) throws + IOException { + this.conf = loadCapacitySchedulerConfiguration(configuration); + validateConf(this.conf); + this.minimumAllocation = this.conf.getMinimumAllocation(); + this.maximumAllocation = this.conf.getMaximumAllocation(); + this.calculator = this.conf.getResourceCalculator(); + this.usePortForNodeName = this.conf.getUsePortForNodeName(); + this.applications = + new ConcurrentHashMap>(); + + initializeQueues(this.conf); + + scheduleAsynchronously = this.conf.getScheduleAynschronously(); + asyncScheduleInterval = + this.conf.getLong(ASYNC_SCHEDULER_INTERVAL, + DEFAULT_ASYNC_SCHEDULER_INTERVAL); + if (scheduleAsynchronously) { + asyncSchedulerThread = new AsyncScheduleThread(this); + } + + LOG.info("Initialized CapacityScheduler with " + + "calculator=" + getResourceCalculator().getClass() + ", " + + "minimumAllocation=<" + getMinimumResourceCapability() + ">, " + + "maximumAllocation=<" + getMaximumResourceCapability() + ">, " + + "asynchronousScheduling=" + scheduleAsynchronously + ", " + + "asyncScheduleInterval=" + asyncScheduleInterval + "ms"); + } + + private synchronized void startSchedulerThreads() { + if (scheduleAsynchronously) { + Preconditions.checkNotNull(asyncSchedulerThread, + "asyncSchedulerThread is null"); + asyncSchedulerThread.start(); + } + } + + @Override + public void serviceInit(Configuration conf) throws Exception { + Configuration configuration = new Configuration(conf); + initScheduler(configuration); + super.serviceInit(conf); + } + + @Override + public void serviceStart() throws Exception { + startSchedulerThreads(); + super.serviceStart(); + } + + @Override + public void serviceStop() throws Exception { + synchronized (this) { + if (scheduleAsynchronously && asyncSchedulerThread != null) { + asyncSchedulerThread.interrupt(); + asyncSchedulerThread.join(THREAD_JOIN_TIMEOUT_MS); + } + } + super.serviceStop(); + } + @Override public synchronized void - reinitialize(Configuration conf, RMContext rmContext) throws IOException { + reinitialize(Configuration conf, RMContext rmContext) throws IOException { Configuration configuration = new Configuration(conf); - if (!initialized) { - this.rmContext = rmContext; - this.conf = loadCapacitySchedulerConfiguration(configuration); - validateConf(this.conf); - this.minimumAllocation = this.conf.getMinimumAllocation(); - this.maximumAllocation = this.conf.getMaximumAllocation(); - this.calculator = this.conf.getResourceCalculator(); - this.usePortForNodeName = this.conf.getUsePortForNodeName(); - this.applications = - new ConcurrentHashMap>(); - - initializeQueues(this.conf); - - scheduleAsynchronously = this.conf.getScheduleAynschronously(); - asyncScheduleInterval = - this.conf.getLong(ASYNC_SCHEDULER_INTERVAL, - DEFAULT_ASYNC_SCHEDULER_INTERVAL); - if (scheduleAsynchronously) { - asyncSchedulerThread = new AsyncScheduleThread(this); - asyncSchedulerThread.start(); - } - - initialized = true; - LOG.info("Initialized CapacityScheduler with " + - "calculator=" + getResourceCalculator().getClass() + ", " + - "minimumAllocation=<" + getMinimumResourceCapability() + ">, " + - "maximumAllocation=<" + getMaximumResourceCapability() + ">, " + - "asynchronousScheduling=" + scheduleAsynchronously + ", " + - "asyncScheduleInterval=" + asyncScheduleInterval + "ms"); - - } else { - CapacitySchedulerConfiguration oldConf = this.conf; - this.conf = loadCapacitySchedulerConfiguration(configuration); - validateConf(this.conf); - try { - LOG.info("Re-initializing queues..."); - reinitializeQueues(this.conf); - } catch (Throwable t) { - this.conf = oldConf; - throw new IOException("Failed to re-init queues", t); - } + CapacitySchedulerConfiguration oldConf = this.conf; + this.conf = loadCapacitySchedulerConfiguration(configuration); + validateConf(this.conf); + try { + LOG.info("Re-initializing queues..."); + reinitializeQueues(this.conf); + } catch (Throwable t) { + this.conf = oldConf; + throw new IOException("Failed to re-init queues", t); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java index bedbb64cd2f..6c356308918 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java @@ -68,7 +68,9 @@ public class AllocationFileLoaderService extends AbstractService { * (this is done to prevent loading a file that hasn't been fully written). */ public static final long ALLOC_RELOAD_WAIT_MS = 5 * 1000; - + + public static final long THREAD_JOIN_TIMEOUT_MS = 1000; + private final Clock clock; private long lastSuccessfulReload; // Last time we successfully reloaded queues @@ -146,7 +148,14 @@ public class AllocationFileLoaderService extends AbstractService { @Override public void stop() { running = false; - reloadThread.interrupt(); + if (reloadThread != null) { + reloadThread.interrupt(); + try { + reloadThread.join(THREAD_JOIN_TIMEOUT_MS); + } catch (InterruptedException e) { + LOG.warn("reloadThread fails to join."); + } + } super.stop(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java index 6d71ea2fbb3..1b7011a88ca 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair; +import com.google.common.base.Preconditions; import java.io.IOException; import java.util.ArrayList; import java.util.Collections; @@ -117,7 +118,6 @@ import com.google.common.annotations.VisibleForTesting; @SuppressWarnings("unchecked") public class FairScheduler extends AbstractYarnScheduler { - private boolean initialized; private FairSchedulerConfiguration conf; private Resource incrAllocation; @@ -137,6 +137,11 @@ public class FairScheduler extends // How often fair shares are re-calculated (ms) protected long UPDATE_INTERVAL = 500; + private Thread updateThread; + private Thread schedulingThread; + // timeout to join when we stop this service + protected final long THREAD_JOIN_TIMEOUT_MS = 1000; + // Aggregate metrics FSQueueMetrics rootMetrics; @@ -182,6 +187,7 @@ public class FairScheduler extends AllocationConfiguration allocConf; public FairScheduler() { + super(FairScheduler.class.getName()); clock = new SystemClock(); allocsLoader = new AllocationFileLoaderService(); queueMgr = new QueueManager(this); @@ -473,7 +479,8 @@ public class FairScheduler extends return resToPreempt; } - public RMContainerTokenSecretManager getContainerTokenSecretManager() { + public synchronized RMContainerTokenSecretManager + getContainerTokenSecretManager() { return rmContext.getContainerTokenSecretManager(); } @@ -1154,87 +1161,130 @@ public class FairScheduler extends // NOT IMPLEMENTED } - @Override - public synchronized void reinitialize(Configuration conf, RMContext rmContext) + public synchronized void setRMContext(RMContext rmContext) { + this.rmContext = rmContext; + } + + private synchronized void initScheduler(Configuration conf) throws IOException { - if (!initialized) { - this.conf = new FairSchedulerConfiguration(conf); - validateConf(this.conf); - minimumAllocation = this.conf.getMinimumAllocation(); - maximumAllocation = this.conf.getMaximumAllocation(); - incrAllocation = this.conf.getIncrementAllocation(); - continuousSchedulingEnabled = this.conf.isContinuousSchedulingEnabled(); - continuousSchedulingSleepMs = - this.conf.getContinuousSchedulingSleepMs(); - nodeLocalityThreshold = this.conf.getLocalityThresholdNode(); - rackLocalityThreshold = this.conf.getLocalityThresholdRack(); - nodeLocalityDelayMs = this.conf.getLocalityDelayNodeMs(); - rackLocalityDelayMs = this.conf.getLocalityDelayRackMs(); - preemptionEnabled = this.conf.getPreemptionEnabled(); - preemptionUtilizationThreshold = - this.conf.getPreemptionUtilizationThreshold(); - assignMultiple = this.conf.getAssignMultiple(); - maxAssign = this.conf.getMaxAssign(); - sizeBasedWeight = this.conf.getSizeBasedWeight(); - preemptionInterval = this.conf.getPreemptionInterval(); - waitTimeBeforeKill = this.conf.getWaitTimeBeforeKill(); - usePortForNodeName = this.conf.getUsePortForNodeName(); - - rootMetrics = FSQueueMetrics.forQueue("root", null, true, conf); - this.rmContext = rmContext; - // This stores per-application scheduling information - this.applications = - new ConcurrentHashMap>(); - this.eventLog = new FairSchedulerEventLog(); - eventLog.init(this.conf); + this.conf = new FairSchedulerConfiguration(conf); + validateConf(this.conf); + minimumAllocation = this.conf.getMinimumAllocation(); + maximumAllocation = this.conf.getMaximumAllocation(); + incrAllocation = this.conf.getIncrementAllocation(); + continuousSchedulingEnabled = this.conf.isContinuousSchedulingEnabled(); + continuousSchedulingSleepMs = + this.conf.getContinuousSchedulingSleepMs(); + nodeLocalityThreshold = this.conf.getLocalityThresholdNode(); + rackLocalityThreshold = this.conf.getLocalityThresholdRack(); + nodeLocalityDelayMs = this.conf.getLocalityDelayNodeMs(); + rackLocalityDelayMs = this.conf.getLocalityDelayRackMs(); + preemptionEnabled = this.conf.getPreemptionEnabled(); + preemptionUtilizationThreshold = + this.conf.getPreemptionUtilizationThreshold(); + assignMultiple = this.conf.getAssignMultiple(); + maxAssign = this.conf.getMaxAssign(); + sizeBasedWeight = this.conf.getSizeBasedWeight(); + preemptionInterval = this.conf.getPreemptionInterval(); + waitTimeBeforeKill = this.conf.getWaitTimeBeforeKill(); + usePortForNodeName = this.conf.getUsePortForNodeName(); - initialized = true; + rootMetrics = FSQueueMetrics.forQueue("root", null, true, conf); + // This stores per-application scheduling information + this.applications = + new ConcurrentHashMap>(); + this.eventLog = new FairSchedulerEventLog(); + eventLog.init(this.conf); - allocConf = new AllocationConfiguration(conf); - try { - queueMgr.initialize(conf); - } catch (Exception e) { - throw new IOException("Failed to start FairScheduler", e); - } + allocConf = new AllocationConfiguration(conf); + try { + queueMgr.initialize(conf); + } catch (Exception e) { + throw new IOException("Failed to start FairScheduler", e); + } - Thread updateThread = new Thread(new UpdateThread()); - updateThread.setName("FairSchedulerUpdateThread"); - updateThread.setDaemon(true); - updateThread.start(); + updateThread = new Thread(new UpdateThread()); + updateThread.setName("FairSchedulerUpdateThread"); + updateThread.setDaemon(true); - if (continuousSchedulingEnabled) { - // start continuous scheduling thread - Thread schedulingThread = new Thread( + if (continuousSchedulingEnabled) { + // start continuous scheduling thread + schedulingThread = new Thread( new Runnable() { @Override public void run() { continuousScheduling(); } } - ); - schedulingThread.setName("ContinuousScheduling"); - schedulingThread.setDaemon(true); - schedulingThread.start(); + ); + schedulingThread.setName("ContinuousScheduling"); + schedulingThread.setDaemon(true); + } + + allocsLoader.init(conf); + allocsLoader.setReloadListener(new AllocationReloadListener()); + // If we fail to load allocations file on initialize, we want to fail + // immediately. After a successful load, exceptions on future reloads + // will just result in leaving things as they are. + try { + allocsLoader.reloadAllocations(); + } catch (Exception e) { + throw new IOException("Failed to initialize FairScheduler", e); + } + } + + private synchronized void startSchedulerThreads() { + Preconditions.checkNotNull(updateThread, "updateThread is null"); + Preconditions.checkNotNull(allocsLoader, "allocsLoader is null"); + updateThread.start(); + if (continuousSchedulingEnabled) { + Preconditions.checkNotNull(schedulingThread, "schedulingThread is null"); + schedulingThread.start(); + } + allocsLoader.start(); + } + + @Override + public void serviceInit(Configuration conf) throws Exception { + initScheduler(conf); + super.serviceInit(conf); + } + + @Override + public void serviceStart() throws Exception { + startSchedulerThreads(); + super.serviceStart(); + } + + @Override + public void serviceStop() throws Exception { + synchronized (this) { + if (updateThread != null) { + updateThread.interrupt(); + updateThread.join(THREAD_JOIN_TIMEOUT_MS); } - - allocsLoader.init(conf); - allocsLoader.setReloadListener(new AllocationReloadListener()); - // If we fail to load allocations file on initialize, we want to fail - // immediately. After a successful load, exceptions on future reloads - // will just result in leaving things as they are. - try { - allocsLoader.reloadAllocations(); - } catch (Exception e) { - throw new IOException("Failed to initialize FairScheduler", e); + if (continuousSchedulingEnabled) { + if (schedulingThread != null) { + schedulingThread.interrupt(); + schedulingThread.join(THREAD_JOIN_TIMEOUT_MS); + } } - allocsLoader.start(); - } else { - try { - allocsLoader.reloadAllocations(); - } catch (Exception e) { - LOG.error("Failed to reload allocations file", e); + if (allocsLoader != null) { + allocsLoader.stop(); } } + + super.serviceStop(); + } + + @Override + public synchronized void reinitialize(Configuration conf, RMContext rmContext) + throws IOException { + try { + allocsLoader.reloadAllocations(); + } catch (Exception e) { + LOG.error("Failed to reload allocations file", e); + } } @Override diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java index bc3441ba515..d461615047f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java @@ -89,6 +89,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSc import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; + import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.server.utils.Lock; import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; @@ -111,7 +112,6 @@ public class FifoScheduler extends Configuration conf; - private boolean initialized; private boolean usePortForNodeName; private ActiveUsersManager activeUsersManager; @@ -180,6 +180,47 @@ public class FifoScheduler extends } }; + public FifoScheduler() { + super(FifoScheduler.class.getName()); + } + + private synchronized void initScheduler(Configuration conf) { + validateConf(conf); + //Use ConcurrentSkipListMap because applications need to be ordered + this.applications = + new ConcurrentSkipListMap>(); + this.minimumAllocation = + Resources.createResource(conf.getInt( + YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, + YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB)); + this.maximumAllocation = + Resources.createResource(conf.getInt( + YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, + YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB)); + this.usePortForNodeName = conf.getBoolean( + YarnConfiguration.RM_SCHEDULER_INCLUDE_PORT_IN_NODE_NAME, + YarnConfiguration.DEFAULT_RM_SCHEDULER_USE_PORT_FOR_NODE_NAME); + this.metrics = QueueMetrics.forQueue(DEFAULT_QUEUE_NAME, null, false, + conf); + this.activeUsersManager = new ActiveUsersManager(metrics); + } + + @Override + public void serviceInit(Configuration conf) throws Exception { + initScheduler(conf); + super.serviceInit(conf); + } + + @Override + public void serviceStart() throws Exception { + super.serviceStart(); + } + + @Override + public void serviceStop() throws Exception { + super.serviceStop(); + } + @Override public synchronized void setConf(Configuration conf) { this.conf = conf; @@ -215,36 +256,18 @@ public class FifoScheduler extends return nodes.size(); } + @Override + public synchronized void setRMContext(RMContext rmContext) { + this.rmContext = rmContext; + } + @Override public synchronized void reinitialize(Configuration conf, RMContext rmContext) throws IOException { setConf(conf); - if (!this.initialized) { - validateConf(conf); - this.rmContext = rmContext; - //Use ConcurrentSkipListMap because applications need to be ordered - this.applications = - new ConcurrentSkipListMap>(); - this.minimumAllocation = - Resources.createResource(conf.getInt( - YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, - YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB)); - this.maximumAllocation = - Resources.createResource(conf.getInt( - YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, - YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB)); - this.usePortForNodeName = conf.getBoolean( - YarnConfiguration.RM_SCHEDULER_INCLUDE_PORT_IN_NODE_NAME, - YarnConfiguration.DEFAULT_RM_SCHEDULER_USE_PORT_FOR_NODE_NAME); - this.metrics = QueueMetrics.forQueue(DEFAULT_QUEUE_NAME, null, false, - conf); - this.activeUsersManager = new ActiveUsersManager(metrics); - this.initialized = true; - } } - @Override public Allocation allocate( ApplicationAttemptId applicationAttemptId, List ask, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java index fcd5041e425..f50ae9d5e3b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestFifoScheduler.java @@ -77,12 +77,12 @@ public class TestFifoScheduler { @Test (timeout = 30000) public void testConfValidation() throws Exception { - ResourceScheduler scheduler = new FifoScheduler(); + FifoScheduler scheduler = new FifoScheduler(); Configuration conf = new YarnConfiguration(); conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 2048); conf.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, 1024); try { - scheduler.reinitialize(conf, null); + scheduler.serviceInit(conf); fail("Exception is expected because the min memory allocation is" + " larger than the max memory allocation."); } catch (YarnRuntimeException e) { @@ -218,6 +218,9 @@ public class TestFifoScheduler { public void testNodeUpdateBeforeAppAttemptInit() throws Exception { FifoScheduler scheduler = new FifoScheduler(); MockRM rm = new MockRM(conf); + scheduler.setRMContext(rm.getRMContext()); + scheduler.init(conf); + scheduler.start(); scheduler.reinitialize(conf, rm.getRMContext()); RMNode node = MockNodes.newNodeInfo(1, @@ -293,6 +296,8 @@ public class TestFifoScheduler { conf.setQueues("default", new String[] {"default"}); conf.setCapacity("default", 100); FifoScheduler fs = new FifoScheduler(); + fs.init(conf); + fs.start(); fs.reinitialize(conf, null); RMNode n1 = @@ -313,6 +318,7 @@ public class TestFifoScheduler { fs.handle(new NodeUpdateSchedulerEvent(n1)); Assert.assertEquals(4 * GB, fs.getRootQueueMetrics().getAvailableMB()); + fs.stop(); } @Test (timeout = 50000) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java index 14120394ca5..6322df3ffe9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java @@ -121,13 +121,16 @@ public class TestCapacityScheduler { @After public void tearDown() throws Exception { - resourceManager.stop(); + if (resourceManager != null) { + resourceManager.stop(); + } } @Test (timeout = 30000) public void testConfValidation() throws Exception { ResourceScheduler scheduler = new CapacityScheduler(); + scheduler.setRMContext(resourceManager.getRMContext()); Configuration conf = new YarnConfiguration(); conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 2048); conf.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, 1024); @@ -342,18 +345,23 @@ public class TestCapacityScheduler { public void testRefreshQueues() throws Exception { CapacityScheduler cs = new CapacityScheduler(); CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration(); + RMContextImpl rmContext = new RMContextImpl(null, null, null, null, null, + null, new RMContainerTokenSecretManager(conf), + new NMTokenSecretManagerInRM(conf), + new ClientToAMTokenSecretManagerInRM(), null); setupQueueConfiguration(conf); cs.setConf(new YarnConfiguration()); - cs.reinitialize(conf, new RMContextImpl(null, null, null, null, null, - null, new RMContainerTokenSecretManager(conf), - new NMTokenSecretManagerInRM(conf), - new ClientToAMTokenSecretManagerInRM(), null)); + cs.setRMContext(resourceManager.getRMContext()); + cs.init(conf); + cs.start(); + cs.reinitialize(conf, rmContext); checkQueueCapacities(cs, A_CAPACITY, B_CAPACITY); conf.setCapacity(A, 80f); conf.setCapacity(B, 20f); cs.reinitialize(conf, mockContext); checkQueueCapacities(cs, 80f, 20f); + cs.stop(); } private void checkQueueCapacities(CapacityScheduler cs, @@ -456,6 +464,9 @@ public class TestCapacityScheduler { setupQueueConfiguration(csConf); CapacityScheduler cs = new CapacityScheduler(); cs.setConf(new YarnConfiguration()); + cs.setRMContext(resourceManager.getRMContext()); + cs.init(csConf); + cs.start(); cs.reinitialize(csConf, new RMContextImpl(null, null, null, null, null, null, new RMContainerTokenSecretManager(csConf), new NMTokenSecretManagerInRM(csConf), @@ -475,6 +486,7 @@ public class TestCapacityScheduler { cs.handle(new NodeAddedSchedulerEvent(n1)); Assert.assertEquals(4 * GB, cs.getClusterResource().getMemory()); + cs.stop(); } @Test @@ -483,6 +495,9 @@ public class TestCapacityScheduler { CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration(); setupQueueConfiguration(conf); cs.setConf(new YarnConfiguration()); + cs.setRMContext(resourceManager.getRMContext()); + cs.init(conf); + cs.start(); cs.reinitialize(conf, new RMContextImpl(null, null, null, null, null, null, new RMContainerTokenSecretManager(conf), new NMTokenSecretManagerInRM(conf), @@ -513,6 +528,7 @@ public class TestCapacityScheduler { assertEquals(queueB, queueB4.getParent()); } finally { B3_CAPACITY += B4_CAPACITY; + cs.stop(); } } @Test diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java index 2a26d30bfd5..690fa7421b7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java @@ -114,7 +114,7 @@ public class TestLeafQueue { setupQueueConfiguration(csConf, newRoot); YarnConfiguration conf = new YarnConfiguration(); cs.setConf(conf); - + csContext = mock(CapacitySchedulerContext.class); when(csContext.getConfiguration()).thenReturn(csConf); when(csContext.getConf()).thenReturn(conf); @@ -142,7 +142,9 @@ public class TestLeafQueue { queues, queues, TestUtils.spyHook); - cs.reinitialize(csConf, rmContext); + cs.setRMContext(rmContext); + cs.init(csConf); + cs.start(); } private static final String A = "a"; @@ -2080,5 +2082,8 @@ public class TestLeafQueue { @After public void tearDown() throws Exception { + if (cs != null) { + cs.stop(); + } } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueParsing.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueParsing.java index f0e2c04a5f0..a3b990c6a33 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueParsing.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueParsing.java @@ -43,11 +43,15 @@ public class TestQueueParsing { YarnConfiguration conf = new YarnConfiguration(csConf); CapacityScheduler capacityScheduler = new CapacityScheduler(); + RMContextImpl rmContext = new RMContextImpl(null, null, + null, null, null, null, new RMContainerTokenSecretManager(conf), + new NMTokenSecretManagerInRM(conf), + new ClientToAMTokenSecretManagerInRM(), null); capacityScheduler.setConf(conf); - capacityScheduler.reinitialize(conf, new RMContextImpl(null, null, - null, null, null, null, new RMContainerTokenSecretManager(conf), - new NMTokenSecretManagerInRM(conf), - new ClientToAMTokenSecretManagerInRM(), null)); + capacityScheduler.setRMContext(rmContext); + capacityScheduler.init(conf); + capacityScheduler.start(); + capacityScheduler.reinitialize(conf, rmContext); CSQueue a = capacityScheduler.getQueue("a"); Assert.assertEquals(0.10, a.getAbsoluteCapacity(), DELTA); @@ -62,6 +66,7 @@ public class TestQueueParsing { Assert.assertEquals(0.7 * 0.5 * 0.45, c12.getAbsoluteCapacity(), DELTA); Assert.assertEquals(0.7 * 0.55 * 0.7, c12.getAbsoluteMaximumCapacity(), DELTA); + capacityScheduler.stop(); } private void setupQueueConfiguration(CapacitySchedulerConfiguration conf) { @@ -142,7 +147,10 @@ public class TestQueueParsing { CapacityScheduler capacityScheduler = new CapacityScheduler(); capacityScheduler.setConf(new YarnConfiguration()); + capacityScheduler.init(conf); + capacityScheduler.start(); capacityScheduler.reinitialize(conf, null); + capacityScheduler.stop(); } public void testMaxCapacity() throws Exception { @@ -164,6 +172,8 @@ public class TestQueueParsing { try { capacityScheduler = new CapacityScheduler(); capacityScheduler.setConf(new YarnConfiguration()); + capacityScheduler.init(conf); + capacityScheduler.start(); capacityScheduler.reinitialize(conf, null); } catch (IllegalArgumentException iae) { fail = true; @@ -176,6 +186,8 @@ public class TestQueueParsing { // Now this should work capacityScheduler = new CapacityScheduler(); capacityScheduler.setConf(new YarnConfiguration()); + capacityScheduler.init(conf); + capacityScheduler.start(); capacityScheduler.reinitialize(conf, null); fail = false; @@ -187,6 +199,7 @@ public class TestQueueParsing { } Assert.assertTrue("Didn't throw IllegalArgumentException for wrong " + "setMaxCap", fail); + capacityScheduler.stop(); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java index e4dc8016c12..4c07ee885f6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java @@ -48,6 +48,8 @@ public class TestFSLeafQueue { ResourceManager resourceManager = new ResourceManager(); resourceManager.init(conf); ((AsyncDispatcher)resourceManager.getRMContext().getDispatcher()).start(); + scheduler.init(conf); + scheduler.start(); scheduler.reinitialize(conf, resourceManager.getRMContext()); String queueName = "root.queue1"; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java index 2de498f4f41..b9f40b3854b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java @@ -120,6 +120,8 @@ public class TestFairScheduler extends FairSchedulerTestBase { // to initialize the master key resourceManager.getRMContext().getContainerTokenSecretManager().rollMasterKey(); + + scheduler.setRMContext(resourceManager.getRMContext()); } @After @@ -133,12 +135,12 @@ public class TestFairScheduler extends FairSchedulerTestBase { @Test (timeout = 30000) public void testConfValidation() throws Exception { - ResourceScheduler scheduler = new FairScheduler(); + FairScheduler scheduler = new FairScheduler(); Configuration conf = new YarnConfiguration(); conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 2048); conf.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, 1024); try { - scheduler.reinitialize(conf, null); + scheduler.serviceInit(conf); fail("Exception is expected because the min memory allocation is" + " larger than the max memory allocation."); } catch (YarnRuntimeException e) { @@ -152,7 +154,7 @@ public class TestFairScheduler extends FairSchedulerTestBase { conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES, 2); conf.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES, 1); try { - scheduler.reinitialize(conf, null); + scheduler.serviceInit(conf); fail("Exception is expected because the min vcores allocation is" + " larger than the max vcores allocation."); } catch (YarnRuntimeException e) { @@ -184,6 +186,8 @@ public class TestFairScheduler extends FairSchedulerTestBase { conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 512); conf.setInt(FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_MB, 128); + scheduler.init(conf); + scheduler.start(); scheduler.reinitialize(conf, resourceManager.getRMContext()); Assert.assertEquals(true, scheduler.assignMultiple); Assert.assertEquals(3, scheduler.maxAssign); @@ -211,6 +215,7 @@ public class TestFairScheduler extends FairSchedulerTestBase { FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_MB, 512); conf.setInt( FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_VCORES, 2); + fs.init(conf); fs.reinitialize(conf, null); Assert.assertEquals(256, fs.getMinimumResourceCapability().getMemory()); Assert.assertEquals(1, fs.getMinimumResourceCapability().getVirtualCores()); @@ -228,8 +233,9 @@ public class TestFairScheduler extends FairSchedulerTestBase { FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_MB, 512); conf.setInt( FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_VCORES, 2); - fs.reinitialize(conf, null); - Assert.assertEquals(0, fs.getMinimumResourceCapability().getMemory()); + fs.init(conf); + fs.reinitialize(conf, null); + Assert.assertEquals(0, fs.getMinimumResourceCapability().getMemory()); Assert.assertEquals(0, fs.getMinimumResourceCapability().getVirtualCores()); Assert.assertEquals(512, fs.getIncrementResourceCapability().getMemory()); Assert.assertEquals(2, fs.getIncrementResourceCapability().getVirtualCores()); @@ -237,6 +243,8 @@ public class TestFairScheduler extends FairSchedulerTestBase { @Test public void testAggregateCapacityTracking() throws Exception { + scheduler.init(conf); + scheduler.start(); scheduler.reinitialize(conf, resourceManager.getRMContext()); // Add a node @@ -262,6 +270,8 @@ public class TestFairScheduler extends FairSchedulerTestBase { @Test public void testSimpleFairShareCalculation() throws IOException { + scheduler.init(conf); + scheduler.start(); scheduler.reinitialize(conf, resourceManager.getRMContext()); // Add one big node (only care about aggregate capacity) @@ -289,6 +299,8 @@ public class TestFairScheduler extends FairSchedulerTestBase { @Test public void testSimpleHierarchicalFairShareCalculation() throws IOException { + scheduler.init(conf); + scheduler.start(); scheduler.reinitialize(conf, resourceManager.getRMContext()); // Add one big node (only care about aggregate capacity) @@ -322,6 +334,8 @@ public class TestFairScheduler extends FairSchedulerTestBase { @Test public void testHierarchicalQueuesSimilarParents() throws IOException { + scheduler.init(conf); + scheduler.start(); scheduler.reinitialize(conf, resourceManager.getRMContext()); QueueManager queueManager = scheduler.getQueueManager(); @@ -346,6 +360,8 @@ public class TestFairScheduler extends FairSchedulerTestBase { @Test public void testSchedulerRootQueueMetrics() throws Exception { + scheduler.init(conf); + scheduler.start(); scheduler.reinitialize(conf, resourceManager.getRMContext()); // Add a node @@ -385,6 +401,8 @@ public class TestFairScheduler extends FairSchedulerTestBase { @Test (timeout = 5000) public void testSimpleContainerAllocation() throws IOException { + scheduler.init(conf); + scheduler.start(); scheduler.reinitialize(conf, resourceManager.getRMContext()); // Add a node @@ -433,6 +451,8 @@ public class TestFairScheduler extends FairSchedulerTestBase { @Test (timeout = 5000) public void testSimpleContainerReservation() throws Exception { + scheduler.init(conf); + scheduler.start(); scheduler.reinitialize(conf, resourceManager.getRMContext()); // Add a node @@ -487,6 +507,8 @@ public class TestFairScheduler extends FairSchedulerTestBase { @Test public void testUserAsDefaultQueue() throws Exception { conf.set(FairSchedulerConfiguration.USER_AS_DEFAULT_QUEUE, "true"); + scheduler.init(conf); + scheduler.start(); scheduler.reinitialize(conf, resourceManager.getRMContext()); RMContext rmContext = resourceManager.getRMContext(); Map appsMap = rmContext.getRMApps(); @@ -513,6 +535,8 @@ public class TestFairScheduler extends FairSchedulerTestBase { @Test public void testNotUserAsDefaultQueue() throws Exception { conf.set(FairSchedulerConfiguration.USER_AS_DEFAULT_QUEUE, "false"); + scheduler.init(conf); + scheduler.start(); scheduler.reinitialize(conf, resourceManager.getRMContext()); RMContext rmContext = resourceManager.getRMContext(); Map appsMap = rmContext.getRMApps(); @@ -539,6 +563,8 @@ public class TestFairScheduler extends FairSchedulerTestBase { @Test public void testEmptyQueueName() throws Exception { + scheduler.init(conf); + scheduler.start(); scheduler.reinitialize(conf, resourceManager.getRMContext()); // only default queue @@ -559,8 +585,10 @@ public class TestFairScheduler extends FairSchedulerTestBase { @Test public void testAssignToQueue() throws Exception { conf.set(FairSchedulerConfiguration.USER_AS_DEFAULT_QUEUE, "true"); + scheduler.init(conf); + scheduler.start(); scheduler.reinitialize(conf, resourceManager.getRMContext()); - + RMApp rmApp1 = new MockRMApp(0, 0, RMAppState.NEW); RMApp rmApp2 = new MockRMApp(1, 1, RMAppState.NEW); @@ -577,6 +605,8 @@ public class TestFairScheduler extends FairSchedulerTestBase { @Test public void testAssignToNonLeafQueueReturnsNull() throws Exception { conf.set(FairSchedulerConfiguration.USER_AS_DEFAULT_QUEUE, "true"); + scheduler.init(conf); + scheduler.start(); scheduler.reinitialize(conf, resourceManager.getRMContext()); scheduler.getQueueManager().getLeafQueue("root.child1.granchild", true); @@ -594,6 +624,8 @@ public class TestFairScheduler extends FairSchedulerTestBase { public void testQueuePlacementWithPolicy() throws Exception { conf.setClass(CommonConfigurationKeys.HADOOP_SECURITY_GROUP_MAPPING, SimpleGroupsMapping.class, GroupMappingServiceProvider.class); + scheduler.init(conf); + scheduler.start(); scheduler.reinitialize(conf, resourceManager.getRMContext()); ApplicationAttemptId appId; @@ -654,7 +686,9 @@ public class TestFairScheduler extends FairSchedulerTestBase { out.println(""); out.println(""); out.close(); - + + scheduler.init(conf); + scheduler.start(); scheduler.reinitialize(conf, resourceManager.getRMContext()); // Add one big node (only care about aggregate capacity) @@ -703,6 +737,8 @@ public class TestFairScheduler extends FairSchedulerTestBase { out.println(""); out.close(); + scheduler.init(conf); + scheduler.start(); scheduler.reinitialize(conf, resourceManager.getRMContext()); RMApp rmApp1 = new MockRMApp(0, 0, RMAppState.NEW); @@ -735,6 +771,8 @@ public class TestFairScheduler extends FairSchedulerTestBase { RMApp rmApp1 = new MockRMApp(0, 0, RMAppState.NEW); RMApp rmApp2 = new MockRMApp(1, 1, RMAppState.NEW); + scheduler.init(conf); + scheduler.start(); scheduler.reinitialize(conf, resourceManager.getRMContext()); int capacity = 16 * 1024; @@ -769,6 +807,8 @@ public class TestFairScheduler extends FairSchedulerTestBase { */ @Test public void testQueueDemandCalculation() throws Exception { + scheduler.init(conf); + scheduler.start(); scheduler.reinitialize(conf, resourceManager.getRMContext()); ApplicationAttemptId id11 = createAppAttemptId(1, 1); @@ -819,6 +859,8 @@ public class TestFairScheduler extends FairSchedulerTestBase { @Test public void testAppAdditionAndRemoval() throws Exception { + scheduler.init(conf); + scheduler.start(); scheduler.reinitialize(conf, resourceManager.getRMContext()); ApplicationAttemptId attemptId =createAppAttemptId(1, 1); AppAddedSchedulerEvent appAddedEvent = new AppAddedSchedulerEvent(attemptId.getApplicationId(), "default", @@ -869,6 +911,8 @@ public class TestFairScheduler extends FairSchedulerTestBase { out.println(""); out.close(); + scheduler.init(conf); + scheduler.start(); scheduler.reinitialize(conf, resourceManager.getRMContext()); QueueManager queueManager = scheduler.getQueueManager(); @@ -901,7 +945,9 @@ public class TestFairScheduler extends FairSchedulerTestBase { out.println(""); out.println(""); out.close(); - + + scheduler.init(conf); + scheduler.start(); scheduler.reinitialize(conf, resourceManager.getRMContext()); QueueManager queueManager = scheduler.getQueueManager(); @@ -928,6 +974,8 @@ public class TestFairScheduler extends FairSchedulerTestBase { out.println(""); out.close(); + scheduler.init(conf); + scheduler.start(); scheduler.reinitialize(conf, resourceManager.getRMContext()); // Add one big node (only care about aggregate capacity) @@ -985,8 +1033,10 @@ public class TestFairScheduler extends FairSchedulerTestBase { out.println(""); out.close(); + scheduler.init(conf); + scheduler.start(); scheduler.reinitialize(conf, resourceManager.getRMContext()); - + // Add one big node (only care about aggregate capacity) RMNode node1 = MockNodes.newNodeInfo(1, Resources.createResource(4 * 1024, 4), 1, @@ -1057,7 +1107,9 @@ public class TestFairScheduler extends FairSchedulerTestBase { out.println(""); out.println(""); out.close(); - + + scheduler.init(conf); + scheduler.start(); scheduler.reinitialize(conf, resourceManager.getRMContext()); // Create two nodes @@ -1224,6 +1276,8 @@ public class TestFairScheduler extends FairSchedulerTestBase { out.println(""); out.close(); + scheduler.init(conf); + scheduler.start(); scheduler.reinitialize(conf, resourceManager.getRMContext()); // Create four nodes @@ -1321,6 +1375,8 @@ public class TestFairScheduler extends FairSchedulerTestBase { @Test (timeout = 5000) public void testMultipleContainersWaitingForReservation() throws IOException { + scheduler.init(conf); + scheduler.start(); scheduler.reinitialize(conf, resourceManager.getRMContext()); // Add a node @@ -1363,8 +1419,10 @@ public class TestFairScheduler extends FairSchedulerTestBase { out.println(""); out.close(); + scheduler.init(conf); + scheduler.start(); scheduler.reinitialize(conf, resourceManager.getRMContext()); - + // Add a node RMNode node1 = MockNodes @@ -1404,6 +1462,8 @@ public class TestFairScheduler extends FairSchedulerTestBase { @Test (timeout = 5000) public void testReservationWhileMultiplePriorities() throws IOException { + scheduler.init(conf); + scheduler.start(); scheduler.reinitialize(conf, resourceManager.getRMContext()); // Add a node @@ -1484,8 +1544,10 @@ public class TestFairScheduler extends FairSchedulerTestBase { out.println(""); out.close(); + scheduler.init(conf); + scheduler.start(); scheduler.reinitialize(conf, resourceManager.getRMContext()); - + ApplicationAttemptId attId1 = createSchedulingRequest(1024, "queue1", "norealuserhasthisname", 1); ApplicationAttemptId attId2 = createSchedulingRequest(1024, "queue1", @@ -1499,6 +1561,8 @@ public class TestFairScheduler extends FairSchedulerTestBase { @Test (timeout = 5000) public void testMultipleNodesSingleRackRequest() throws Exception { + scheduler.init(conf); + scheduler.start(); scheduler.reinitialize(conf, resourceManager.getRMContext()); RMNode node1 = @@ -1548,6 +1612,8 @@ public class TestFairScheduler extends FairSchedulerTestBase { @Test (timeout = 5000) public void testFifoWithinQueue() throws Exception { + scheduler.init(conf); + scheduler.start(); scheduler.reinitialize(conf, resourceManager.getRMContext()); RMNode node1 = @@ -1592,6 +1658,8 @@ public class TestFairScheduler extends FairSchedulerTestBase { @Test(timeout = 3000) public void testMaxAssign() throws Exception { conf.setBoolean(FairSchedulerConfiguration.ASSIGN_MULTIPLE, true); + scheduler.init(conf); + scheduler.start(); scheduler.reinitialize(conf, resourceManager.getRMContext()); RMNode node = @@ -1635,6 +1703,8 @@ public class TestFairScheduler extends FairSchedulerTestBase { */ @Test(timeout = 5000) public void testAssignContainer() throws Exception { + scheduler.init(conf); + scheduler.start(); scheduler.reinitialize(conf, resourceManager.getRMContext()); final String user = "user1"; @@ -1718,9 +1788,11 @@ public class TestFairScheduler extends FairSchedulerTestBase { out.println(""); out.println(""); out.close(); - + + scheduler.init(conf); + scheduler.start(); scheduler.reinitialize(conf, resourceManager.getRMContext()); - + int appId = this.APP_ID++; String user = "usernotallow"; String queue = "queue1"; @@ -1769,6 +1841,8 @@ public class TestFairScheduler extends FairSchedulerTestBase { @Test public void testReservationThatDoesntFit() throws IOException { + scheduler.init(conf); + scheduler.start(); scheduler.reinitialize(conf, resourceManager.getRMContext()); RMNode node1 = @@ -1797,6 +1871,8 @@ public class TestFairScheduler extends FairSchedulerTestBase { @Test public void testRemoveNodeUpdatesRootQueueMetrics() throws IOException { + scheduler.init(conf); + scheduler.start(); scheduler.reinitialize(conf, resourceManager.getRMContext()); assertEquals(0, scheduler.getRootQueueMetrics().getAvailableMB()); @@ -1825,6 +1901,8 @@ public class TestFairScheduler extends FairSchedulerTestBase { @Test public void testStrictLocality() throws IOException { + scheduler.init(conf); + scheduler.start(); scheduler.reinitialize(conf, resourceManager.getRMContext()); RMNode node1 = MockNodes.newNodeInfo(1, Resources.createResource(1024), 1, "127.0.0.1"); @@ -1865,6 +1943,8 @@ public class TestFairScheduler extends FairSchedulerTestBase { @Test public void testCancelStrictLocality() throws IOException { + scheduler.init(conf); + scheduler.start(); scheduler.reinitialize(conf, resourceManager.getRMContext()); RMNode node1 = MockNodes.newNodeInfo(1, Resources.createResource(1024), 1, "127.0.0.1"); @@ -1915,6 +1995,8 @@ public class TestFairScheduler extends FairSchedulerTestBase { */ @Test public void testReservationsStrictLocality() throws IOException { + scheduler.init(conf); + scheduler.start(); scheduler.reinitialize(conf, resourceManager.getRMContext()); RMNode node1 = MockNodes.newNodeInfo(1, Resources.createResource(1024), 1, "127.0.0.1"); @@ -1955,6 +2037,8 @@ public class TestFairScheduler extends FairSchedulerTestBase { @Test public void testNoMoreCpuOnNode() throws IOException { + scheduler.init(conf); + scheduler.start(); scheduler.reinitialize(conf, resourceManager.getRMContext()); RMNode node1 = MockNodes.newNodeInfo(1, Resources.createResource(2048, 1), @@ -1976,6 +2060,8 @@ public class TestFairScheduler extends FairSchedulerTestBase { @Test public void testBasicDRFAssignment() throws Exception { + scheduler.init(conf); + scheduler.start(); scheduler.reinitialize(conf, resourceManager.getRMContext()); RMNode node = MockNodes.newNodeInfo(1, BuilderUtils.newResource(8192, 5)); @@ -2016,6 +2102,8 @@ public class TestFairScheduler extends FairSchedulerTestBase { */ @Test public void testBasicDRFWithQueues() throws Exception { + scheduler.init(conf); + scheduler.start(); scheduler.reinitialize(conf, resourceManager.getRMContext()); RMNode node = MockNodes.newNodeInfo(1, BuilderUtils.newResource(8192, 7), @@ -2052,6 +2140,8 @@ public class TestFairScheduler extends FairSchedulerTestBase { @Test public void testDRFHierarchicalQueues() throws Exception { + scheduler.init(conf); + scheduler.start(); scheduler.reinitialize(conf, resourceManager.getRMContext()); RMNode node = MockNodes.newNodeInfo(1, BuilderUtils.newResource(12288, 12), @@ -2120,7 +2210,9 @@ public class TestFairScheduler extends FairSchedulerTestBase { public void testHostPortNodeName() throws Exception { conf.setBoolean(YarnConfiguration .RM_SCHEDULER_INCLUDE_PORT_IN_NODE_NAME, true); - scheduler.reinitialize(conf, + scheduler.init(conf); + scheduler.start(); + scheduler.reinitialize(conf, resourceManager.getRMContext()); RMNode node1 = MockNodes.newNodeInfo(1, Resources.createResource(1024), 1, "127.0.0.1", 1); @@ -2200,9 +2292,11 @@ public class TestFairScheduler extends FairSchedulerTestBase { out.println(""); out.println(""); out.close(); - + + scheduler.init(conf); + scheduler.start(); scheduler.reinitialize(conf, resourceManager.getRMContext()); - + // exceeds no limits ApplicationAttemptId attId1 = createSchedulingRequest(1024, "queue1", "user1"); verifyAppRunnable(attId1, true); @@ -2254,9 +2348,11 @@ public class TestFairScheduler extends FairSchedulerTestBase { out.println(""); out.println(""); out.close(); - + + scheduler.init(conf); + scheduler.start(); scheduler.reinitialize(conf, resourceManager.getRMContext()); - + // exceeds no limits ApplicationAttemptId attId1 = createSchedulingRequest(1024, "queue1.sub1", "user1"); verifyAppRunnable(attId1, true); @@ -2316,6 +2412,9 @@ public class TestFairScheduler extends FairSchedulerTestBase { Configuration conf = createConfiguration(); conf.setBoolean(FairSchedulerConfiguration.CONTINUOUS_SCHEDULING_ENABLED, true); + fs.setRMContext(resourceManager.getRMContext()); + fs.init(conf); + fs.start(); fs.reinitialize(conf, resourceManager.getRMContext()); Assert.assertTrue("Continuous scheduling should be enabled.", fs.isContinuousSchedulingEnabled()); @@ -2396,6 +2495,8 @@ public class TestFairScheduler extends FairSchedulerTestBase { out.println(""); out.close(); + scheduler.init(conf); + scheduler.start(); scheduler.reinitialize(conf, resourceManager.getRMContext()); QueueManager queueManager = scheduler.getQueueManager(); @@ -2439,6 +2540,8 @@ public class TestFairScheduler extends FairSchedulerTestBase { out.println(""); out.close(); + scheduler.init(conf); + scheduler.start(); scheduler.reinitialize(conf, resourceManager.getRMContext()); List rules = scheduler.allocConf.placementPolicy @@ -2455,6 +2558,8 @@ public class TestFairScheduler extends FairSchedulerTestBase { @SuppressWarnings("resource") @Test public void testBlacklistNodes() throws Exception { + scheduler.init(conf); + scheduler.start(); scheduler.reinitialize(conf, resourceManager.getRMContext()); final int GB = 1024; @@ -2507,6 +2612,8 @@ public class TestFairScheduler extends FairSchedulerTestBase { @Test public void testGetAppsInQueue() throws Exception { + scheduler.init(conf); + scheduler.start(); scheduler.reinitialize(conf, resourceManager.getRMContext()); ApplicationAttemptId appAttId1 = @@ -2552,8 +2659,10 @@ public class TestFairScheduler extends FairSchedulerTestBase { @Test public void testMoveRunnableApp() throws Exception { + scheduler.init(conf); + scheduler.start(); scheduler.reinitialize(conf, resourceManager.getRMContext()); - + QueueManager queueMgr = scheduler.getQueueManager(); FSLeafQueue oldQueue = queueMgr.getLeafQueue("queue1", true); FSLeafQueue targetQueue = queueMgr.getLeafQueue("queue2", true); @@ -2591,8 +2700,10 @@ public class TestFairScheduler extends FairSchedulerTestBase { @Test public void testMoveNonRunnableApp() throws Exception { + scheduler.init(conf); + scheduler.start(); scheduler.reinitialize(conf, resourceManager.getRMContext()); - + QueueManager queueMgr = scheduler.getQueueManager(); FSLeafQueue oldQueue = queueMgr.getLeafQueue("queue1", true); FSLeafQueue targetQueue = queueMgr.getLeafQueue("queue2", true); @@ -2611,8 +2722,10 @@ public class TestFairScheduler extends FairSchedulerTestBase { @Test public void testMoveMakesAppRunnable() throws Exception { + scheduler.init(conf); + scheduler.start(); scheduler.reinitialize(conf, resourceManager.getRMContext()); - + QueueManager queueMgr = scheduler.getQueueManager(); FSLeafQueue oldQueue = queueMgr.getLeafQueue("queue1", true); FSLeafQueue targetQueue = queueMgr.getLeafQueue("queue2", true); @@ -2638,8 +2751,10 @@ public class TestFairScheduler extends FairSchedulerTestBase { @Test (expected = YarnException.class) public void testMoveWouldViolateMaxAppsConstraints() throws Exception { + scheduler.init(conf); + scheduler.start(); scheduler.reinitialize(conf, resourceManager.getRMContext()); - + QueueManager queueMgr = scheduler.getQueueManager(); queueMgr.getLeafQueue("queue2", true); scheduler.getAllocationConfiguration().queueMaxApps.put("root.queue2", 0); @@ -2652,8 +2767,10 @@ public class TestFairScheduler extends FairSchedulerTestBase { @Test (expected = YarnException.class) public void testMoveWouldViolateMaxResourcesConstraints() throws Exception { + scheduler.init(conf); + scheduler.start(); scheduler.reinitialize(conf, resourceManager.getRMContext()); - + QueueManager queueMgr = scheduler.getQueueManager(); FSLeafQueue oldQueue = queueMgr.getLeafQueue("queue1", true); queueMgr.getLeafQueue("queue2", true); @@ -2675,6 +2792,8 @@ public class TestFairScheduler extends FairSchedulerTestBase { @Test (expected = YarnException.class) public void testMoveToNonexistentQueue() throws Exception { + scheduler.init(conf); + scheduler.start(); scheduler.reinitialize(conf, resourceManager.getRMContext()); scheduler.getQueueManager().getLeafQueue("queue1", true); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerEventLog.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerEventLog.java index 77a3b028e0b..311b53182df 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerEventLog.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerEventLog.java @@ -51,6 +51,8 @@ public class TestFairSchedulerEventLog { resourceManager = new ResourceManager(); resourceManager.init(conf); ((AsyncDispatcher)resourceManager.getRMContext().getDispatcher()).start(); + scheduler.init(conf); + scheduler.start(); scheduler.reinitialize(conf, resourceManager.getRMContext()); } @@ -69,7 +71,13 @@ public class TestFairSchedulerEventLog { public void tearDown() { logFile.delete(); logFile.getParentFile().delete(); // fairscheduler/ - scheduler = null; - resourceManager = null; + if (scheduler != null) { + scheduler.stop(); + scheduler = null; + } + if (resourceManager != null) { + resourceManager.stop(); + resourceManager = null; + } } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java index f5bfc371cfe..a0e22799290 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java @@ -145,9 +145,13 @@ public class TestFifoScheduler { RMContext rmContext = new RMContextImpl(dispatcher, null, null, null, null, null, null, null, null, writer); - FifoScheduler schedular = new FifoScheduler(); - schedular.reinitialize(new Configuration(), rmContext); - QueueMetrics metrics = schedular.getRootQueueMetrics(); + FifoScheduler scheduler = new FifoScheduler(); + Configuration conf = new Configuration(); + scheduler.setRMContext(rmContext); + scheduler.init(conf); + scheduler.start(); + scheduler.reinitialize(conf, rmContext); + QueueMetrics metrics = scheduler.getRootQueueMetrics(); int beforeAppsSubmitted = metrics.getAppsSubmitted(); ApplicationId appId = BuilderUtils.newApplicationId(200, 1); @@ -155,18 +159,19 @@ public class TestFifoScheduler { appId, 1); SchedulerEvent appEvent = new AppAddedSchedulerEvent(appId, "queue", "user"); - schedular.handle(appEvent); + scheduler.handle(appEvent); SchedulerEvent attemptEvent = new AppAttemptAddedSchedulerEvent(appAttemptId, false); - schedular.handle(attemptEvent); + scheduler.handle(attemptEvent); appAttemptId = BuilderUtils.newApplicationAttemptId(appId, 2); SchedulerEvent attemptEvent2 = new AppAttemptAddedSchedulerEvent(appAttemptId, false); - schedular.handle(attemptEvent2); + scheduler.handle(attemptEvent2); int afterAppsSubmitted = metrics.getAppsSubmitted(); Assert.assertEquals(1, afterAppsSubmitted - beforeAppsSubmitted); + scheduler.stop(); } @Test(timeout=2000) @@ -184,6 +189,9 @@ public class TestFifoScheduler { null, containerTokenSecretManager, nmTokenSecretManager, null, writer); FifoScheduler scheduler = new FifoScheduler(); + scheduler.setRMContext(rmContext); + scheduler.init(conf); + scheduler.start(); scheduler.reinitialize(new Configuration(), rmContext); RMNode node0 = MockNodes.newNodeInfo(1, @@ -232,6 +240,7 @@ public class TestFifoScheduler { //Also check that the containers were scheduled SchedulerAppReport info = scheduler.getSchedulerAppInfo(appAttemptId); Assert.assertEquals(3, info.getLiveContainers().size()); + scheduler.stop(); } @Test(timeout=2000) @@ -254,6 +263,9 @@ public class TestFifoScheduler { return nodes; } }; + scheduler.setRMContext(rmContext); + scheduler.init(conf); + scheduler.start(); scheduler.reinitialize(new Configuration(), rmContext); RMNode node0 = MockNodes.newNodeInfo(1, Resources.createResource(2048, 4), 1, "127.0.0.1"); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebApp.java index 2c2aae6f9fa..0df7c0dacb5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebApp.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebApp.java @@ -203,10 +203,11 @@ public class TestRMWebApp { CapacityScheduler cs = new CapacityScheduler(); cs.setConf(new YarnConfiguration()); - cs.reinitialize(conf, new RMContextImpl(null, null, null, null, null, + cs.setRMContext(new RMContextImpl(null, null, null, null, null, null, new RMContainerTokenSecretManager(conf), new NMTokenSecretManagerInRM(conf), new ClientToAMTokenSecretManagerInRM(), null)); + cs.init(conf); return cs; } @@ -269,19 +270,21 @@ public class TestRMWebApp { ResourceManager rm = mock(ResourceManager.class); RMContext rmContext = mockRMContext(apps, racks, nodes, mbsPerNode); - ResourceScheduler rs = mockFifoScheduler(); + ResourceScheduler rs = mockFifoScheduler(rmContext); when(rm.getResourceScheduler()).thenReturn(rs); when(rm.getRMContext()).thenReturn(rmContext); return rm; } - public static FifoScheduler mockFifoScheduler() throws Exception { + public static FifoScheduler mockFifoScheduler(RMContext rmContext) + throws Exception { CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration(); setupFifoQueueConfiguration(conf); FifoScheduler fs = new FifoScheduler(); fs.setConf(new YarnConfiguration()); - fs.reinitialize(conf, null); + fs.setRMContext(rmContext); + fs.init(conf); return fs; }