YARN-8541. RM startup failure on recovery after user deletion. Contributed by Bibin A Chundatt.

This commit is contained in:
bibinchundatt 2018-07-25 15:54:32 +05:30
parent b89624a943
commit 8e65057eb1
3 changed files with 72 additions and 33 deletions

View File

@ -364,17 +364,9 @@ public class RMAppManager implements EventHandler<RMAppManagerEvent>,
ApplicationSubmissionContext submissionContext, long submitTime,
String user, boolean isRecovery, long startTime) throws YarnException {
ApplicationPlacementContext placementContext = null;
try {
placementContext = placeApplication(rmContext, submissionContext, user);
} catch (YarnException e) {
String msg =
"Failed to place application " + submissionContext.getApplicationId()
+ " to queue and specified " + "queue is invalid : "
+ submissionContext.getQueue();
LOG.error(msg, e);
throw e;
}
ApplicationPlacementContext placementContext =
placeApplication(rmContext.getQueuePlacementManager(),
submissionContext, user, isRecovery);
// We only replace the queue when it's a new application
if (!isRecovery) {
@ -789,23 +781,31 @@ public class RMAppManager implements EventHandler<RMAppManagerEvent>,
}
@VisibleForTesting
ApplicationPlacementContext placeApplication(RMContext rmContext,
ApplicationSubmissionContext context, String user) throws YarnException {
ApplicationPlacementContext placeApplication(
PlacementManager placementManager, ApplicationSubmissionContext context,
String user, boolean isRecovery) throws YarnException {
ApplicationPlacementContext placementContext = null;
PlacementManager placementManager = rmContext.getQueuePlacementManager();
if (placementManager != null) {
placementContext = placementManager.placeApplication(context, user);
} else{
if ( context.getQueue() == null || context.getQueue().isEmpty()) {
final String msg = "Queue Placement Manager is not set. Cannot place "
+ "application : " + context.getApplicationId() + " to queue and "
+ "specified queue is invalid " + context.getQueue();
LOG.error(msg);
throw new YarnException(msg);
try {
placementContext = placementManager.placeApplication(context, user);
} catch (YarnException e) {
// Placement could also fail if the user doesn't exist in system
// skip if the user is not found during recovery.
if (isRecovery) {
LOG.warn("PlaceApplication failed,skipping on recovery of rm");
return placementContext;
}
throw e;
}
}
if (placementContext == null && (context.getQueue() == null) || context
.getQueue().isEmpty()) {
String msg = "Failed to place application " + context.getApplicationId()
+ " to queue and specified " + "queue is invalid : " + context
.getQueue();
LOG.error(msg);
throw new YarnException(msg);
}
return placementContext;
}

View File

@ -70,15 +70,6 @@ public class PlacementManager {
}
}
// Failed to get where to place application
if (null == placement && null == asc.getQueue()) {
String msg = "Failed to place application " +
asc.getApplicationId() + " to queue and specified "
+ "queue is invalid : " + asc.getQueue();
LOG.error(msg);
throw new YarnException(msg);
}
return placement;
} finally {
readLock.unlock();

View File

@ -39,8 +39,12 @@ import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
import org.apache.hadoop.yarn.server.resourcemanager.TestRMRestart.TestSecurityMockRM;
import org.apache.hadoop.yarn.server.resourcemanager.placement
.ApplicationPlacementContext;
import org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementManager;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData;
@ -105,6 +109,8 @@ import static org.apache.hadoop.yarn.server.resourcemanager.scheduler
import static org.apache.hadoop.yarn.server.resourcemanager.webapp
.RMWebServices.DEFAULT_QUEUE;
import static org.junit.Assert.*;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@ -1554,6 +1560,48 @@ public class TestWorkPreservingRMRestart extends ParameterizedSchedulerTestBase
}
@Test(timeout = 30000)
public void testUnknownUserOnRecovery() throws Exception {
MockRM rm1 = new MockRM(conf);
rm1.start();
MockMemoryRMStateStore memStore =
(MockMemoryRMStateStore) rm1.getRMStateStore();
MockNM nm1 =
new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService());
nm1.registerNode();
// create app and launch the UAM
RMApp app0 = rm1.submitApp(200, true);
MockAM am0 = MockRM.launchUAM(app0, rm1, nm1);
am0.registerAppAttempt();
rm1.killApp(app0.getApplicationId());
PlacementManager placementMgr = mock(PlacementManager.class);
doThrow(new YarnException("No groups for user")).when(placementMgr)
.placeApplication(any(ApplicationSubmissionContext.class),
any(String.class));
MockRM rm2 = new MockRM(conf, memStore) {
@Override
protected RMAppManager createRMAppManager() {
return new RMAppManager(this.rmContext, this.scheduler,
this.masterService, this.applicationACLsManager, conf) {
@Override
ApplicationPlacementContext placeApplication(
PlacementManager placementManager,
ApplicationSubmissionContext context, String user,
boolean isRecovery) throws YarnException {
return super
.placeApplication(placementMgr, context, user, isRecovery);
}
};
}
};
rm2.start();
RMApp recoveredApp =
rm2.getRMContext().getRMApps().get(app0.getApplicationId());
Assert.assertEquals(RMAppState.KILLED, recoveredApp.getState());
}
@Test(timeout = 30000)
public void testDynamicAutoCreatedQueueRecoveryWithDefaultQueue()
throws Exception {