YARN-9339. Apps pending metric incorrect after moving app to a new queue. Contributed by Abhishek Modi.
This commit is contained in:
parent
59ded7641f
commit
c504eee0c2
|
@ -176,6 +176,15 @@ public interface CSQueue extends SchedulerQueue<CSQueue> {
|
||||||
public void submitApplicationAttempt(FiCaSchedulerApp application,
|
public void submitApplicationAttempt(FiCaSchedulerApp application,
|
||||||
String userName);
|
String userName);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Submit an application attempt to the queue.
|
||||||
|
* @param application application whose attempt is being submitted
|
||||||
|
* @param userName user who submitted the application attempt
|
||||||
|
* @param isMoveApp is application being moved across the queue
|
||||||
|
*/
|
||||||
|
public void submitApplicationAttempt(FiCaSchedulerApp application,
|
||||||
|
String userName, boolean isMoveApp);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* An application submitted to this queue has finished.
|
* An application submitted to this queue has finished.
|
||||||
* @param applicationId
|
* @param applicationId
|
||||||
|
|
|
@ -2431,7 +2431,7 @@ public class CapacityScheduler extends
|
||||||
if (!app.isStopped()) {
|
if (!app.isStopped()) {
|
||||||
source.finishApplicationAttempt(app, sourceQueueName);
|
source.finishApplicationAttempt(app, sourceQueueName);
|
||||||
// Submit to a new queue
|
// Submit to a new queue
|
||||||
dest.submitApplicationAttempt(app, user);
|
dest.submitApplicationAttempt(app, user, true);
|
||||||
}
|
}
|
||||||
// Finish app & update metrics
|
// Finish app & update metrics
|
||||||
app.move(dest);
|
app.move(dest);
|
||||||
|
|
|
@ -578,6 +578,12 @@ public class LeafQueue extends AbstractCSQueue {
|
||||||
@Override
|
@Override
|
||||||
public void submitApplicationAttempt(FiCaSchedulerApp application,
|
public void submitApplicationAttempt(FiCaSchedulerApp application,
|
||||||
String userName) {
|
String userName) {
|
||||||
|
submitApplicationAttempt(application, userName, false);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void submitApplicationAttempt(FiCaSchedulerApp application,
|
||||||
|
String userName, boolean isMoveApp) {
|
||||||
// Careful! Locking order is important!
|
// Careful! Locking order is important!
|
||||||
writeLock.lock();
|
writeLock.lock();
|
||||||
try {
|
try {
|
||||||
|
@ -592,7 +598,7 @@ public class LeafQueue extends AbstractCSQueue {
|
||||||
}
|
}
|
||||||
|
|
||||||
// We don't want to update metrics for move app
|
// We don't want to update metrics for move app
|
||||||
if (application.isPending()) {
|
if (!isMoveApp) {
|
||||||
metrics.submitAppAttempt(userName);
|
metrics.submitAppAttempt(userName);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -478,6 +478,13 @@ public class ParentQueue extends AbstractCSQueue {
|
||||||
// submit attempt logic.
|
// submit attempt logic.
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void submitApplicationAttempt(FiCaSchedulerApp application,
|
||||||
|
String userName, boolean isMoveApp) {
|
||||||
|
throw new UnsupportedOperationException("Submission of application attempt"
|
||||||
|
+ " to parent queue is not supported");
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void finishApplicationAttempt(FiCaSchedulerApp application,
|
public void finishApplicationAttempt(FiCaSchedulerApp application,
|
||||||
String queue) {
|
String queue) {
|
||||||
|
|
|
@ -57,6 +57,7 @@ import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.CommonConfigurationKeys;
|
import org.apache.hadoop.fs.CommonConfigurationKeys;
|
||||||
|
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
|
||||||
import org.apache.hadoop.net.NetworkTopology;
|
import org.apache.hadoop.net.NetworkTopology;
|
||||||
import org.apache.hadoop.security.Credentials;
|
import org.apache.hadoop.security.Credentials;
|
||||||
import org.apache.hadoop.security.ShellBasedUnixGroupsMapping;
|
import org.apache.hadoop.security.ShellBasedUnixGroupsMapping;
|
||||||
|
@ -222,6 +223,8 @@ public class TestCapacityScheduler extends CapacitySchedulerTestBase {
|
||||||
@After
|
@After
|
||||||
public void tearDown() throws Exception {
|
public void tearDown() throws Exception {
|
||||||
if (resourceManager != null) {
|
if (resourceManager != null) {
|
||||||
|
QueueMetrics.clearQueueMetrics();
|
||||||
|
DefaultMetricsSystem.shutdown();
|
||||||
resourceManager.stop();
|
resourceManager.stop();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1859,13 +1862,13 @@ public class TestCapacityScheduler extends CapacitySchedulerTestBase {
|
||||||
MockRM rm = setUpMove();
|
MockRM rm = setUpMove();
|
||||||
AbstractYarnScheduler scheduler =
|
AbstractYarnScheduler scheduler =
|
||||||
(AbstractYarnScheduler) rm.getResourceScheduler();
|
(AbstractYarnScheduler) rm.getResourceScheduler();
|
||||||
|
QueueMetrics metrics = scheduler.getRootQueueMetrics();
|
||||||
|
Assert.assertEquals(0, metrics.getAppsPending());
|
||||||
// submit an app
|
// submit an app
|
||||||
RMApp app = rm.submitApp(GB, "test-move-1", "user_0", null, "a1");
|
RMApp app = rm.submitApp(GB, "test-move-1", "user_0", null, "a1");
|
||||||
ApplicationAttemptId appAttemptId =
|
ApplicationAttemptId appAttemptId =
|
||||||
rm.getApplicationReport(app.getApplicationId())
|
rm.getApplicationReport(app.getApplicationId())
|
||||||
.getCurrentApplicationAttemptId();
|
.getCurrentApplicationAttemptId();
|
||||||
|
|
||||||
// check preconditions
|
// check preconditions
|
||||||
List<ApplicationAttemptId> appsInA1 = scheduler.getAppsInQueue("a1");
|
List<ApplicationAttemptId> appsInA1 = scheduler.getAppsInQueue("a1");
|
||||||
assertEquals(1, appsInA1.size());
|
assertEquals(1, appsInA1.size());
|
||||||
|
@ -1882,6 +1885,8 @@ public class TestCapacityScheduler extends CapacitySchedulerTestBase {
|
||||||
assertTrue(appsInRoot.contains(appAttemptId));
|
assertTrue(appsInRoot.contains(appAttemptId));
|
||||||
assertEquals(1, appsInRoot.size());
|
assertEquals(1, appsInRoot.size());
|
||||||
|
|
||||||
|
assertEquals(1, metrics.getAppsPending());
|
||||||
|
|
||||||
List<ApplicationAttemptId> appsInB1 = scheduler.getAppsInQueue("b1");
|
List<ApplicationAttemptId> appsInB1 = scheduler.getAppsInQueue("b1");
|
||||||
assertTrue(appsInB1.isEmpty());
|
assertTrue(appsInB1.isEmpty());
|
||||||
|
|
||||||
|
@ -1907,6 +1912,8 @@ public class TestCapacityScheduler extends CapacitySchedulerTestBase {
|
||||||
assertTrue(appsInRoot.contains(appAttemptId));
|
assertTrue(appsInRoot.contains(appAttemptId));
|
||||||
assertEquals(1, appsInRoot.size());
|
assertEquals(1, appsInRoot.size());
|
||||||
|
|
||||||
|
assertEquals(1, metrics.getAppsPending());
|
||||||
|
|
||||||
appsInA1 = scheduler.getAppsInQueue("a1");
|
appsInA1 = scheduler.getAppsInQueue("a1");
|
||||||
assertTrue(appsInA1.isEmpty());
|
assertTrue(appsInA1.isEmpty());
|
||||||
|
|
||||||
|
@ -1916,6 +1923,67 @@ public class TestCapacityScheduler extends CapacitySchedulerTestBase {
|
||||||
rm.stop();
|
rm.stop();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testMoveAppPendingMetrics() throws Exception {
|
||||||
|
MockRM rm = setUpMove();
|
||||||
|
AbstractYarnScheduler scheduler =
|
||||||
|
(AbstractYarnScheduler) rm.getResourceScheduler();
|
||||||
|
QueueMetrics metrics = scheduler.getRootQueueMetrics();
|
||||||
|
List<ApplicationAttemptId> appsInA1 = scheduler.getAppsInQueue("a1");
|
||||||
|
List<ApplicationAttemptId> appsInB1 = scheduler.getAppsInQueue("b1");
|
||||||
|
|
||||||
|
assertEquals(0, appsInA1.size());
|
||||||
|
assertEquals(0, appsInB1.size());
|
||||||
|
Assert.assertEquals(0, metrics.getAppsPending());
|
||||||
|
|
||||||
|
// submit two apps in a1
|
||||||
|
RMApp app1 = rm.submitApp(GB, "test-move-1", "user_0", null, "a1");
|
||||||
|
RMApp app2 = rm.submitApp(GB, "test-move-2", "user_0", null, "a1");
|
||||||
|
|
||||||
|
appsInA1 = scheduler.getAppsInQueue("a1");
|
||||||
|
appsInB1 = scheduler.getAppsInQueue("b1");
|
||||||
|
assertEquals(2, appsInA1.size());
|
||||||
|
assertEquals(0, appsInB1.size());
|
||||||
|
assertEquals(2, metrics.getAppsPending());
|
||||||
|
|
||||||
|
// submit one app in b1
|
||||||
|
RMApp app3 = rm.submitApp(GB, "test-move-2", "user_0", null, "b1");
|
||||||
|
|
||||||
|
appsInA1 = scheduler.getAppsInQueue("a1");
|
||||||
|
appsInB1 = scheduler.getAppsInQueue("b1");
|
||||||
|
assertEquals(2, appsInA1.size());
|
||||||
|
assertEquals(1, appsInB1.size());
|
||||||
|
assertEquals(3, metrics.getAppsPending());
|
||||||
|
|
||||||
|
// now move the app1 from a1 to b1
|
||||||
|
scheduler.moveApplication(app1.getApplicationId(), "b1");
|
||||||
|
|
||||||
|
appsInA1 = scheduler.getAppsInQueue("a1");
|
||||||
|
appsInB1 = scheduler.getAppsInQueue("b1");
|
||||||
|
assertEquals(1, appsInA1.size());
|
||||||
|
assertEquals(2, appsInB1.size());
|
||||||
|
assertEquals(3, metrics.getAppsPending());
|
||||||
|
|
||||||
|
// now move the app2 from a1 to b1
|
||||||
|
scheduler.moveApplication(app2.getApplicationId(), "b1");
|
||||||
|
|
||||||
|
appsInA1 = scheduler.getAppsInQueue("a1");
|
||||||
|
appsInB1 = scheduler.getAppsInQueue("b1");
|
||||||
|
assertEquals(0, appsInA1.size());
|
||||||
|
assertEquals(3, appsInB1.size());
|
||||||
|
assertEquals(3, metrics.getAppsPending());
|
||||||
|
|
||||||
|
// now move the app3 from b1 to a1
|
||||||
|
scheduler.moveApplication(app3.getApplicationId(), "a1");
|
||||||
|
|
||||||
|
appsInA1 = scheduler.getAppsInQueue("a1");
|
||||||
|
appsInB1 = scheduler.getAppsInQueue("b1");
|
||||||
|
assertEquals(1, appsInA1.size());
|
||||||
|
assertEquals(2, appsInB1.size());
|
||||||
|
assertEquals(3, metrics.getAppsPending());
|
||||||
|
rm.stop();
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testMoveAppSameParent() throws Exception {
|
public void testMoveAppSameParent() throws Exception {
|
||||||
MockRM rm = setUpMove();
|
MockRM rm = setUpMove();
|
||||||
|
|
Loading…
Reference in New Issue