YARN-4479. Change CS LeafQueue pendingOrderingPolicy to hornor recovered apps. Contributed by Rohith Sharma K S
(cherry picked from commit 109e528ef5
)
This commit is contained in:
parent
699d420497
commit
d4ff8fd5d8
|
@ -1174,6 +1174,9 @@ Release 2.8.0 - UNRELEASED
|
||||||
YARN-4546. ResourceManager crash due to scheduling opportunity overflow.
|
YARN-4546. ResourceManager crash due to scheduling opportunity overflow.
|
||||||
(Jason Lowe via junping_du)
|
(Jason Lowe via junping_du)
|
||||||
|
|
||||||
|
YARN-4479. Change CS LeafQueue pendingOrderingPolicy to hornor recovered apps.
|
||||||
|
(Rohith Sharma K S via jianhe)
|
||||||
|
|
||||||
Release 2.7.3 - UNRELEASED
|
Release 2.7.3 - UNRELEASED
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
|
|
@ -205,6 +205,8 @@
|
||||||
<Field name="userLimitFactor" />
|
<Field name="userLimitFactor" />
|
||||||
<Field name="maxAMResourcePerQueuePercent" />
|
<Field name="maxAMResourcePerQueuePercent" />
|
||||||
<Field name="lastClusterResource" />
|
<Field name="lastClusterResource" />
|
||||||
|
<Field name="pendingOrderingPolicy" />
|
||||||
|
<Field name="pendingOPForRecoveredApps" />
|
||||||
</Or>
|
</Or>
|
||||||
<Bug pattern="IS2_INCONSISTENT_SYNC" />
|
<Bug pattern="IS2_INCONSISTENT_SYNC" />
|
||||||
</Match>
|
</Match>
|
||||||
|
|
|
@ -109,6 +109,7 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
|
||||||
private LogAggregationContext logAggregationContext;
|
private LogAggregationContext logAggregationContext;
|
||||||
|
|
||||||
private volatile Priority appPriority = null;
|
private volatile Priority appPriority = null;
|
||||||
|
private boolean isAttemptRecovering;
|
||||||
|
|
||||||
protected ResourceUsage attemptResourceUsage = new ResourceUsage();
|
protected ResourceUsage attemptResourceUsage = new ResourceUsage();
|
||||||
private AtomicLong firstAllocationRequestSentTime = new AtomicLong(0);
|
private AtomicLong firstAllocationRequestSentTime = new AtomicLong(0);
|
||||||
|
@ -967,6 +968,14 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
|
||||||
// queue's resource usage for specific partition
|
// queue's resource usage for specific partition
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public boolean isAttemptRecovering() {
|
||||||
|
return isAttemptRecovering;
|
||||||
|
}
|
||||||
|
|
||||||
|
protected void setAttemptRecovering(boolean isRecovering) {
|
||||||
|
this.isAttemptRecovering = isRecovering;
|
||||||
|
}
|
||||||
|
|
||||||
public static enum AMState {
|
public static enum AMState {
|
||||||
UNMANAGED("User launched the Application Master, since it's unmanaged. "),
|
UNMANAGED("User launched the Application Master, since it's unmanaged. "),
|
||||||
INACTIVATED("Application is added to the scheduler and is not yet activated. "),
|
INACTIVATED("Application is added to the scheduler and is not yet activated. "),
|
||||||
|
|
|
@ -783,7 +783,7 @@ public class CapacityScheduler extends
|
||||||
|
|
||||||
FiCaSchedulerApp attempt = new FiCaSchedulerApp(applicationAttemptId,
|
FiCaSchedulerApp attempt = new FiCaSchedulerApp(applicationAttemptId,
|
||||||
application.getUser(), queue, queue.getActiveUsersManager(), rmContext,
|
application.getUser(), queue, queue.getActiveUsersManager(), rmContext,
|
||||||
application.getPriority());
|
application.getPriority(), isAttemptRecovering);
|
||||||
if (transferStateFromPreviousAttempt) {
|
if (transferStateFromPreviousAttempt) {
|
||||||
attempt.transferStateFromPreviousAttempt(
|
attempt.transferStateFromPreviousAttempt(
|
||||||
application.getCurrentAppAttempt());
|
application.getCurrentAppAttempt());
|
||||||
|
|
|
@ -95,6 +95,9 @@ public class LeafQueue extends AbstractCSQueue {
|
||||||
|
|
||||||
private OrderingPolicy<FiCaSchedulerApp> pendingOrderingPolicy = null;
|
private OrderingPolicy<FiCaSchedulerApp> pendingOrderingPolicy = null;
|
||||||
|
|
||||||
|
// Always give preference to this while activating the application attempts.
|
||||||
|
private OrderingPolicy<FiCaSchedulerApp> pendingOPForRecoveredApps = null;
|
||||||
|
|
||||||
private volatile float minimumAllocationFactor;
|
private volatile float minimumAllocationFactor;
|
||||||
|
|
||||||
private Map<String, User> users = new HashMap<String, User>();
|
private Map<String, User> users = new HashMap<String, User>();
|
||||||
|
@ -156,6 +159,8 @@ public class LeafQueue extends AbstractCSQueue {
|
||||||
setOrderingPolicy(conf.<FiCaSchedulerApp>getOrderingPolicy(getQueuePath()));
|
setOrderingPolicy(conf.<FiCaSchedulerApp>getOrderingPolicy(getQueuePath()));
|
||||||
setPendingAppsOrderingPolicy(conf
|
setPendingAppsOrderingPolicy(conf
|
||||||
.<FiCaSchedulerApp> getOrderingPolicy(getQueuePath()));
|
.<FiCaSchedulerApp> getOrderingPolicy(getQueuePath()));
|
||||||
|
setPendingAppsOrderingPolicyRecovery(conf
|
||||||
|
.<FiCaSchedulerApp> getOrderingPolicy(getQueuePath()));
|
||||||
|
|
||||||
userLimit = conf.getUserLimit(getQueuePath());
|
userLimit = conf.getUserLimit(getQueuePath());
|
||||||
userLimitFactor = conf.getUserLimitFactor(getQueuePath());
|
userLimitFactor = conf.getUserLimitFactor(getQueuePath());
|
||||||
|
@ -320,7 +325,8 @@ public class LeafQueue extends AbstractCSQueue {
|
||||||
}
|
}
|
||||||
|
|
||||||
public synchronized int getNumPendingApplications() {
|
public synchronized int getNumPendingApplications() {
|
||||||
return pendingOrderingPolicy.getNumSchedulableEntities();
|
return pendingOrderingPolicy.getNumSchedulableEntities()
|
||||||
|
+ pendingOPForRecoveredApps.getNumSchedulableEntities();
|
||||||
}
|
}
|
||||||
|
|
||||||
public synchronized int getNumActiveApplications() {
|
public synchronized int getNumActiveApplications() {
|
||||||
|
@ -599,9 +605,19 @@ public class LeafQueue extends AbstractCSQueue {
|
||||||
Map<String, Resource> userAmPartitionLimit =
|
Map<String, Resource> userAmPartitionLimit =
|
||||||
new HashMap<String, Resource>();
|
new HashMap<String, Resource>();
|
||||||
|
|
||||||
for (Iterator<FiCaSchedulerApp> i = getPendingAppsOrderingPolicy()
|
activateApplications(getPendingAppsOrderingPolicyRecovery()
|
||||||
.getAssignmentIterator(); i.hasNext();) {
|
.getAssignmentIterator(), amPartitionLimit, userAmPartitionLimit);
|
||||||
FiCaSchedulerApp application = i.next();
|
|
||||||
|
activateApplications(
|
||||||
|
getPendingAppsOrderingPolicy().getAssignmentIterator(),
|
||||||
|
amPartitionLimit, userAmPartitionLimit);
|
||||||
|
}
|
||||||
|
|
||||||
|
private synchronized void activateApplications(
|
||||||
|
Iterator<FiCaSchedulerApp> fsApp, Map<String, Resource> amPartitionLimit,
|
||||||
|
Map<String, Resource> userAmPartitionLimit) {
|
||||||
|
while (fsApp.hasNext()) {
|
||||||
|
FiCaSchedulerApp application = fsApp.next();
|
||||||
ApplicationId applicationId = application.getApplicationId();
|
ApplicationId applicationId = application.getApplicationId();
|
||||||
|
|
||||||
// Get the am-node-partition associated with each application
|
// Get the am-node-partition associated with each application
|
||||||
|
@ -692,7 +708,7 @@ public class LeafQueue extends AbstractCSQueue {
|
||||||
metrics.incAMUsed(application.getUser(),
|
metrics.incAMUsed(application.getUser(),
|
||||||
application.getAMResource(partitionName));
|
application.getAMResource(partitionName));
|
||||||
metrics.setAMResouceLimitForUser(application.getUser(), userAMLimit);
|
metrics.setAMResouceLimitForUser(application.getUser(), userAMLimit);
|
||||||
i.remove();
|
fsApp.remove();
|
||||||
LOG.info("Application " + applicationId + " from user: "
|
LOG.info("Application " + applicationId + " from user: "
|
||||||
+ application.getUser() + " activated in queue: " + getQueueName());
|
+ application.getUser() + " activated in queue: " + getQueueName());
|
||||||
}
|
}
|
||||||
|
@ -702,7 +718,11 @@ public class LeafQueue extends AbstractCSQueue {
|
||||||
User user) {
|
User user) {
|
||||||
// Accept
|
// Accept
|
||||||
user.submitApplication();
|
user.submitApplication();
|
||||||
getPendingAppsOrderingPolicy().addSchedulableEntity(application);
|
if (application.isAttemptRecovering()) {
|
||||||
|
getPendingAppsOrderingPolicyRecovery().addSchedulableEntity(application);
|
||||||
|
} else {
|
||||||
|
getPendingAppsOrderingPolicy().addSchedulableEntity(application);
|
||||||
|
}
|
||||||
applicationAttemptMap.put(application.getApplicationAttemptId(), application);
|
applicationAttemptMap.put(application.getApplicationAttemptId(), application);
|
||||||
|
|
||||||
// Activate applications
|
// Activate applications
|
||||||
|
@ -742,7 +762,11 @@ public class LeafQueue extends AbstractCSQueue {
|
||||||
boolean wasActive =
|
boolean wasActive =
|
||||||
orderingPolicy.removeSchedulableEntity(application);
|
orderingPolicy.removeSchedulableEntity(application);
|
||||||
if (!wasActive) {
|
if (!wasActive) {
|
||||||
pendingOrderingPolicy.removeSchedulableEntity(application);
|
if (application.isAttemptRecovering()) {
|
||||||
|
pendingOPForRecoveredApps.removeSchedulableEntity(application);
|
||||||
|
} else {
|
||||||
|
pendingOrderingPolicy.removeSchedulableEntity(application);
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
queueUsage.decAMUsed(partitionName,
|
queueUsage.decAMUsed(partitionName,
|
||||||
application.getAMResource(partitionName));
|
application.getAMResource(partitionName));
|
||||||
|
@ -1491,7 +1515,11 @@ public class LeafQueue extends AbstractCSQueue {
|
||||||
* Obtain (read-only) collection of pending applications.
|
* Obtain (read-only) collection of pending applications.
|
||||||
*/
|
*/
|
||||||
public Collection<FiCaSchedulerApp> getPendingApplications() {
|
public Collection<FiCaSchedulerApp> getPendingApplications() {
|
||||||
return pendingOrderingPolicy.getSchedulableEntities();
|
Collection<FiCaSchedulerApp> pendingApps =
|
||||||
|
new ArrayList<FiCaSchedulerApp>();
|
||||||
|
pendingApps.addAll(pendingOPForRecoveredApps.getSchedulableEntities());
|
||||||
|
pendingApps.addAll(pendingOrderingPolicy.getSchedulableEntities());
|
||||||
|
return pendingApps;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -1535,6 +1563,10 @@ public class LeafQueue extends AbstractCSQueue {
|
||||||
@Override
|
@Override
|
||||||
public synchronized void collectSchedulerApplications(
|
public synchronized void collectSchedulerApplications(
|
||||||
Collection<ApplicationAttemptId> apps) {
|
Collection<ApplicationAttemptId> apps) {
|
||||||
|
for (FiCaSchedulerApp pendingApp : pendingOPForRecoveredApps
|
||||||
|
.getSchedulableEntities()) {
|
||||||
|
apps.add(pendingApp.getApplicationAttemptId());
|
||||||
|
}
|
||||||
for (FiCaSchedulerApp pendingApp : pendingOrderingPolicy
|
for (FiCaSchedulerApp pendingApp : pendingOrderingPolicy
|
||||||
.getSchedulableEntities()) {
|
.getSchedulableEntities()) {
|
||||||
apps.add(pendingApp.getApplicationAttemptId());
|
apps.add(pendingApp.getApplicationAttemptId());
|
||||||
|
@ -1670,6 +1702,21 @@ public class LeafQueue extends AbstractCSQueue {
|
||||||
this.pendingOrderingPolicy = pendingOrderingPolicy;
|
this.pendingOrderingPolicy = pendingOrderingPolicy;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public synchronized OrderingPolicy<FiCaSchedulerApp>
|
||||||
|
getPendingAppsOrderingPolicyRecovery() {
|
||||||
|
return pendingOPForRecoveredApps;
|
||||||
|
}
|
||||||
|
|
||||||
|
public synchronized void setPendingAppsOrderingPolicyRecovery(
|
||||||
|
OrderingPolicy<FiCaSchedulerApp> pendingOrderingPolicyRecovery) {
|
||||||
|
if (null != this.pendingOPForRecoveredApps) {
|
||||||
|
pendingOrderingPolicyRecovery
|
||||||
|
.addAllSchedulableEntities(this.pendingOPForRecoveredApps
|
||||||
|
.getSchedulableEntities());
|
||||||
|
}
|
||||||
|
this.pendingOPForRecoveredApps = pendingOrderingPolicyRecovery;
|
||||||
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Holds shared values used by all applications in
|
* Holds shared values used by all applications in
|
||||||
* the queue to calculate headroom on demand
|
* the queue to calculate headroom on demand
|
||||||
|
|
|
@ -99,12 +99,12 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
|
||||||
String user, Queue queue, ActiveUsersManager activeUsersManager,
|
String user, Queue queue, ActiveUsersManager activeUsersManager,
|
||||||
RMContext rmContext) {
|
RMContext rmContext) {
|
||||||
this(applicationAttemptId, user, queue, activeUsersManager, rmContext,
|
this(applicationAttemptId, user, queue, activeUsersManager, rmContext,
|
||||||
Priority.newInstance(0));
|
Priority.newInstance(0), false);
|
||||||
}
|
}
|
||||||
|
|
||||||
public FiCaSchedulerApp(ApplicationAttemptId applicationAttemptId,
|
public FiCaSchedulerApp(ApplicationAttemptId applicationAttemptId,
|
||||||
String user, Queue queue, ActiveUsersManager activeUsersManager,
|
String user, Queue queue, ActiveUsersManager activeUsersManager,
|
||||||
RMContext rmContext, Priority appPriority) {
|
RMContext rmContext, Priority appPriority, boolean isAttemptRecovering) {
|
||||||
super(applicationAttemptId, user, queue, activeUsersManager, rmContext);
|
super(applicationAttemptId, user, queue, activeUsersManager, rmContext);
|
||||||
|
|
||||||
RMApp rmApp = rmContext.getRMApps().get(getApplicationId());
|
RMApp rmApp = rmContext.getRMApps().get(getApplicationId());
|
||||||
|
@ -129,6 +129,7 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
|
||||||
setAppAMNodePartitionName(partition);
|
setAppAMNodePartitionName(partition);
|
||||||
setAMResource(partition, amResource);
|
setAMResource(partition, amResource);
|
||||||
setPriority(appPriority);
|
setPriority(appPriority);
|
||||||
|
setAttemptRecovering(isAttemptRecovering);
|
||||||
|
|
||||||
scheduler = rmContext.getScheduler();
|
scheduler = rmContext.getScheduler();
|
||||||
|
|
||||||
|
|
|
@ -34,6 +34,8 @@ import org.apache.hadoop.yarn.api.records.Container;
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||||
import org.apache.hadoop.yarn.api.records.Priority;
|
import org.apache.hadoop.yarn.api.records.Priority;
|
||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
|
import org.apache.hadoop.yarn.event.Dispatcher;
|
||||||
|
import org.apache.hadoop.yarn.event.DrainDispatcher;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
|
import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
|
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
|
import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
|
||||||
|
@ -567,4 +569,166 @@ public class TestApplicationPriority {
|
||||||
Assert.assertEquals(6, schedulerAppAttemptApp1.getLiveContainers().size());
|
Assert.assertEquals(6, schedulerAppAttemptApp1.getLiveContainers().size());
|
||||||
rm.stop();
|
rm.stop();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* <p>
|
||||||
|
* Test case verifies the order of applications activated after RM Restart.
|
||||||
|
* </p>
|
||||||
|
* <li>App-1 and app-2 submitted and scheduled and running with a priority
|
||||||
|
* 5 and 6 Respectively</li>
|
||||||
|
* <li>App-3 submitted and scheduled with a priority 7. This
|
||||||
|
* is not activated since AMResourceLimit is reached</li>
|
||||||
|
* <li>RM restarted</li>
|
||||||
|
* <li>App-1 get activated nevertheless of AMResourceLimit</li>
|
||||||
|
* <li>App-2 and app-3 put in pendingOrderingPolicy</li>
|
||||||
|
* <li>After NM registration, app-3 is activated</li>
|
||||||
|
* <p>
|
||||||
|
* Expected Output : App-2 must get activated since app-2 was running earlier
|
||||||
|
* </p>
|
||||||
|
* @throws Exception
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testOrderOfActivatingThePriorityApplicationOnRMRestart()
|
||||||
|
throws Exception {
|
||||||
|
conf.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true);
|
||||||
|
conf.setBoolean(YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_ENABLED, true);
|
||||||
|
conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName());
|
||||||
|
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
|
||||||
|
YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS);
|
||||||
|
conf.setInt(YarnConfiguration.MAX_CLUSTER_LEVEL_APPLICATION_PRIORITY, 10);
|
||||||
|
final DrainDispatcher dispatcher = new DrainDispatcher();
|
||||||
|
|
||||||
|
MemoryRMStateStore memStore = new MemoryRMStateStore();
|
||||||
|
memStore.init(conf);
|
||||||
|
|
||||||
|
MockRM rm1 = new MockRM(conf, memStore) {
|
||||||
|
@Override
|
||||||
|
protected Dispatcher createDispatcher() {
|
||||||
|
return dispatcher;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
rm1.start();
|
||||||
|
|
||||||
|
MockNM nm1 =
|
||||||
|
new MockNM("127.0.0.1:1234", 16384, rm1.getResourceTrackerService());
|
||||||
|
nm1.registerNode();
|
||||||
|
|
||||||
|
dispatcher.await();
|
||||||
|
|
||||||
|
ResourceScheduler scheduler = rm1.getRMContext().getScheduler();
|
||||||
|
LeafQueue defaultQueue =
|
||||||
|
(LeafQueue) ((CapacityScheduler) scheduler).getQueue("default");
|
||||||
|
int memory = defaultQueue.getAMResourceLimit().getMemory() / 2;
|
||||||
|
|
||||||
|
// App-1 with priority 5 submitted and running
|
||||||
|
Priority appPriority1 = Priority.newInstance(5);
|
||||||
|
RMApp app1 = rm1.submitApp(memory, appPriority1);
|
||||||
|
MockAM am1 = MockRM.launchAM(app1, rm1, nm1);
|
||||||
|
am1.registerAppAttempt();
|
||||||
|
|
||||||
|
// App-2 with priority 6 submitted and running
|
||||||
|
Priority appPriority2 = Priority.newInstance(6);
|
||||||
|
RMApp app2 = rm1.submitApp(memory, appPriority2);
|
||||||
|
MockAM am2 = MockRM.launchAM(app2, rm1, nm1);
|
||||||
|
am2.registerAppAttempt();
|
||||||
|
|
||||||
|
dispatcher.await();
|
||||||
|
Assert.assertEquals(2, defaultQueue.getNumActiveApplications());
|
||||||
|
Assert.assertEquals(0, defaultQueue.getNumPendingApplications());
|
||||||
|
|
||||||
|
// App-3 with priority 7 submitted and scheduled. But not activated since
|
||||||
|
// AMResourceLimit threshold
|
||||||
|
Priority appPriority3 = Priority.newInstance(7);
|
||||||
|
RMApp app3 = rm1.submitApp(memory, appPriority3);
|
||||||
|
|
||||||
|
dispatcher.await();
|
||||||
|
Assert.assertEquals(2, defaultQueue.getNumActiveApplications());
|
||||||
|
Assert.assertEquals(1, defaultQueue.getNumPendingApplications());
|
||||||
|
|
||||||
|
Iterator<FiCaSchedulerApp> iterator =
|
||||||
|
defaultQueue.getOrderingPolicy().getSchedulableEntities().iterator();
|
||||||
|
FiCaSchedulerApp fcApp2 = iterator.next();
|
||||||
|
Assert.assertEquals(app2.getCurrentAppAttempt().getAppAttemptId(),
|
||||||
|
fcApp2.getApplicationAttemptId());
|
||||||
|
|
||||||
|
FiCaSchedulerApp fcApp1 = iterator.next();
|
||||||
|
Assert.assertEquals(app1.getCurrentAppAttempt().getAppAttemptId(),
|
||||||
|
fcApp1.getApplicationAttemptId());
|
||||||
|
|
||||||
|
iterator = defaultQueue.getPendingApplications().iterator();
|
||||||
|
FiCaSchedulerApp fcApp3 = iterator.next();
|
||||||
|
Assert.assertEquals(app3.getCurrentAppAttempt().getAppAttemptId(),
|
||||||
|
fcApp3.getApplicationAttemptId());
|
||||||
|
|
||||||
|
final DrainDispatcher dispatcher1 = new DrainDispatcher();
|
||||||
|
// create new RM to represent restart and recover state
|
||||||
|
MockRM rm2 = new MockRM(conf, memStore) {
|
||||||
|
@Override
|
||||||
|
protected Dispatcher createDispatcher() {
|
||||||
|
return dispatcher1;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
// start new RM
|
||||||
|
rm2.start();
|
||||||
|
// change NM to point to new RM
|
||||||
|
nm1.setResourceTrackerService(rm2.getResourceTrackerService());
|
||||||
|
|
||||||
|
// Verify RM Apps after this restart
|
||||||
|
Assert.assertEquals(3, rm2.getRMContext().getRMApps().size());
|
||||||
|
|
||||||
|
dispatcher1.await();
|
||||||
|
scheduler = rm2.getRMContext().getScheduler();
|
||||||
|
defaultQueue =
|
||||||
|
(LeafQueue) ((CapacityScheduler) scheduler).getQueue("default");
|
||||||
|
|
||||||
|
// wait for all applications to get added to scheduler
|
||||||
|
int count = 5;
|
||||||
|
while (count-- > 0) {
|
||||||
|
if ((defaultQueue.getNumActiveApplications() + defaultQueue
|
||||||
|
.getNumPendingApplications()) == 3) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
Thread.sleep(500);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Before NM registration, AMResourceLimit threshold is 0. So 1st
|
||||||
|
// applications get activated nevertheless of AMResourceLimit threshold
|
||||||
|
// Two applications are in pending
|
||||||
|
Assert.assertEquals(1, defaultQueue.getNumActiveApplications());
|
||||||
|
Assert.assertEquals(2, defaultQueue.getNumPendingApplications());
|
||||||
|
|
||||||
|
// NM resync to new RM
|
||||||
|
nm1.registerNode();
|
||||||
|
dispatcher1.await();
|
||||||
|
|
||||||
|
// wait for activating one applications
|
||||||
|
count = 5;
|
||||||
|
while (count-- > 0) {
|
||||||
|
if (defaultQueue.getOrderingPolicy().getSchedulableEntities().size() == 2) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
Thread.sleep(500);
|
||||||
|
}
|
||||||
|
|
||||||
|
// verify for order of activated applications iterator
|
||||||
|
iterator =
|
||||||
|
defaultQueue.getOrderingPolicy().getSchedulableEntities().iterator();
|
||||||
|
fcApp2 = iterator.next();
|
||||||
|
Assert.assertEquals(app2.getCurrentAppAttempt().getAppAttemptId(),
|
||||||
|
fcApp2.getApplicationAttemptId());
|
||||||
|
|
||||||
|
fcApp1 = iterator.next();
|
||||||
|
Assert.assertEquals(app1.getCurrentAppAttempt().getAppAttemptId(),
|
||||||
|
fcApp1.getApplicationAttemptId());
|
||||||
|
|
||||||
|
// verify for pending application iterator. It should be app-3 attempt
|
||||||
|
iterator = defaultQueue.getPendingApplications().iterator();
|
||||||
|
fcApp3 = iterator.next();
|
||||||
|
Assert.assertEquals(app3.getCurrentAppAttempt().getAppAttemptId(),
|
||||||
|
fcApp3.getApplicationAttemptId());
|
||||||
|
|
||||||
|
rm2.stop();
|
||||||
|
rm1.stop();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -2413,14 +2413,16 @@ public class TestLeafQueue {
|
||||||
TestUtils.getMockApplicationAttemptId(0, 0);
|
TestUtils.getMockApplicationAttemptId(0, 0);
|
||||||
FiCaSchedulerApp app_0 =
|
FiCaSchedulerApp app_0 =
|
||||||
spy(new FiCaSchedulerApp(appAttemptId_0, user_0, a,
|
spy(new FiCaSchedulerApp(appAttemptId_0, user_0, a,
|
||||||
mock(ActiveUsersManager.class), spyRMContext, Priority.newInstance(3)));
|
mock(ActiveUsersManager.class), spyRMContext,
|
||||||
|
Priority.newInstance(3), false));
|
||||||
a.submitApplicationAttempt(app_0, user_0);
|
a.submitApplicationAttempt(app_0, user_0);
|
||||||
|
|
||||||
final ApplicationAttemptId appAttemptId_1 =
|
final ApplicationAttemptId appAttemptId_1 =
|
||||||
TestUtils.getMockApplicationAttemptId(1, 0);
|
TestUtils.getMockApplicationAttemptId(1, 0);
|
||||||
FiCaSchedulerApp app_1 =
|
FiCaSchedulerApp app_1 =
|
||||||
spy(new FiCaSchedulerApp(appAttemptId_1, user_0, a,
|
spy(new FiCaSchedulerApp(appAttemptId_1, user_0, a,
|
||||||
mock(ActiveUsersManager.class), spyRMContext, Priority.newInstance(5)));
|
mock(ActiveUsersManager.class), spyRMContext,
|
||||||
|
Priority.newInstance(5), false));
|
||||||
a.submitApplicationAttempt(app_1, user_0);
|
a.submitApplicationAttempt(app_1, user_0);
|
||||||
|
|
||||||
Priority priority = TestUtils.createMockPriority(1);
|
Priority priority = TestUtils.createMockPriority(1);
|
||||||
|
|
Loading…
Reference in New Issue