YARN-4479. Change CS LeafQueue pendingOrderingPolicy to hornor recovered apps. Contributed by Rohith Sharma K S

This commit is contained in:
Jian He 2016-01-08 15:51:10 -08:00
parent fd8065a763
commit 109e528ef5
8 changed files with 241 additions and 13 deletions

View File

@ -1229,6 +1229,9 @@ Release 2.8.0 - UNRELEASED
YARN-4546. ResourceManager crash due to scheduling opportunity overflow. YARN-4546. ResourceManager crash due to scheduling opportunity overflow.
(Jason Lowe via junping_du) (Jason Lowe via junping_du)
YARN-4479. Change CS LeafQueue pendingOrderingPolicy to hornor recovered apps.
(Rohith Sharma K S via jianhe)
Release 2.7.3 - UNRELEASED Release 2.7.3 - UNRELEASED
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -205,6 +205,8 @@
<Field name="userLimitFactor" /> <Field name="userLimitFactor" />
<Field name="maxAMResourcePerQueuePercent" /> <Field name="maxAMResourcePerQueuePercent" />
<Field name="lastClusterResource" /> <Field name="lastClusterResource" />
<Field name="pendingOrderingPolicy" />
<Field name="pendingOPForRecoveredApps" />
</Or> </Or>
<Bug pattern="IS2_INCONSISTENT_SYNC" /> <Bug pattern="IS2_INCONSISTENT_SYNC" />
</Match> </Match>

View File

