YARN-2881. [YARN-2574] Implement PlanFollower for FairScheduler. (Anubhav Dhoot via kasha)

(cherry picked from commit 0c4b112677)
This commit is contained in:
Karthik Kambatla 2015-01-06 04:41:45 +05:30
parent 0c2d996c2c
commit 7adffad2bb
19 changed files with 574 additions and 59 deletions

View File

@ -38,6 +38,8 @@ Release 2.7.0 - UNRELEASED
YARN-2738. [YARN-2574] Add FairReservationSystem for FairScheduler.
(Anubhav Dhoot via kasha)
YARN-2881. [YARN-2574] Implement PlanFollower for FairScheduler.
(Anubhav Dhoot via kasha)
IMPROVEMENTS

View File

@ -204,6 +204,8 @@ public abstract class AbstractReservationSystem extends AbstractService
// currently only capacity scheduler is supported
if (scheduler instanceof CapacityScheduler) {
return CapacitySchedulerPlanFollower.class.getName();
} else if (scheduler instanceof FairScheduler) {
return FairSchedulerPlanFollower.class.getName();
}
return null;
}

View File

@ -26,7 +26,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.Plan
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.PlanQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.QueueEntitlement;
import org.apache.hadoop.yarn.util.Clock;
@ -99,7 +98,7 @@ public abstract class AbstractSchedulerPlanFollower implements PlanFollower {
// create the default reservation queue if it doesnt exist
String defReservationId = getReservationIdFromQueueName(planQueueName) +
PlanQueue.DEFAULT_QUEUE_SUFFIX;
ReservationConstants.DEFAULT_QUEUE_SUFFIX;
String defReservationQueue = getReservationQueueName(planQueueName,
defReservationId);
createDefaultReservationQueue(planQueueName, planQueue,

View File

@ -0,0 +1,141 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.reservation;
import java.util.Collection;
import java.util.List;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSLeafQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSParentQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class FairSchedulerPlanFollower extends AbstractSchedulerPlanFollower {
private static final Logger LOG = LoggerFactory
.getLogger(FairSchedulerPlanFollower.class);
private FairScheduler fs;
@Override
public void init(Clock clock, ResourceScheduler sched,
Collection<Plan> plans) {
super.init(clock, sched, plans);
fs = (FairScheduler)sched;
LOG.info("Initializing Plan Follower Policy:"
+ this.getClass().getCanonicalName());
}
@Override
protected Queue getPlanQueue(String planQueueName) {
Queue planQueue = fs.getQueueManager().getParentQueue(planQueueName, false);
if (planQueue == null) {
LOG.error("The queue " + planQueueName + " cannot be found or is not a " +
"ParentQueue");
}
return planQueue;
}
@Override
protected float calculateReservationToPlanRatio(Resource clusterResources,
Resource planResources, Resource capToAssign) {
return Resources.divide(fs.getResourceCalculator(),
clusterResources, capToAssign, planResources);
}
@Override
protected boolean arePlanResourcesLessThanReservations(Resource
clusterResources, Resource planResources, Resource reservedResources) {
return Resources.greaterThan(fs.getResourceCalculator(),
clusterResources, reservedResources, planResources);
}
@Override
protected List<? extends Queue> getChildReservationQueues(Queue queue) {
FSQueue planQueue = (FSQueue)queue;
List<FSQueue> childQueues = planQueue.getChildQueues();
return childQueues;
}
@Override
protected void addReservationQueue(String planQueueName, Queue queue,
String currResId) {
String leafQueueName = getReservationQueueName(planQueueName, currResId);
fs.getQueueManager().getLeafQueue(leafQueueName, true);
}
@Override
protected void createDefaultReservationQueue(String planQueueName,
Queue queue, String defReservationId) {
String defReservationQueueName = getReservationQueueName(planQueueName,
defReservationId);
if (!fs.getQueueManager().exists(defReservationQueueName)) {
fs.getQueueManager().getLeafQueue(defReservationQueueName, true);
}
}
@Override
protected Resource getPlanResources(Plan plan, Queue queue,
Resource clusterResources) {
FSParentQueue planQueue = (FSParentQueue)queue;
Resource planResources = planQueue.getSteadyFairShare();
return planResources;
}
@Override
protected Resource getReservationQueueResourceIfExists(Plan plan,
ReservationId reservationId) {
String reservationQueueName = getReservationQueueName(plan.getQueueName(),
reservationId.toString());
FSLeafQueue reservationQueue =
fs.getQueueManager().getLeafQueue(reservationQueueName, false);
Resource reservationResource = null;
if (reservationQueue != null) {
reservationResource = reservationQueue.getSteadyFairShare();
}
return reservationResource;
}
@Override
protected String getReservationQueueName(String planQueueName,
String reservationQueueName) {
String planQueueNameFullPath = fs.getQueueManager().getQueue
(planQueueName).getName();
if (!reservationQueueName.startsWith(planQueueNameFullPath)) {
// If name is not a path we need full path for FairScheduler. See
// YARN-2773 for the root cause
return planQueueNameFullPath + "." + reservationQueueName;
}
return reservationQueueName;
}
@Override
protected String getReservationIdFromQueueName(String resQueueName) {
return resQueueName.substring(resQueueName.lastIndexOf(".") + 1);
}
}

View File

@ -0,0 +1,28 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.reservation;
public interface ReservationConstants {
/**
* The suffix used for a queue under a reservable queue that will be used
* as a default queue whenever no reservation is used
*/
String DEFAULT_QUEUE_SUFFIX = "-default";
}

View File

@ -21,7 +21,6 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Comparator;
import java.util.EnumSet;
@ -66,6 +65,7 @@ import org.apache.hadoop.yarn.proto.YarnServiceProtos.SchedulerResourceTypes;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationConstants;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
@ -1419,7 +1419,7 @@ public class CapacityScheduler extends
queueName = resQName;
} else {
// use the default child queue of the plan for unreserved apps
queueName = queueName + PlanQueue.DEFAULT_QUEUE_SUFFIX;
queueName = queueName + ReservationConstants.DEFAULT_QUEUE_SUFFIX;
}
return queueName;
}
@ -1583,7 +1583,7 @@ public class CapacityScheduler extends
CSQueue dest = getQueue(targetQueueName);
if (dest != null && dest instanceof PlanQueue) {
// use the default child reservation queue of the plan
targetQueueName = targetQueueName + PlanQueue.DEFAULT_QUEUE_SUFFIX;
targetQueueName = targetQueueName + ReservationConstants.DEFAULT_QUEUE_SUFFIX;
}
return targetQueueName;
}

