YARN-3887. Support changing Application priority during runtime. Contributed by Sunil G

This commit is contained in:
Jian He 2015-08-10 20:51:54 -07:00
parent b56daff6a1
commit fa1d84ae27
8 changed files with 342 additions and 1 deletions

View File

@ -164,6 +164,9 @@ Release 2.8.0 - UNRELEASED
YARN-3873. PendingApplications in LeafQueue should also use OrderingPolicy.
(Sunil G via wangda)
YARN-3887. Support changing Application priority during runtime. (Sunil G
via jianhe)
IMPROVEMENTS
YARN-644. Basic null check is not performed on passed in arguments before

View File

@ -706,6 +706,11 @@ public abstract class RMStateStore extends AbstractService {
dispatcher.getEventHandler().handle(new RMStateUpdateAppEvent(appState));
}
public void updateApplicationStateSynchronously(
ApplicationStateData appState) {
handleStoreEvent(new RMStateUpdateAppEvent(appState));
}
public void updateFencedState() {
handleStoreEvent(new RMStateStoreEvent(RMStateStoreEventType.FENCED));
}

View File

@ -701,4 +701,11 @@ public abstract class AbstractYarnScheduler
// specific scheduler.
return Priority.newInstance(0);
}
@Override
public void updateApplicationPriority(Priority newPriority,
ApplicationId applicationId) throws YarnException {
// Dummy Implementation till Application Priority changes are done in
// specific scheduler.
}
}

View File

@ -98,7 +98,7 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
private boolean amRunning = false;
private LogAggregationContext logAggregationContext;
private Priority appPriority = null;
private volatile Priority appPriority = null;
protected ResourceUsage attemptResourceUsage = new ResourceUsage();
private AtomicLong firstAllocationRequestSentTime = new AtomicLong(0);

View File

@ -306,4 +306,15 @@ public interface YarnScheduler extends EventHandler<SchedulerEvent> {
public Priority checkAndGetApplicationPriority(Priority priorityFromContext,
String user, String queueName, ApplicationId applicationId)
throws YarnException;
/**
*
* Change application priority of a submitted application at runtime
*
* @param newPriority Submitted Application priority.
*
* @param applicationId Application ID
*/
public void updateApplicationPriority(Priority newPriority,
ApplicationId applicationId) throws YarnException;
}

View File

@ -71,6 +71,7 @@ import org.apache.hadoop.yarn.security.YarnAuthorizationProvider;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationConstants;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
@ -1850,4 +1851,52 @@ public class CapacityScheduler extends
public Priority getMaxClusterLevelAppPriority() {
return maxClusterLevelAppPriority;
}
@Override
public synchronized void updateApplicationPriority(Priority newPriority,
ApplicationId applicationId) throws YarnException {
Priority appPriority = null;
SchedulerApplication<FiCaSchedulerApp> application = applications
.get(applicationId);
if (application == null) {
throw new YarnException("Application '" + applicationId
+ "' is not present, hence could not change priority.");
}
if (application.getPriority().equals(newPriority)) {
return;
}
RMApp rmApp = rmContext.getRMApps().get(applicationId);
appPriority = checkAndGetApplicationPriority(newPriority, rmApp.getUser(),
rmApp.getQueue(), applicationId);
// Update new priority in Submission Context to keep track in HA
rmApp.getApplicationSubmissionContext().setPriority(appPriority);
// Update to state store
ApplicationStateData appState = ApplicationStateData.newInstance(
rmApp.getSubmitTime(), rmApp.getStartTime(),
rmApp.getApplicationSubmissionContext(), rmApp.getUser());
rmContext.getStateStore().updateApplicationStateSynchronously(appState);
// As we use iterator over a TreeSet for OrderingPolicy, once we change
// priority then reinsert back to make order correct.
LeafQueue queue = (LeafQueue) getQueue(rmApp.getQueue());
synchronized (queue) {
queue.getOrderingPolicy().removeSchedulableEntity(
application.getCurrentAppAttempt());
// Update new priority in SchedulerApplication
application.setPriority(appPriority);
queue.getOrderingPolicy().addSchedulableEntity(
application.getCurrentAppAttempt());
}
LOG.info("Priority '" + appPriority + "' is updated in queue :"
+ rmApp.getQueue() + "for application:" + applicationId
+ "for the user: " + rmApp.getUser());
}
}

View File