@ -109,6 +109,7 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
private LogAggregationContext logAggregationContext; private LogAggregationContext logAggregationContext;
private volatile Priority appPriority = null; private volatile Priority appPriority = null;
private boolean isAttemptRecovering;
protected ResourceUsage attemptResourceUsage = new ResourceUsage(); protected ResourceUsage attemptResourceUsage = new ResourceUsage();
private AtomicLong firstAllocationRequestSentTime = new AtomicLong(0); private AtomicLong firstAllocationRequestSentTime = new AtomicLong(0);
@ -967,6 +968,14 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
// queue's resource usage for specific partition // queue's resource usage for specific partition
} }
public boolean isAttemptRecovering() {
return isAttemptRecovering;
}
protected void setAttemptRecovering(boolean isRecovering) {
this.isAttemptRecovering = isRecovering;
}
public static enum AMState { public static enum AMState {
UNMANAGED("User launched the Application Master, since it's unmanaged. "), UNMANAGED("User launched the Application Master, since it's unmanaged. "),
INACTIVATED("Application is added to the scheduler and is not yet activated. "), INACTIVATED("Application is added to the scheduler and is not yet activated. "),

View File

@ -783,7 +783,7 @@ public class CapacityScheduler extends
FiCaSchedulerApp attempt = new FiCaSchedulerApp(applicationAttemptId, FiCaSchedulerApp attempt = new FiCaSchedulerApp(applicationAttemptId,
application.getUser(), queue, queue.getActiveUsersManager(), rmContext, application.getUser(), queue, queue.getActiveUsersManager(), rmContext,
application.getPriority()); application.getPriority(), isAttemptRecovering);
if (transferStateFromPreviousAttempt) { if (transferStateFromPreviousAttempt) {
attempt.transferStateFromPreviousAttempt( attempt.transferStateFromPreviousAttempt(
application.getCurrentAppAttempt()); application.getCurrentAppAttempt());

View File

@ -95,6 +95,9 @@ public class LeafQueue extends AbstractCSQueue {
private OrderingPolicy<FiCaSchedulerApp> pendingOrderingPolicy = null; private OrderingPolicy<FiCaSchedulerApp> pendingOrderingPolicy = null;
// Always give preference to this while activating the application attempts.
private OrderingPolicy<FiCaSchedulerApp> pendingOPForRecoveredApps = null;
private volatile float minimumAllocationFactor; private volatile float minimumAllocationFactor;
private Map<String, User> users = new HashMap<String, User>(); private Map<String, User> users = new HashMap<String, User>();
@ -156,6 +159,8 @@ public class LeafQueue extends AbstractCSQueue {
setOrderingPolicy(conf.<FiCaSchedulerApp>getOrderingPolicy(getQueuePath())); setOrderingPolicy(conf.<FiCaSchedulerApp>getOrderingPolicy(getQueuePath()));
setPendingAppsOrderingPolicy(conf setPendingAppsOrderingPolicy(conf
.<FiCaSchedulerApp> getOrderingPolicy(getQueuePath())); .<FiCaSchedulerApp> getOrderingPolicy(getQueuePath()));
setPendingAppsOrderingPolicyRecovery(conf
.<FiCaSchedulerApp> getOrderingPolicy(getQueuePath()));
userLimit = conf.getUserLimit(getQueuePath()); userLimit = conf.getUserLimit(getQueuePath());
userLimitFactor = conf.getUserLimitFactor(getQueuePath()); userLimitFactor = conf.getUserLimitFactor(getQueuePath());
@ -320,7 +325,8 @@ public class LeafQueue extends AbstractCSQueue {
} }
public synchronized int getNumPendingApplications() { public synchronized int getNumPendingApplications() {
return pendingOrderingPolicy.getNumSchedulableEntities(); return pendingOrderingPolicy.getNumSchedulableEntities()
+ pendingOPForRecoveredApps.getNumSchedulableEntities();
} }
public synchronized int getNumActiveApplications() { public synchronized int getNumActiveApplications() {
@ -599,9 +605,19 @@ public class LeafQueue extends AbstractCSQueue {
Map<String, Resource> userAmPartitionLimit = Map<String, Resource> userAmPartitionLimit =
new HashMap<String, Resource>(); new HashMap<String, Resource>();
for (Iterator<FiCaSchedulerApp> i = getPendingAppsOrderingPolicy() activateApplications(getPendingAppsOrderingPolicyRecovery()
.getAssignmentIterator(); i.hasNext();) { .getAssignmentIterator(), amPartitionLimit, userAmPartitionLimit);
FiCaSchedulerApp application = i.next();
activateApplications(
getPendingAppsOrderingPolicy().getAssignmentIterator(),
amPartitionLimit, userAmPartitionLimit);
}
private synchronized void activateApplications(
Iterator<FiCaSchedulerApp> fsApp, Map<String, Resource> amPartitionLimit,
Map<String, Resource> userAmPartitionLimit) {
while (fsApp.hasNext()) {
FiCaSchedulerApp application = fsApp.next();
ApplicationId applicationId = application.getApplicationId(); ApplicationId applicationId = application.getApplicationId();
// Get the am-node-partition associated with each application // Get the am-node-partition associated with each application
@ -692,7 +708,7 @@ public class LeafQueue extends AbstractCSQueue {
metrics.incAMUsed(application.getUser(), metrics.incAMUsed(application.getUser(),
application.getAMResource(partitionName)); application.getAMResource(partitionName));
metrics.setAMResouceLimitForUser(application.getUser(), userAMLimit); metrics.setAMResouceLimitForUser(application.getUser(), userAMLimit);
i.remove(); fsApp.remove();
LOG.info("Application " + applicationId + " from user: " LOG.info("Application " + applicationId + " from user: "
+ application.getUser() + " activated in queue: " + getQueueName()); + application.getUser() + " activated in queue: " + getQueueName());
} }
@ -702,7 +718,11 @@ public class LeafQueue extends AbstractCSQueue {
User user) { User user) {
// Accept // Accept
user.submitApplication(); user.submitApplication();
getPendingAppsOrderingPolicy().addSchedulableEntity(application); if (application.isAttemptRecovering()) {
getPendingAppsOrderingPolicyRecovery().addSchedulableEntity(application);
} else {
getPendingAppsOrderingPolicy().addSchedulableEntity(application);
}
applicationAttemptMap.put(application.getApplicationAttemptId(), application); applicationAttemptMap.put(application.getApplicationAttemptId(), application);
// Activate applications // Activate applications
@ -742,7 +762,11 @@ public class LeafQueue extends AbstractCSQueue {
boolean wasActive = boolean wasActive =
orderingPolicy.removeSchedulableEntity(application); orderingPolicy.removeSchedulableEntity(application);
if (!wasActive) { if (!wasActive) {
pendingOrderingPolicy.removeSchedulableEntity(application); if (application.isAttemptRecovering()) {
pendingOPForRecoveredApps.removeSchedulableEntity(application);
} else {
pendingOrderingPolicy.removeSchedulableEntity(application);
}
} else { } else {
queueUsage.decAMUsed(partitionName, queueUsage.decAMUsed(partitionName,
application.getAMResource(partitionName)); application.getAMResource(partitionName));
@ -1491,7 +1515,11 @@ public class LeafQueue extends AbstractCSQueue {
* Obtain (read-only) collection of pending applications. * Obtain (read-only) collection of pending applications.
*/ */
public Collection<FiCaSchedulerApp> getPendingApplications() { public Collection<FiCaSchedulerApp> getPendingApplications() {
return pendingOrderingPolicy.getSchedulableEntities(); Collection<FiCaSchedulerApp> pendingApps =
new ArrayList<FiCaSchedulerApp>();
pendingApps.addAll(pendingOPForRecoveredApps.getSchedulableEntities());
pendingApps.addAll(pendingOrderingPolicy.getSchedulableEntities());
return pendingApps;
} }
/** /**
@ -1535,6 +1563,10 @@ public class LeafQueue extends AbstractCSQueue {
@Override @Override
public synchronized void collectSchedulerApplications( public synchronized void collectSchedulerApplications(
Collection<ApplicationAttemptId> apps) { Collection<ApplicationAttemptId> apps) {
for (FiCaSchedulerApp pendingApp : pendingOPForRecoveredApps
.getSchedulableEntities()) {
apps.add(pendingApp.getApplicationAttemptId());
}
for (FiCaSchedulerApp pendingApp : pendingOrderingPolicy for (FiCaSchedulerApp pendingApp : pendingOrderingPolicy
.getSchedulableEntities()) { .getSchedulableEntities()) {
apps.add(pendingApp.getApplicationAttemptId()); apps.add(pendingApp.getApplicationAttemptId());
@ -1670,6 +1702,21 @@ public class LeafQueue extends AbstractCSQueue {
this.pendingOrderingPolicy = pendingOrderingPolicy; this.pendingOrderingPolicy = pendingOrderingPolicy;
} }
public synchronized OrderingPolicy<FiCaSchedulerApp>
getPendingAppsOrderingPolicyRecovery() {
return pendingOPForRecoveredApps;
}
public synchronized void setPendingAppsOrderingPolicyRecovery(
OrderingPolicy<FiCaSchedulerApp> pendingOrderingPolicyRecovery) {
if (null != this.pendingOPForRecoveredApps) {
pendingOrderingPolicyRecovery
.addAllSchedulableEntities(this.pendingOPForRecoveredApps
.getSchedulableEntities());
}
this.pendingOPForRecoveredApps = pendingOrderingPolicyRecovery;
}
/* /*
* Holds shared values used by all applications in * Holds shared values used by all applications in
* the queue to calculate headroom on demand * the queue to calculate headroom on demand

View File

@ -99,12 +99,12 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
String user, Queue queue, ActiveUsersManager activeUsersManager, String user, Queue queue, ActiveUsersManager activeUsersManager,
RMContext rmContext) { RMContext rmContext) {
this(applicationAttemptId, user, queue, activeUsersManager, rmContext, this(applicationAttemptId, user, queue, activeUsersManager, rmContext,
Priority.newInstance(0)); Priority.newInstance(0), false);
} }
public FiCaSchedulerApp(ApplicationAttemptId applicationAttemptId, public FiCaSchedulerApp(ApplicationAttemptId applicationAttemptId,
String user, Queue queue, ActiveUsersManager activeUsersManager, String user, Queue queue, ActiveUsersManager activeUsersManager,
RMContext rmContext, Priority appPriority) { RMContext rmContext, Priority appPriority, boolean isAttemptRecovering) {
super(applicationAttemptId, user, queue, activeUsersManager, rmContext); super(applicationAttemptId, user, queue, activeUsersManager, rmContext);
RMApp rmApp = rmContext.getRMApps().get(getApplicationId()); RMApp rmApp = rmContext.getRMApps().get(getApplicationId());
@ -129,6 +129,7 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
setAppAMNodePartitionName(partition); setAppAMNodePartitionName(partition);
setAMResource(partition, amResource); setAMResource(partition, amResource);
setPriority(appPriority); setPriority(appPriority);
setAttemptRecovering(isAttemptRecovering);
scheduler = rmContext.getScheduler(); scheduler = rmContext.getScheduler();

View File

@ -34,6 +34,8 @@ import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.DrainDispatcher;
import org.apache.hadoop.yarn.server.resourcemanager.MockAM; import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
import org.apache.hadoop.yarn.server.resourcemanager.MockNM; import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
import org.apache.hadoop.yarn.server.resourcemanager.MockNodes; import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
@ -567,4 +569,166 @@ public class TestApplicationPriority {
Assert.assertEquals(6, schedulerAppAttemptApp1.getLiveContainers().size()); Assert.assertEquals(6, schedulerAppAttemptApp1.getLiveContainers().size());
rm.stop(); rm.stop();
} }
/**
* <p>
* Test case verifies the order of applications activated after RM Restart.
* </p>
* <li>App-1 and app-2 submitted and scheduled and running with a priority
* 5 and 6 Respectively</li>
* <li>App-3 submitted and scheduled with a priority 7. This
* is not activated since AMResourceLimit is reached</li>
* <li>RM restarted</li>
* <li>App-1 get activated nevertheless of AMResourceLimit</li>
* <li>App-2 and app-3 put in pendingOrderingPolicy</li>
* <li>After NM registration, app-3 is activated</li>
* <p>
* Expected Output : App-2 must get activated since app-2 was running earlier
* </p>
* @throws Exception
*/
@Test
public void testOrderOfActivatingThePriorityApplicationOnRMRestart()
throws Exception {
conf.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true);
conf.setBoolean(YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_ENABLED, true);
conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName());
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS);
conf.setInt(YarnConfiguration.MAX_CLUSTER_LEVEL_APPLICATION_PRIORITY, 10);
final DrainDispatcher dispatcher = new DrainDispatcher();
MemoryRMStateStore memStore = new MemoryRMStateStore();
memStore.init(conf);
MockRM rm1 = new MockRM(conf, memStore) {
@Override
protected Dispatcher createDispatcher() {
return dispatcher;
}
};
rm1.start();
MockNM nm1 =
new MockNM("127.0.0.1:1234", 16384, rm1.getResourceTrackerService());
nm1.registerNode();
dispatcher.await();
ResourceScheduler scheduler = rm1.getRMContext().getScheduler();
LeafQueue defaultQueue =
(LeafQueue) ((CapacityScheduler) scheduler).getQueue("default");
int memory = defaultQueue.getAMResourceLimit().getMemory() / 2;
// App-1 with priority 5 submitted and running
Priority appPriority1 = Priority.newInstance(5);
RMApp app1 = rm1.submitApp(memory, appPriority1);
MockAM am1 = MockRM.launchAM(app1, rm1, nm1);
am1.registerAppAttempt();
// App-2 with priority 6 submitted and running
Priority appPriority2 = Priority.newInstance(6);
RMApp app2 = rm1.submitApp(memory, appPriority2);
MockAM am2 = MockRM.launchAM(app2, rm1, nm1);
am2.registerAppAttempt();
dispatcher.await();
Assert.assertEquals(2, defaultQueue.getNumActiveApplications());
Assert.assertEquals(0, defaultQueue.getNumPendingApplications());
// App-3 with priority 7 submitted and scheduled. But not activated since
// AMResourceLimit threshold
Priority appPriority3 = Priority.newInstance(7);
RMApp app3 = rm1.submitApp(memory, appPriority3);
dispatcher.await();
Assert.assertEquals(2, defaultQueue.getNumActiveApplications());
Assert.assertEquals(1, defaultQueue.getNumPendingApplications());
Iterator<FiCaSchedulerApp> iterator =
defaultQueue.getOrderingPolicy().getSchedulableEntities().iterator();
FiCaSchedulerApp fcApp2 = iterator.next();
Assert.assertEquals(app2.getCurrentAppAttempt().getAppAttemptId(),
fcApp2.getApplicationAttemptId());
FiCaSchedulerApp fcApp1 = iterator.next();
Assert.assertEquals(app1.getCurrentAppAttempt().getAppAttemptId(),
fcApp1.getApplicationAttemptId());
iterator = defaultQueue.getPendingApplications().iterator();
FiCaSchedulerApp fcApp3 = iterator.next();
Assert.assertEquals(app3.getCurrentAppAttempt().getAppAttemptId(),
fcApp3.getApplicationAttemptId());
final DrainDispatcher dispatcher1 = new DrainDispatcher();
// create new RM to represent restart and recover state
MockRM rm2 = new MockRM(conf, memStore) {
@Override
protected Dispatcher createDispatcher() {
return dispatcher1;
}
};
// start new RM
rm2.start();
// change NM to point to new RM
nm1.setResourceTrackerService(rm2.getResourceTrackerService());
// Verify RM Apps after this restart
Assert.assertEquals(3, rm2.getRMContext().getRMApps().size());
dispatcher1.await();
scheduler = rm2.getRMContext().getScheduler();
defaultQueue =
(LeafQueue) ((CapacityScheduler) scheduler).getQueue("default");
// wait for all applications to get added to scheduler
int count = 5;
while (count-- > 0) {
if ((defaultQueue.getNumActiveApplications() + defaultQueue
.getNumPendingApplications()) == 3) {
break;
}
Thread.sleep(500);
}
// Before NM registration, AMResourceLimit threshold is 0. So 1st
// applications get activated nevertheless of AMResourceLimit threshold
// Two applications are in pending
Assert.assertEquals(1, defaultQueue.getNumActiveApplications());
Assert.assertEquals(2, defaultQueue.getNumPendingApplications());
// NM resync to new RM
nm1.registerNode();
dispatcher1.await();
// wait for activating one applications
count = 5;
while (count-- > 0) {
if (defaultQueue.getOrderingPolicy().getSchedulableEntities().size() == 2) {
break;
}
Thread.sleep(500);
}
// verify for order of activated applications iterator
iterator =
defaultQueue.getOrderingPolicy().getSchedulableEntities().iterator();
fcApp2 = iterator.next();
Assert.assertEquals(app2.getCurrentAppAttempt().getAppAttemptId(),
fcApp2.getApplicationAttemptId());
fcApp1 = iterator.next();
Assert.assertEquals(app1.getCurrentAppAttempt().getAppAttemptId(),
fcApp1.getApplicationAttemptId());
// verify for pending application iterator. It should be app-3 attempt
iterator = defaultQueue.getPendingApplications().iterator();
fcApp3 = iterator.next();
Assert.assertEquals(app3.getCurrentAppAttempt().getAppAttemptId(),
fcApp3.getApplicationAttemptId());
rm2.stop();
rm1.stop();
}
} }

View File

@ -2413,14 +2413,16 @@ public class TestLeafQueue {
TestUtils.getMockApplicationAttemptId(0, 0); TestUtils.getMockApplicationAttemptId(0, 0);
FiCaSchedulerApp app_0 = FiCaSchedulerApp app_0 =
spy(new FiCaSchedulerApp(appAttemptId_0, user_0, a, spy(new FiCaSchedulerApp(appAttemptId_0, user_0, a,
mock(ActiveUsersManager.class), spyRMContext, Priority.newInstance(3))); mock(ActiveUsersManager.class), spyRMContext,
Priority.newInstance(3), false));
a.submitApplicationAttempt(app_0, user_0); a.submitApplicationAttempt(app_0, user_0);
final ApplicationAttemptId appAttemptId_1 = final ApplicationAttemptId appAttemptId_1 =
TestUtils.getMockApplicationAttemptId(1, 0); TestUtils.getMockApplicationAttemptId(1, 0);
FiCaSchedulerApp app_1 = FiCaSchedulerApp app_1 =
spy(new FiCaSchedulerApp(appAttemptId_1, user_0, a, spy(new FiCaSchedulerApp(appAttemptId_1, user_0, a,
mock(ActiveUsersManager.class), spyRMContext, Priority.newInstance(5))); mock(ActiveUsersManager.class), spyRMContext,
Priority.newInstance(5), false));
a.submitApplicationAttempt(app_1, user_0); a.submitApplicationAttempt(app_1, user_0);
Priority priority = TestUtils.createMockPriority(1); Priority priority = TestUtils.createMockPriority(1);