View File

@ -37,8 +37,6 @@ import org.slf4j.LoggerFactory;
*/
public class PlanQueue extends ParentQueue {
public static final String DEFAULT_QUEUE_SUFFIX = "-default";
private static final Logger LOG = LoggerFactory.getLogger(PlanQueue.class);
private int maxAppsForReservation;

View File

@ -207,6 +207,10 @@ public class AllocationConfiguration extends ReservationSchedulerConfiguration {
ResourceWeights weight = queueWeights.get(queue);
return (weight == null) ? ResourceWeights.NEUTRAL : weight;
}
public void setQueueWeight(String queue, ResourceWeights weight) {
queueWeights.put(queue, weight);
}
public int getUserMaxApps(String user) {
Integer maxApps = userMaxApps.get(user);
@ -323,4 +327,14 @@ public class AllocationConfiguration extends ReservationSchedulerConfiguration {
public long getEnforcementWindow(String queue) {
return globalReservationQueueConfig.getEnforcementWindowMsec();
}
@VisibleForTesting
public void setReservationWindow(long window) {
globalReservationQueueConfig.setReservationWindow(window);
}
@VisibleForTesting
public void setAverageCapacity(int avgCapacity) {
globalReservationQueueConfig.setAverageCapacity(avgCapacity);
}
}

View File

@ -37,6 +37,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.QueueACL;
import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceWeights;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppUtils;
@ -516,6 +517,15 @@ public class FSLeafQueue extends FSQueue {
}
}
/** Allows setting weight for a dynamically created queue
* Currently only used for reservation based queues
* @param weight queue weight
*/
public void setWeights(float weight) {
scheduler.getAllocationConfiguration().setQueueWeight(getName(),
new ResourceWeights(weight));
}
/**
* Helper method to check if the queue should preempt containers
*

View File

@ -38,6 +38,7 @@ import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.QueueACL;
import org.apache.hadoop.yarn.api.records.QueueInfo;
import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceOption;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
@ -47,6 +48,7 @@ import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.SchedulerResourceTypes;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationConstants;
import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceWeights;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
@ -68,6 +70,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt.ContainersAndNMTokensAllocation;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.QueueEntitlement;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent;
@ -1163,9 +1166,15 @@ public class FairScheduler extends
throw new RuntimeException("Unexpected event type: " + event);
}
AppAddedSchedulerEvent appAddedEvent = (AppAddedSchedulerEvent) event;
addApplication(appAddedEvent.getApplicationId(),
appAddedEvent.getQueue(), appAddedEvent.getUser(),
appAddedEvent.getIsAppRecovering());
String queueName =
resolveReservationQueueName(appAddedEvent.getQueue(),
appAddedEvent.getApplicationId(),
appAddedEvent.getReservationID());
if (queueName != null) {
addApplication(appAddedEvent.getApplicationId(),
queueName, appAddedEvent.getUser(),
appAddedEvent.getIsAppRecovering());
}
break;
case APP_REMOVED:
if (!(event instanceof AppRemovedSchedulerEvent)) {
@ -1223,6 +1232,51 @@ public class FairScheduler extends
}
}
private String resolveReservationQueueName(String queueName,
ApplicationId applicationId, ReservationId reservationID) {
FSQueue queue = queueMgr.getQueue(queueName);
if ((queue == null) || !allocConf.isReservable(queue.getQueueName())) {
return queueName;
}
// Use fully specified name from now on (including root. prefix)
queueName = queue.getQueueName();
if (reservationID != null) {
String resQName = queueName + "." + reservationID.toString();
queue = queueMgr.getQueue(resQName);
if (queue == null) {
String message =
"Application "
+ applicationId
+ " submitted to a reservation which is not yet currently active: "
+ resQName;
this.rmContext.getDispatcher().getEventHandler()
.handle(new RMAppRejectedEvent(applicationId, message));
return null;
}
if (!queue.getParent().getQueueName().equals(queueName)) {
String message =
"Application: " + applicationId + " submitted to a reservation "
+ resQName + " which does not belong to the specified queue: "
+ queueName;
this.rmContext.getDispatcher().getEventHandler()
.handle(new RMAppRejectedEvent(applicationId, message));
return null;
}
// use the reservation queue to run the app
queueName = resQName;
} else {
// use the default child queue of the plan for unreserved apps
queueName = getDefaultQueueForPlanQueue(queueName);
}
return queueName;
}
private String getDefaultQueueForPlanQueue(String queueName) {
String planName = queueName.substring(queueName.lastIndexOf(".") + 1);
queueName = queueName + "." + planName + ReservationConstants.DEFAULT_QUEUE_SUFFIX;
return queueName;
}
@Override
public void recover(RMState state) throws Exception {
// NOT IMPLEMENTED
@ -1441,7 +1495,8 @@ public class FairScheduler extends
// To serialize with FairScheduler#allocate, synchronize on app attempt
synchronized (attempt) {
FSLeafQueue oldQueue = (FSLeafQueue) app.getQueue();
FSLeafQueue targetQueue = queueMgr.getLeafQueue(queueName, false);
String destQueueName = handleMoveToPlanQueue(queueName);
FSLeafQueue targetQueue = queueMgr.getLeafQueue(destQueueName, false);
if (targetQueue == null) {
throw new YarnException("Target queue " + queueName
+ " not found or is not a leaf queue.");
@ -1577,4 +1632,45 @@ public class FairScheduler extends
}
return planQueues;
}
@Override
public void setEntitlement(String queueName,
QueueEntitlement entitlement) throws YarnException {
FSLeafQueue reservationQueue = queueMgr.getLeafQueue(queueName, false);
if (reservationQueue == null) {
throw new YarnException("Target queue " + queueName
+ " not found or is not a leaf queue.");
}
reservationQueue.setWeights(entitlement.getCapacity());
// TODO Does MaxCapacity need to be set for fairScheduler ?
}
/**
* Only supports removing empty leaf queues
* @param queueName name of queue to remove
* @throws YarnException if queue to remove is either not a leaf or if its
* not empty
*/
@Override
public void removeQueue(String queueName) throws YarnException {
FSLeafQueue reservationQueue = queueMgr.getLeafQueue(queueName, false);
if (reservationQueue != null) {
if (!queueMgr.removeLeafQueue(queueName)) {
throw new YarnException("Could not remove queue " + queueName + " as " +
"its either not a leaf queue or its not empty");
}
}
}
private String handleMoveToPlanQueue(String targetQueueName) {
FSQueue dest = queueMgr.getQueue(targetQueueName);
if (dest != null && allocConf.isReservable(dest.getQueueName())) {
// use the default child reservation queue of the plan
targetQueueName = getDefaultQueueForPlanQueue(targetQueueName);
}
return targetQueueName;
}
}

