diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index b133db883ad..6cc09c3e32c 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -212,6 +212,9 @@ Release 2.5.0 - UNRELEASED YARN-2128. FairScheduler: Incorrect calculation of amResource usage. (Wei Yan via kasha) + YARN-2124. Fixed NPE in ProportionalCapacityPreemptionPolicy. (Wangda Tan + via jianhe) + Release 2.4.1 - UNRELEASED INCOMPATIBLE CHANGES diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java index 724dee14f31..77de2090a00 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java @@ -327,7 +327,7 @@ public class ResourceManager extends CompositeService implements Recoverable { * RMActiveServices handles all the Active services in the RM. */ @Private - class RMActiveServices extends CompositeService { + public class RMActiveServices extends CompositeService { private DelegationTokenRenewer delegationTokenRenewer; private EventHandler schedulerDispatcher; @@ -526,11 +526,9 @@ public class ResourceManager extends CompositeService implements Recoverable { (PreemptableResourceScheduler) scheduler)); for (SchedulingEditPolicy policy : policies) { LOG.info("LOADING SchedulingEditPolicy:" + policy.getPolicyName()); - policy.init(conf, rmContext.getDispatcher().getEventHandler(), - (PreemptableResourceScheduler) scheduler); // periodically check whether we need to take action to guarantee // constraints - SchedulingMonitor mon = new SchedulingMonitor(policy); + SchedulingMonitor mon = new SchedulingMonitor(rmContext, policy); addService(mon); } } else { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/SchedulingMonitor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/SchedulingMonitor.java index 2e93a9e51c2..1682f7d8612 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/SchedulingMonitor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/SchedulingMonitor.java @@ -21,6 +21,8 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.PreemptableResourceScheduler; import com.google.common.annotations.VisibleForTesting; @@ -34,18 +36,29 @@ public class SchedulingMonitor extends AbstractService { private Thread checkerThread; private volatile boolean stopped; private long monitorInterval; + private RMContext rmContext; - public SchedulingMonitor(SchedulingEditPolicy scheduleEditPolicy) { + public SchedulingMonitor(RMContext rmContext, + SchedulingEditPolicy scheduleEditPolicy) { super("SchedulingMonitor (" + scheduleEditPolicy.getPolicyName() + ")"); this.scheduleEditPolicy = scheduleEditPolicy; - this.monitorInterval = scheduleEditPolicy.getMonitoringInterval(); + this.rmContext = rmContext; } public long getMonitorInterval() { return monitorInterval; } + + @VisibleForTesting + public synchronized SchedulingEditPolicy getSchedulingEditPolicy() { + return scheduleEditPolicy; + } + @SuppressWarnings("unchecked") public void serviceInit(Configuration conf) throws Exception { + scheduleEditPolicy.init(conf, rmContext.getDispatcher().getEventHandler(), + (PreemptableResourceScheduler) rmContext.getScheduler()); + this.monitorInterval = scheduleEditPolicy.getMonitoringInterval(); super.serviceInit(conf); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java index f94aedbf921..6d1516158b5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java @@ -165,6 +165,11 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic observeOnly = config.getBoolean(OBSERVE_ONLY, false); rc = scheduler.getResourceCalculator(); } + + @VisibleForTesting + public ResourceCalculator getResourceCalculator() { + return rc; + } @Override public void editSchedule(){ diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java index 446bbae04a8..67eac76e5f3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java @@ -571,4 +571,8 @@ public class MockRM extends ResourceManager { .getSchedulerApplications().get(app.getApplicationId()).getQueue() .getMetrics().clearQueueMetrics(); } + + public RMActiveServices getRMActiveService() { + return activeServices; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java index 6a449f5c03a..d0a80eb20bb 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java @@ -17,6 +17,25 @@ */ package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity; +import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.MAX_IGNORED_OVER_CAPACITY; +import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.MONITORING_INTERVAL; +import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.NATURAL_TERMINATION_FACTOR; +import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.OBSERVE_ONLY; +import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.TOTAL_PREEMPTION_PER_ROUND; +import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.WAIT_TIME_BEFORE_KILL; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEventType.KILL_CONTAINER; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEventType.PREEMPT_CONTAINER; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.fail; +import static org.mockito.Matchers.argThat; +import static org.mockito.Matchers.isA; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + import java.util.ArrayList; import java.util.Comparator; import java.util.Deque; @@ -27,12 +46,16 @@ import java.util.Random; import java.util.TreeSet; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.service.Service; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.hadoop.yarn.server.resourcemanager.MockRM; +import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingMonitor; import org.apache.hadoop.yarn.server.resourcemanager.resource.Priority; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEvent; @@ -52,17 +75,6 @@ import org.junit.rules.TestName; import org.mockito.ArgumentCaptor; import org.mockito.ArgumentMatcher; -import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.MAX_IGNORED_OVER_CAPACITY; -import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.MONITORING_INTERVAL; -import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.NATURAL_TERMINATION_FACTOR; -import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.OBSERVE_ONLY; -import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.TOTAL_PREEMPTION_PER_ROUND; -import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.WAIT_TIME_BEFORE_KILL; -import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEventType.KILL_CONTAINER; -import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEventType.PREEMPT_CONTAINER; -import static org.junit.Assert.*; -import static org.mockito.Mockito.*; - public class TestProportionalCapacityPreemptionPolicy { static final long TS = 3141592653L; @@ -424,6 +436,36 @@ public class TestProportionalCapacityPreemptionPolicy { assert containers.get(4).equals(rm5); } + + @Test + public void testPolicyInitializeAfterSchedulerInitialized() { + Configuration conf = new Configuration(); + conf.set(YarnConfiguration.RM_SCHEDULER_MONITOR_POLICIES, + ProportionalCapacityPreemptionPolicy.class.getCanonicalName()); + conf.setBoolean(YarnConfiguration.RM_SCHEDULER_ENABLE_MONITORS, true); + + @SuppressWarnings("resource") + MockRM rm = new MockRM(conf); + rm.init(conf); + + // ProportionalCapacityPreemptionPolicy should be initialized after + // CapacityScheduler initialized. We will + // 1) find SchedulingMonitor from RMActiveService's service list, + // 2) check if ResourceCalculator in policy is null or not. + // If it's not null, we can come to a conclusion that policy initialized + // after scheduler got initialized + for (Service service : rm.getRMActiveService().getServices()) { + if (service instanceof SchedulingMonitor) { + ProportionalCapacityPreemptionPolicy policy = + (ProportionalCapacityPreemptionPolicy) ((SchedulingMonitor) service) + .getSchedulingEditPolicy(); + assertNotNull(policy.getResourceCalculator()); + return; + } + } + + fail("Failed to find SchedulingMonitor service, please check what happened"); + } static class IsPreemptionRequestFor extends ArgumentMatcher {