YARN-7382. NoSuchElementException in FairScheduler after failover causes RM crash (rkanter)
(cherry picked from commit 025c656572
)
This commit is contained in:
parent
3d36f75f23
commit
1d34a4805e
|
@ -665,6 +665,16 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
|
||||||
if (!rmContainer.getState().equals(RMContainerState.COMPLETED)) {
|
if (!rmContainer.getState().equals(RMContainerState.COMPLETED)) {
|
||||||
getQueue().incUsedResource(rmContainer.getContainer().getResource());
|
getQueue().incUsedResource(rmContainer.getContainer().getResource());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// If not running unmanaged, the first container we recover is always
|
||||||
|
// the AM. Set the amResource for this app and update the leaf queue's AM
|
||||||
|
// usage
|
||||||
|
if (!isAmRunning() && !getUnmanagedAM()) {
|
||||||
|
Resource resource = rmContainer.getAllocatedResource();
|
||||||
|
setAMResource(resource);
|
||||||
|
getQueue().addAMResourceUsage(resource);
|
||||||
|
setAmRunning(true);
|
||||||
|
}
|
||||||
} finally {
|
} finally {
|
||||||
writeLock.unlock();
|
writeLock.unlock();
|
||||||
}
|
}
|
||||||
|
|
|
@ -66,6 +66,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueu
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.ParentQueue;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.ParentQueue;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSAppAttempt;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSAppAttempt;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSParentQueue;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSParentQueue;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSQueueMetrics;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairSchedulerConfiguration;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairSchedulerConfiguration;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairSchedulerTestBase;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairSchedulerTestBase;
|
||||||
|
@ -154,6 +155,7 @@ public class TestWorkPreservingRMRestart extends ParameterizedSchedulerTestBase
|
||||||
new MockNM("127.0.0.1:1234", 8192, rm1.getResourceTrackerService());
|
new MockNM("127.0.0.1:1234", 8192, rm1.getResourceTrackerService());
|
||||||
nm1.registerNode();
|
nm1.registerNode();
|
||||||
RMApp app1 = rm1.submitApp(200);
|
RMApp app1 = rm1.submitApp(200);
|
||||||
|
Resource amResources = app1.getAMResourceRequests().get(0).getCapability();
|
||||||
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
|
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
|
||||||
|
|
||||||
// clear queue metrics
|
// clear queue metrics
|
||||||
|
@ -236,7 +238,8 @@ public class TestWorkPreservingRMRestart extends ParameterizedSchedulerTestBase
|
||||||
if (getSchedulerType() == SchedulerType.CAPACITY) {
|
if (getSchedulerType() == SchedulerType.CAPACITY) {
|
||||||
checkCSQueue(rm2, schedulerApp, nmResource, nmResource, usedResources, 2);
|
checkCSQueue(rm2, schedulerApp, nmResource, nmResource, usedResources, 2);
|
||||||
} else {
|
} else {
|
||||||
checkFSQueue(rm2, schedulerApp, usedResources, availableResources);
|
checkFSQueue(rm2, schedulerApp, usedResources, availableResources,
|
||||||
|
amResources);
|
||||||
}
|
}
|
||||||
|
|
||||||
// *********** check scheduler attempt state.********
|
// *********** check scheduler attempt state.********
|
||||||
|
@ -306,6 +309,7 @@ public class TestWorkPreservingRMRestart extends ParameterizedSchedulerTestBase
|
||||||
RMApp app1 = rm1.submitApp(200, "dynamicQApp",
|
RMApp app1 = rm1.submitApp(200, "dynamicQApp",
|
||||||
UserGroupInformation.getCurrentUser().getShortUserName(), null,
|
UserGroupInformation.getCurrentUser().getShortUserName(), null,
|
||||||
ReservationSystemTestUtil.getReservationQueueName());
|
ReservationSystemTestUtil.getReservationQueueName());
|
||||||
|
Resource amResources = app1.getAMResourceRequests().get(0).getCapability();
|
||||||
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
|
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
|
||||||
|
|
||||||
// clear queue metrics
|
// clear queue metrics
|
||||||
|
@ -380,7 +384,8 @@ public class TestWorkPreservingRMRestart extends ParameterizedSchedulerTestBase
|
||||||
if (getSchedulerType() == SchedulerType.CAPACITY) {
|
if (getSchedulerType() == SchedulerType.CAPACITY) {
|
||||||
checkCSQueue(rm2, schedulerApp, nmResource, nmResource, usedResources, 2);
|
checkCSQueue(rm2, schedulerApp, nmResource, nmResource, usedResources, 2);
|
||||||
} else {
|
} else {
|
||||||
checkFSQueue(rm2, schedulerApp, usedResources, availableResources);
|
checkFSQueue(rm2, schedulerApp, usedResources, availableResources,
|
||||||
|
amResources);
|
||||||
}
|
}
|
||||||
|
|
||||||
// *********** check scheduler attempt state.********
|
// *********** check scheduler attempt state.********
|
||||||
|
@ -452,7 +457,7 @@ public class TestWorkPreservingRMRestart extends ParameterizedSchedulerTestBase
|
||||||
|
|
||||||
private void checkFSQueue(ResourceManager rm,
|
private void checkFSQueue(ResourceManager rm,
|
||||||
SchedulerApplication schedulerApp, Resource usedResources,
|
SchedulerApplication schedulerApp, Resource usedResources,
|
||||||
Resource availableResources) throws Exception {
|
Resource availableResources, Resource amResources) throws Exception {
|
||||||
// waiting for RM's scheduling apps
|
// waiting for RM's scheduling apps
|
||||||
int retry = 0;
|
int retry = 0;
|
||||||
Resource assumedFairShare = Resource.newInstance(8192, 8);
|
Resource assumedFairShare = Resource.newInstance(8192, 8);
|
||||||
|
@ -484,6 +489,16 @@ public class TestWorkPreservingRMRestart extends ParameterizedSchedulerTestBase
|
||||||
assertMetrics(queueMetrics, 1, 0, 1, 0, 2, availableResources.getMemorySize(),
|
assertMetrics(queueMetrics, 1, 0, 1, 0, 2, availableResources.getMemorySize(),
|
||||||
availableResources.getVirtualCores(), usedResources.getMemorySize(),
|
availableResources.getVirtualCores(), usedResources.getMemorySize(),
|
||||||
usedResources.getVirtualCores());
|
usedResources.getVirtualCores());
|
||||||
|
|
||||||
|
// ************ check AM resources ****************
|
||||||
|
assertEquals(amResources,
|
||||||
|
schedulerApp.getCurrentAppAttempt().getAMResource());
|
||||||
|
FSQueueMetrics fsQueueMetrics =
|
||||||
|
(FSQueueMetrics) schedulerApp.getQueue().getMetrics();
|
||||||
|
assertEquals(amResources.getMemorySize(),
|
||||||
|
fsQueueMetrics.getAMResourceUsageMB());
|
||||||
|
assertEquals(amResources.getVirtualCores(),
|
||||||
|
fsQueueMetrics.getAMResourceUsageVCores());
|
||||||
}
|
}
|
||||||
|
|
||||||
// create 3 container reports for AM
|
// create 3 container reports for AM
|
||||||
|
|
Loading…
Reference in New Issue