@ -94,11 +94,17 @@ public abstract class AbstractComparatorOrderingPolicy<S extends SchedulableEnti
@Override
public void addSchedulableEntity(S s) {
if (null == s) {
return;
}
schedulableEntities.add(s);
}
@Override
public boolean removeSchedulableEntity(S s) {
if (null == s) {
return false;
}
synchronized (entitiesToReorder) {
entitiesToReorder.remove(s.getId());
}

View File

@ -23,7 +23,9 @@ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
@ -36,6 +38,9 @@ import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl;
@ -307,4 +312,259 @@ public class TestApplicationPriority {
maxPriority);
rm.stop();
}
@Test
public void testUpdatePriorityAtRuntime() throws Exception {
Configuration conf = new Configuration();
conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
ResourceScheduler.class);
// Set Max Application Priority as 10
conf.setInt(YarnConfiguration.MAX_CLUSTER_LEVEL_APPLICATION_PRIORITY, 10);
MockRM rm = new MockRM(conf);
rm.start();
Priority appPriority1 = Priority.newInstance(5);
MockNM nm1 = rm.registerNode("127.0.0.1:1234", 16 * GB);
RMApp app1 = rm.submitApp(1 * GB, appPriority1);
// kick the scheduler, 1 GB given to AM1, remaining 15GB on nm1
MockAM am1 = MockRM.launchAM(app1, rm, nm1);
am1.registerAppAttempt();
// get scheduler
CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
// Change the priority of App1 to 8
Priority appPriority2 = Priority.newInstance(8);
cs.updateApplicationPriority(appPriority2, app1.getApplicationId());
// get scheduler app
FiCaSchedulerApp schedulerAppAttempt = cs.getSchedulerApplications()
.get(app1.getApplicationId()).getCurrentAppAttempt();
// Verify whether the new priority is updated
Assert.assertEquals(appPriority2, schedulerAppAttempt.getPriority());
}
@Test
public void testUpdateInvalidPriorityAtRuntime() throws Exception {
Configuration conf = new Configuration();
conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
ResourceScheduler.class);
// Set Max Application Priority as 10
conf.setInt(YarnConfiguration.MAX_CLUSTER_LEVEL_APPLICATION_PRIORITY, 10);
MockRM rm = new MockRM(conf);
rm.start();
Priority appPriority1 = Priority.newInstance(5);
MockNM nm1 = rm.registerNode("127.0.0.1:1234", 16 * GB);
RMApp app1 = rm.submitApp(1 * GB, appPriority1);
// kick the scheduler, 1 GB given to AM1, remaining 15GB on nm1
MockAM am1 = MockRM.launchAM(app1, rm, nm1);
am1.registerAppAttempt();
// get scheduler
CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
// Change the priority of App1 to 15
Priority appPriority2 = Priority.newInstance(15);
cs.updateApplicationPriority(appPriority2, app1.getApplicationId());
// get scheduler app
FiCaSchedulerApp schedulerAppAttempt = cs.getSchedulerApplications()
.get(app1.getApplicationId()).getCurrentAppAttempt();
// Verify whether priority 15 is reset to 10
Priority appPriority3 = Priority.newInstance(10);
Assert.assertEquals(appPriority3, schedulerAppAttempt.getPriority());
rm.stop();
}
@Test(timeout = 180000)
public void testRMRestartWithChangeInPriority() throws Exception {
conf.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true);
conf.setBoolean(YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_ENABLED,
false);
conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName());
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS);
conf.setInt(YarnConfiguration.MAX_CLUSTER_LEVEL_APPLICATION_PRIORITY, 10);
MemoryRMStateStore memStore = new MemoryRMStateStore();
memStore.init(conf);
RMState rmState = memStore.getState();
Map<ApplicationId, ApplicationStateData> rmAppState = rmState
.getApplicationState();
// PHASE 1: create state in an RM
// start RM
MockRM rm1 = new MockRM(conf, memStore);
rm1.start();
MockNM nm1 = new MockNM("127.0.0.1:1234", 15120,
rm1.getResourceTrackerService());
nm1.registerNode();
Priority appPriority1 = Priority.newInstance(5);
RMApp app1 = rm1.submitApp(1 * GB, appPriority1);
// kick the scheduler, 1 GB given to AM1, remaining 15GB on nm1
MockAM am1 = MockRM.launchAM(app1, rm1, nm1);
am1.registerAppAttempt();
// get scheduler
CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
// Change the priority of App1 to 8
Priority appPriority2 = Priority.newInstance(8);
cs.updateApplicationPriority(appPriority2, app1.getApplicationId());
// let things settle down
Thread.sleep(1000);
// create new RM to represent restart and recover state
MockRM rm2 = new MockRM(conf, memStore);
// start new RM
rm2.start();
// change NM to point to new RM
nm1.setResourceTrackerService(rm2.getResourceTrackerService());
// Verify RM Apps after this restart
Assert.assertEquals(1, rm2.getRMContext().getRMApps().size());
// get scheduler app
RMApp loadedApp = rm2.getRMContext().getRMApps()
.get(app1.getApplicationId());
// Verify whether priority 15 is reset to 10
Assert.assertEquals(appPriority2, loadedApp.getCurrentAppAttempt()
.getSubmissionContext().getPriority());
rm2.stop();
rm1.stop();
}
@Test
public void testApplicationPriorityAllocationWithChangeInPriority()
throws Exception {
Configuration conf = new Configuration();
conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
ResourceScheduler.class);
// Set Max Application Priority as 10
conf.setInt(YarnConfiguration.MAX_CLUSTER_LEVEL_APPLICATION_PRIORITY, 10);
MockRM rm = new MockRM(conf);
rm.start();
Priority appPriority1 = Priority.newInstance(5);
MockNM nm1 = rm.registerNode("127.0.0.1:1234", 16 * GB);
RMApp app1 = rm.submitApp(1 * GB, appPriority1);
// kick the scheduler, 1 GB given to AM1, remaining 15GB on nm1
MockAM am1 = MockRM.launchAM(app1, rm, nm1);
am1.registerAppAttempt();
// add request for containers and wait for containers to be allocated.
int NUM_CONTAINERS = 7;
List<Container> allocated1 = am1.allocateAndWaitForContainers("127.0.0.1",
NUM_CONTAINERS, 2 * GB, nm1);
Assert.assertEquals(7, allocated1.size());
Assert.assertEquals(2 * GB, allocated1.get(0).getResource().getMemory());
// check node report, 15 GB used (1 AM and 7 containers) and 1 GB available
SchedulerNodeReport report_nm1 = rm.getResourceScheduler().getNodeReport(
nm1.getNodeId());
Assert.assertEquals(15 * GB, report_nm1.getUsedResource().getMemory());
Assert.assertEquals(1 * GB, report_nm1.getAvailableResource().getMemory());
// Submit the second app App2 with priority 8 (Higher than App1)
Priority appPriority2 = Priority.newInstance(8);
RMApp app2 = rm.submitApp(1 * GB, appPriority2);
// kick the scheduler, 1 GB which was free is given to AM of App2
nm1.nodeHeartbeat(true);
MockAM am2 = MockRM.launchAM(app2, rm, nm1);
am2.registerAppAttempt();
// check node report, 16 GB used and 0 GB available
report_nm1 = rm.getResourceScheduler().getNodeReport(nm1.getNodeId());
Assert.assertEquals(16 * GB, report_nm1.getUsedResource().getMemory());
Assert.assertEquals(0 * GB, report_nm1.getAvailableResource().getMemory());
// get scheduler
CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
// get scheduler app
FiCaSchedulerApp schedulerAppAttemptApp1 = cs.getSchedulerApplications()
.get(app1.getApplicationId()).getCurrentAppAttempt();
// kill 2 containers to free up some space
int counter = 0;
for (Iterator<Container> iterator = allocated1.iterator(); iterator
.hasNext();) {
Container c = iterator.next();
if (++counter > 2) {
break;
}
cs.killContainer(schedulerAppAttemptApp1.getRMContainer(c.getId()));
iterator.remove();
}
// check node report, 12 GB used and 4 GB available
report_nm1 = rm.getResourceScheduler().getNodeReport(nm1.getNodeId());
Assert.assertEquals(12 * GB, report_nm1.getUsedResource().getMemory());
Assert.assertEquals(4 * GB, report_nm1.getAvailableResource().getMemory());
// add request for containers App1
am1.allocate("127.0.0.1", 2 * GB, 10, new ArrayList<ContainerId>());
// add request for containers App2 and wait for containers to get allocated
List<Container> allocated2 = am2.allocateAndWaitForContainers("127.0.0.1",
2, 2 * GB, nm1);
Assert.assertEquals(2, allocated2.size());
// check node report, 16 GB used and 0 GB available
report_nm1 = rm.getResourceScheduler().getNodeReport(nm1.getNodeId());
Assert.assertEquals(16 * GB, report_nm1.getUsedResource().getMemory());
Assert.assertEquals(0 * GB, report_nm1.getAvailableResource().getMemory());
// kill 1 more
counter = 0;
for (Iterator<Container> iterator = allocated1.iterator(); iterator
.hasNext();) {
Container c = iterator.next();
if (++counter > 1) {
break;
}
cs.killContainer(schedulerAppAttemptApp1.getRMContainer(c.getId()));
iterator.remove();
}
// check node report, 14 GB used and 2 GB available
report_nm1 = rm.getResourceScheduler().getNodeReport(nm1.getNodeId());
Assert.assertEquals(14 * GB, report_nm1.getUsedResource().getMemory());
Assert.assertEquals(2 * GB, report_nm1.getAvailableResource().getMemory());
// Change the priority of App1 to 3 (lowest)
Priority appPriority3 = Priority.newInstance(3);
cs.updateApplicationPriority(appPriority3, app2.getApplicationId());
// add request for containers App2
am2.allocate("127.0.0.1", 2 * GB, 3, new ArrayList<ContainerId>());
// add request for containers App1 and wait for containers to get allocated
// since priority is more for App1 now, App1 will get a container.
List<Container> allocated3 = am1.allocateAndWaitForContainers("127.0.0.1",
1, 2 * GB, nm1);
Assert.assertEquals(1, allocated3.size());
// Now App1 will have 5 containers and 1 AM. App2 will have 2 containers.
Assert.assertEquals(6, schedulerAppAttemptApp1.getLiveContainers().size());
rm.stop();
}
}