Merge r1601964 from trunk. YARN-2124. Fixed NPE in ProportionalCapacityPreemptionPolicy. Contributed by Wangda Tan

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1601965 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Jian He 2014-06-11 17:31:35 +00:00
parent 2582dd2ca3
commit e4f775755c
6 changed files with 82 additions and 17 deletions

View File

@ -212,6 +212,9 @@ Release 2.5.0 - UNRELEASED
YARN-2128. FairScheduler: Incorrect calculation of amResource usage.
(Wei Yan via kasha)
YARN-2124. Fixed NPE in ProportionalCapacityPreemptionPolicy. (Wangda Tan
via jianhe)
Release 2.4.1 - UNRELEASED
INCOMPATIBLE CHANGES

View File

@ -327,7 +327,7 @@ public class ResourceManager extends CompositeService implements Recoverable {
* RMActiveServices handles all the Active services in the RM.
*/
@Private
class RMActiveServices extends CompositeService {
public class RMActiveServices extends CompositeService {
private DelegationTokenRenewer delegationTokenRenewer;
private EventHandler<SchedulerEvent> schedulerDispatcher;
@ -526,11 +526,9 @@ public class ResourceManager extends CompositeService implements Recoverable {
(PreemptableResourceScheduler) scheduler));
for (SchedulingEditPolicy policy : policies) {
LOG.info("LOADING SchedulingEditPolicy:" + policy.getPolicyName());
policy.init(conf, rmContext.getDispatcher().getEventHandler(),
(PreemptableResourceScheduler) scheduler);
// periodically check whether we need to take action to guarantee
// constraints
SchedulingMonitor mon = new SchedulingMonitor(policy);
SchedulingMonitor mon = new SchedulingMonitor(rmContext, policy);
addService(mon);
}
} else {

View File

@ -21,6 +21,8 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.PreemptableResourceScheduler;
import com.google.common.annotations.VisibleForTesting;
@ -34,18 +36,29 @@ public class SchedulingMonitor extends AbstractService {
private Thread checkerThread;
private volatile boolean stopped;
private long monitorInterval;
private RMContext rmContext;
public SchedulingMonitor(SchedulingEditPolicy scheduleEditPolicy) {
public SchedulingMonitor(RMContext rmContext,
SchedulingEditPolicy scheduleEditPolicy) {
super("SchedulingMonitor (" + scheduleEditPolicy.getPolicyName() + ")");
this.scheduleEditPolicy = scheduleEditPolicy;
this.monitorInterval = scheduleEditPolicy.getMonitoringInterval();
this.rmContext = rmContext;
}
public long getMonitorInterval() {
return monitorInterval;
}
@VisibleForTesting
public synchronized SchedulingEditPolicy getSchedulingEditPolicy() {
return scheduleEditPolicy;
}
@SuppressWarnings("unchecked")
public void serviceInit(Configuration conf) throws Exception {
scheduleEditPolicy.init(conf, rmContext.getDispatcher().getEventHandler(),
(PreemptableResourceScheduler) rmContext.getScheduler());
this.monitorInterval = scheduleEditPolicy.getMonitoringInterval();
super.serviceInit(conf);
}

View File

@ -165,6 +165,11 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
observeOnly = config.getBoolean(OBSERVE_ONLY, false);
rc = scheduler.getResourceCalculator();
}
@VisibleForTesting
public ResourceCalculator getResourceCalculator() {
return rc;
}
@Override
public void editSchedule(){

View File

@ -571,4 +571,8 @@ public class MockRM extends ResourceManager {
.getSchedulerApplications().get(app.getApplicationId()).getQueue()
.getMetrics().clearQueueMetrics();
}
public RMActiveServices getRMActiveService() {
return activeServices;
}
}

View File

@ -17,6 +17,25 @@
*/
package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity;
import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.MAX_IGNORED_OVER_CAPACITY;
import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.MONITORING_INTERVAL;
import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.NATURAL_TERMINATION_FACTOR;
import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.OBSERVE_ONLY;
import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.TOTAL_PREEMPTION_PER_ROUND;
import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.WAIT_TIME_BEFORE_KILL;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEventType.KILL_CONTAINER;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEventType.PREEMPT_CONTAINER;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.argThat;
import static org.mockito.Matchers.isA;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Deque;
@ -27,12 +46,16 @@ import java.util.Random;
import java.util.TreeSet;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.service.Service;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingMonitor;
import org.apache.hadoop.yarn.server.resourcemanager.resource.Priority;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEvent;
@ -52,17 +75,6 @@ import org.junit.rules.TestName;
import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatcher;
import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.MAX_IGNORED_OVER_CAPACITY;
import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.MONITORING_INTERVAL;
import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.NATURAL_TERMINATION_FACTOR;
import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.OBSERVE_ONLY;
import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.TOTAL_PREEMPTION_PER_ROUND;
import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.WAIT_TIME_BEFORE_KILL;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEventType.KILL_CONTAINER;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEventType.PREEMPT_CONTAINER;
import static org.junit.Assert.*;
import static org.mockito.Mockito.*;
public class TestProportionalCapacityPreemptionPolicy {
static final long TS = 3141592653L;
@ -424,6 +436,36 @@ public class TestProportionalCapacityPreemptionPolicy {
assert containers.get(4).equals(rm5);
}
@Test
public void testPolicyInitializeAfterSchedulerInitialized() {
Configuration conf = new Configuration();
conf.set(YarnConfiguration.RM_SCHEDULER_MONITOR_POLICIES,
ProportionalCapacityPreemptionPolicy.class.getCanonicalName());
conf.setBoolean(YarnConfiguration.RM_SCHEDULER_ENABLE_MONITORS, true);
@SuppressWarnings("resource")
MockRM rm = new MockRM(conf);
rm.init(conf);
// ProportionalCapacityPreemptionPolicy should be initialized after
// CapacityScheduler initialized. We will
// 1) find SchedulingMonitor from RMActiveService's service list,
// 2) check if ResourceCalculator in policy is null or not.
// If it's not null, we can come to a conclusion that policy initialized
// after scheduler got initialized
for (Service service : rm.getRMActiveService().getServices()) {
if (service instanceof SchedulingMonitor) {
ProportionalCapacityPreemptionPolicy policy =
(ProportionalCapacityPreemptionPolicy) ((SchedulingMonitor) service)
.getSchedulingEditPolicy();
assertNotNull(policy.getResourceCalculator());
return;
}
}
fail("Failed to find SchedulingMonitor service, please check what happened");
}
static class IsPreemptionRequestFor
extends ArgumentMatcher<ContainerPreemptEvent> {