View File

@ -91,7 +91,18 @@ public class QueueManager {
}
return (FSLeafQueue) queue;
}
/**
* Remove a leaf queue if empty
* @param name name of the queue
* @return true if queue was removed or false otherwise
*/
public boolean removeLeafQueue(String name) {
name = ensureRootPrefix(name);
return removeEmptyIncompatibleQueues(name, FSQueueType.PARENT);
}
/**
* Get a parent queue by name, creating it if the create param is true and is necessary.
* If the queue is not or can not be a parent queue, i.e. it already exists as a

View File

@ -17,6 +17,7 @@
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSchedulerConfiguration;
@ -102,4 +103,14 @@ public class ReservationQueueConfiguration {
public void setReservationAgent(String reservationAgent) {
this.reservationAgent = reservationAgent;
}
@VisibleForTesting
public void setReservationWindow(long reservationWindow) {
this.reservationWindow = reservationWindow;
}
@VisibleForTesting
public void setAverageCapacity(int averageCapacity) {
this.avgOverTimeMultiplier = averageCapacity;
}
}

View File

@ -29,7 +29,6 @@ import java.io.PrintWriter;
import java.util.Collections;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.TreeMap;
import org.apache.hadoop.conf.Configuration;
@ -42,12 +41,16 @@ import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.impl.pb.ReservationDefinitionPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ReservationRequestsPBImpl;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
@ -160,6 +163,27 @@ public class ReservationSystemTestUtil {
out.close();
}
public static FairScheduler setupFairScheduler(
ReservationSystemTestUtil testUtil,
RMContext rmContext, Configuration conf, int numContainers) throws
IOException {
FairScheduler scheduler = new FairScheduler();
scheduler.setRMContext(rmContext);
when(rmContext.getScheduler()).thenReturn(scheduler);
scheduler.init(conf);
scheduler.start();
scheduler.reinitialize(conf, rmContext);
Resource resource = testUtil.calculateClusterResource(numContainers);
RMNode node1 = MockNodes.newNodeInfo(1, resource, 1, "127.0.0.1");
NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
scheduler.handle(nodeEvent1);
return scheduler;
}
@SuppressWarnings("unchecked")
public CapacityScheduler mockCapacityScheduler(int numContainers)
throws IOException {

View File

@ -41,7 +41,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerContext;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.PlanQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestUtils;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
@ -149,7 +148,7 @@ public class TestCapacitySchedulerPlanFollower extends TestSchedulerPlanFollower
@Override
protected Queue getDefaultQueue() {
return cs.getQueue("dedicated" + PlanQueue.DEFAULT_QUEUE_SUFFIX);
return cs.getQueue("dedicated" + ReservationConstants.DEFAULT_QUEUE_SUFFIX);
}
@Override

View File

@ -18,14 +18,11 @@
package org.apache.hadoop.yarn.server.resourcemanager.reservation;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairSchedulerConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairSchedulerTestBase;
@ -38,15 +35,16 @@ import org.junit.Test;
import java.io.File;
import java.io.IOException;
import static org.mockito.Mockito.when;
public class TestFairReservationSystem extends FairSchedulerTestBase {
private final static String ALLOC_FILE = new File(TEST_DIR,
public class TestFairReservationSystem {
private final static String ALLOC_FILE = new File(FairSchedulerTestBase.
TEST_DIR,
TestFairReservationSystem.class.getName() + ".xml").getAbsolutePath();
private Configuration conf;
private FairScheduler scheduler;
private FairSchedulerTestBase testHelper = new FairSchedulerTestBase();
@Override
protected Configuration createConfiguration() {
Configuration conf = super.createConfiguration();
Configuration conf = testHelper.createConfiguration();
conf.setClass(YarnConfiguration.RM_SCHEDULER, FairScheduler.class,
ResourceScheduler.class);
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
@ -60,10 +58,6 @@ public class TestFairReservationSystem extends FairSchedulerTestBase {
@After
public void teardown() {
if (resourceManager != null) {
resourceManager.stop();
resourceManager = null;
}
conf = null;
}
@ -75,7 +69,8 @@ public class TestFairReservationSystem extends FairSchedulerTestBase {
// Setup
RMContext mockRMContext = testUtil.createRMContext(conf);
setupFairScheduler(testUtil, mockRMContext);
scheduler = ReservationSystemTestUtil.setupFairScheduler(testUtil,
mockRMContext, conf, 10);
FairReservationSystem reservationSystem = new FairReservationSystem();
reservationSystem.setRMContext(mockRMContext);
@ -97,14 +92,15 @@ public class TestFairReservationSystem extends FairSchedulerTestBase {
ReservationSystemTestUtil testUtil = new ReservationSystemTestUtil();
// Setup
RMContext mockContext = testUtil.createRMContext(conf);
setupFairScheduler(testUtil, mockContext);
RMContext mockRMContext = testUtil.createRMContext(conf);
scheduler = ReservationSystemTestUtil.setupFairScheduler(testUtil,
mockRMContext, conf, 10);
FairReservationSystem reservationSystem = new FairReservationSystem();
reservationSystem.setRMContext(mockContext);
reservationSystem.setRMContext(mockRMContext);
try {
reservationSystem.reinitialize(scheduler.getConf(), mockContext);
reservationSystem.reinitialize(scheduler.getConf(), mockRMContext);
} catch (YarnException e) {
Assert.fail(e.getMessage());
}
@ -116,10 +112,10 @@ public class TestFairReservationSystem extends FairSchedulerTestBase {
// Dynamically add a plan
ReservationSystemTestUtil.updateFSAllocationFile(ALLOC_FILE);
scheduler.reinitialize(conf, mockContext);
scheduler.reinitialize(conf, mockRMContext);
try {
reservationSystem.reinitialize(conf, mockContext);
reservationSystem.reinitialize(conf, mockRMContext);
} catch (YarnException e) {
Assert.fail(e.getMessage());
}
@ -129,23 +125,4 @@ public class TestFairReservationSystem extends FairSchedulerTestBase {
(reservationSystem, newQueue);
}
private void setupFairScheduler(ReservationSystemTestUtil testUtil,
RMContext rmContext) throws
IOException {
scheduler = new FairScheduler();
scheduler.setRMContext(rmContext);
int numContainers = 10;
when(rmContext.getScheduler()).thenReturn(scheduler);
scheduler.init(conf);
scheduler.start();
scheduler.reinitialize(conf, rmContext);
Resource resource = testUtil.calculateClusterResource(numContainers);
RMNode node1 = MockNodes.newNodeInfo(1, resource, 1, "127.0.0.1");
NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
scheduler.handle(nodeEvent1);
}
}

View File

@ -0,0 +1,203 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.reservation;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
import java.io.File;
import java.util.Collections;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceType;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.AllocationConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSLeafQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairSchedulerConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairSchedulerTestBase;
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
import org.apache.hadoop.yarn.util.Clock;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
import org.mockito.Matchers;
import org.mockito.Mockito;
public class TestFairSchedulerPlanFollower extends
TestSchedulerPlanFollowerBase {
private final static String ALLOC_FILE = new File(FairSchedulerTestBase.
TEST_DIR,
TestFairReservationSystem.class.getName() + ".xml").getAbsolutePath();
private RMContext rmContext;
private RMContext spyRMContext;
private FairScheduler fs;
private Configuration conf;
private FairSchedulerTestBase testHelper = new FairSchedulerTestBase();
@Rule
public TestName name = new TestName();
protected Configuration createConfiguration() {
Configuration conf = testHelper.createConfiguration();
conf.setClass(YarnConfiguration.RM_SCHEDULER, FairScheduler.class,
ResourceScheduler.class);
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
return conf;
}
@Before
public void setUp() throws Exception {
conf = createConfiguration();
ReservationSystemTestUtil.setupFSAllocationFile(ALLOC_FILE);
ReservationSystemTestUtil testUtil = new ReservationSystemTestUtil();
// Setup
rmContext = TestUtils.getMockRMContext();
spyRMContext = spy(rmContext);
fs = ReservationSystemTestUtil.setupFairScheduler(testUtil,
spyRMContext, conf, 125);
scheduler = fs;
ConcurrentMap<ApplicationId, RMApp> spyApps =
spy(new ConcurrentHashMap<ApplicationId, RMApp>());
RMApp rmApp = mock(RMApp.class);
when(rmApp.getRMAppAttempt((ApplicationAttemptId) Matchers.any()))
.thenReturn(null);
Mockito.doReturn(rmApp).when(spyApps).get((ApplicationId) Matchers.any());
when(spyRMContext.getRMApps()).thenReturn(spyApps);
ReservationSystemTestUtil.setupFSAllocationFile(ALLOC_FILE);
setupPlanFollower();
}
private void setupPlanFollower() throws Exception {
ReservationSystemTestUtil testUtil = new ReservationSystemTestUtil();
mClock = mock(Clock.class);
mAgent = mock(ReservationAgent.class);
String reservationQ = testUtil.getFullReservationQueueName();
AllocationConfiguration allocConf = fs.getAllocationConfiguration();
allocConf.setReservationWindow(20L);
allocConf.setAverageCapacity(20);
policy.init(reservationQ, allocConf);
}
@Test
public void testWithMoveOnExpiry() throws PlanningException,
InterruptedException, AccessControlException {
// invoke plan follower test with move
testPlanFollower(true);
}
@Test
public void testWithKillOnExpiry() throws PlanningException,
InterruptedException, AccessControlException {
// invoke plan follower test with kill
testPlanFollower(false);
}
@Override
protected void verifyCapacity(Queue defQ) {
assertTrue(((FSQueue) defQ).getWeights().getWeight(ResourceType.MEMORY) >
0.9);
}
@Override
protected Queue getDefaultQueue() {
return getReservationQueue("dedicated" +
ReservationConstants.DEFAULT_QUEUE_SUFFIX);
}
@Override
protected int getNumberOfApplications(Queue queue) {
int numberOfApplications = fs.getAppsInQueue(queue.getQueueName()).size();
return numberOfApplications;
}
@Override
protected AbstractSchedulerPlanFollower createPlanFollower() {
FairSchedulerPlanFollower planFollower =
new FairSchedulerPlanFollower();
planFollower.init(mClock, scheduler, Collections.singletonList(plan));
return planFollower;
}
@Override
protected void assertReservationQueueExists(ReservationId r) {
Queue q = getReservationQueue(r.toString());
assertNotNull(q);
}
@Override
protected void assertReservationQueueExists(ReservationId r,
double expectedCapacity, double expectedMaxCapacity) {
FSLeafQueue q = fs.getQueueManager().getLeafQueue(plan.getQueueName() + "" +
"." +
r, false);
assertNotNull(q);
// For now we are setting both to same weight
Assert.assertEquals(expectedCapacity, q.getWeights().getWeight
(ResourceType.MEMORY), 0.01);
}
@Override
protected void assertReservationQueueDoesNotExist(ReservationId r) {
Queue q = getReservationQueue(r.toString());
assertNull(q);
}
@Override
protected Queue getReservationQueue(String r) {
return fs.getQueueManager().getLeafQueue(plan.getQueueName() + "" +
"." +
r, false);
}
public static ApplicationACLsManager mockAppACLsManager() {
Configuration conf = new Configuration();
return new ApplicationACLsManager(conf);
}
@After
public void tearDown() throws Exception {
if (scheduler != null) {
fs.stop();
}
}
}

View File

@ -29,6 +29,7 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationConstants;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
@ -217,7 +218,7 @@ public class TestCapacitySchedulerDynamicBehavior {
assertEquals(1, appsInRoot.size());
// create the default reservation queue
String defQName = "a" + PlanQueue.DEFAULT_QUEUE_SUFFIX;
String defQName = "a" + ReservationConstants.DEFAULT_QUEUE_SUFFIX;
ReservationQueue defQ =
new ReservationQueue(scheduler, defQName,
(PlanQueue) scheduler.getQueue("a"));

View File

@ -60,7 +60,7 @@ public class FairSchedulerTestBase {
}
}
protected final static String TEST_DIR =
public final static String TEST_DIR =
new File(System.getProperty("test.build.data", "/tmp")).getAbsolutePath();
private static RecordFactory
@ -74,7 +74,7 @@ public class FairSchedulerTestBase {
protected ResourceManager resourceManager;
// Helper methods
protected Configuration createConfiguration() {
public Configuration createConfiguration() {
Configuration conf = new YarnConfiguration();
conf.setClass(YarnConfiguration.RM_SCHEDULER, FairScheduler.class,
ResourceScheduler.class);

View File

@ -58,8 +58,7 @@ public class TestFairSchedulerPreemption extends FairSchedulerTestBase {
}
}
@Override
protected Configuration createConfiguration() {
public Configuration createConfiguration() {
Configuration conf = super.createConfiguration();
conf.setClass(YarnConfiguration.RM_SCHEDULER, StubbedFairScheduler.class,
ResourceScheduler.class);