YARN-8541. RM startup failure on recovery after user deletion. Contributed by Bibin A Chundatt.
This commit is contained in:
parent
cd0b9f1380
commit
e673dd1d4d
|
@ -364,17 +364,9 @@ public class RMAppManager implements EventHandler<RMAppManagerEvent>,
|
|||
ApplicationSubmissionContext submissionContext, long submitTime,
|
||||
String user, boolean isRecovery, long startTime) throws YarnException {
|
||||
|
||||
ApplicationPlacementContext placementContext = null;
|
||||
try {
|
||||
placementContext = placeApplication(rmContext, submissionContext, user);
|
||||
} catch (YarnException e) {
|
||||
String msg =
|
||||
"Failed to place application " + submissionContext.getApplicationId()
|
||||
+ " to queue and specified " + "queue is invalid : "
|
||||
+ submissionContext.getQueue();
|
||||
LOG.error(msg, e);
|
||||
throw e;
|
||||
}
|
||||
ApplicationPlacementContext placementContext =
|
||||
placeApplication(rmContext.getQueuePlacementManager(),
|
||||
submissionContext, user, isRecovery);
|
||||
|
||||
// We only replace the queue when it's a new application
|
||||
if (!isRecovery) {
|
||||
|
@ -789,23 +781,31 @@ public class RMAppManager implements EventHandler<RMAppManagerEvent>,
|
|||
}
|
||||
|
||||
@VisibleForTesting
|
||||
ApplicationPlacementContext placeApplication(RMContext rmContext,
|
||||
ApplicationSubmissionContext context, String user) throws YarnException {
|
||||
ApplicationPlacementContext placeApplication(
|
||||
PlacementManager placementManager, ApplicationSubmissionContext context,
|
||||
String user, boolean isRecovery) throws YarnException {
|
||||
ApplicationPlacementContext placementContext = null;
|
||||
PlacementManager placementManager = rmContext.getQueuePlacementManager();
|
||||
|
||||
if (placementManager != null) {
|
||||
try {
|
||||
placementContext = placementManager.placeApplication(context, user);
|
||||
} else{
|
||||
if ( context.getQueue() == null || context.getQueue().isEmpty()) {
|
||||
final String msg = "Queue Placement Manager is not set. Cannot place "
|
||||
+ "application : " + context.getApplicationId() + " to queue and "
|
||||
+ "specified queue is invalid " + context.getQueue();
|
||||
} catch (YarnException e) {
|
||||
// Placement could also fail if the user doesn't exist in system
|
||||
// skip if the user is not found during recovery.
|
||||
if (isRecovery) {
|
||||
LOG.warn("PlaceApplication failed,skipping on recovery of rm");
|
||||
return placementContext;
|
||||
}
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
if (placementContext == null && (context.getQueue() == null) || context
|
||||
.getQueue().isEmpty()) {
|
||||
String msg = "Failed to place application " + context.getApplicationId()
|
||||
+ " to queue and specified " + "queue is invalid : " + context
|
||||
.getQueue();
|
||||
LOG.error(msg);
|
||||
throw new YarnException(msg);
|
||||
}
|
||||
}
|
||||
|
||||
return placementContext;
|
||||
}
|
||||
|
||||
|
|
|
@ -70,15 +70,6 @@ public class PlacementManager {
|
|||
}
|
||||
}
|
||||
|
||||
// Failed to get where to place application
|
||||
if (null == placement && null == asc.getQueue()) {
|
||||
String msg = "Failed to place application " +
|
||||
asc.getApplicationId() + " to queue and specified "
|
||||
+ "queue is invalid : " + asc.getQueue();
|
||||
LOG.error(msg);
|
||||
throw new YarnException(msg);
|
||||
}
|
||||
|
||||
return placement;
|
||||
} finally {
|
||||
readLock.unlock();
|
||||
|
|
|
@ -39,8 +39,12 @@ import org.apache.hadoop.yarn.api.records.Resource;
|
|||
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
||||
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.TestRMRestart.TestSecurityMockRM;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.placement
|
||||
.ApplicationPlacementContext;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementManager;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData;
|
||||
|
@ -105,6 +109,8 @@ import static org.apache.hadoop.yarn.server.resourcemanager.scheduler
|
|||
import static org.apache.hadoop.yarn.server.resourcemanager.webapp
|
||||
.RMWebServices.DEFAULT_QUEUE;
|
||||
import static org.junit.Assert.*;
|
||||
import static org.mockito.Matchers.any;
|
||||
import static org.mockito.Mockito.doThrow;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
|
@ -1554,6 +1560,48 @@ public class TestWorkPreservingRMRestart extends ParameterizedSchedulerTestBase
|
|||
|
||||
}
|
||||
|
||||
@Test(timeout = 30000)
|
||||
public void testUnknownUserOnRecovery() throws Exception {
|
||||
|
||||
MockRM rm1 = new MockRM(conf);
|
||||
rm1.start();
|
||||
MockMemoryRMStateStore memStore =
|
||||
(MockMemoryRMStateStore) rm1.getRMStateStore();
|
||||
MockNM nm1 =
|
||||
new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService());
|
||||
nm1.registerNode();
|
||||
|
||||
// create app and launch the UAM
|
||||
RMApp app0 = rm1.submitApp(200, true);
|
||||
MockAM am0 = MockRM.launchUAM(app0, rm1, nm1);
|
||||
am0.registerAppAttempt();
|
||||
rm1.killApp(app0.getApplicationId());
|
||||
PlacementManager placementMgr = mock(PlacementManager.class);
|
||||
doThrow(new YarnException("No groups for user")).when(placementMgr)
|
||||
.placeApplication(any(ApplicationSubmissionContext.class),
|
||||
any(String.class));
|
||||
MockRM rm2 = new MockRM(conf, memStore) {
|
||||
@Override
|
||||
protected RMAppManager createRMAppManager() {
|
||||
return new RMAppManager(this.rmContext, this.scheduler,
|
||||
this.masterService, this.applicationACLsManager, conf) {
|
||||
@Override
|
||||
ApplicationPlacementContext placeApplication(
|
||||
PlacementManager placementManager,
|
||||
ApplicationSubmissionContext context, String user,
|
||||
boolean isRecovery) throws YarnException {
|
||||
return super
|
||||
.placeApplication(placementMgr, context, user, isRecovery);
|
||||
}
|
||||
};
|
||||
}
|
||||
};
|
||||
rm2.start();
|
||||
RMApp recoveredApp =
|
||||
rm2.getRMContext().getRMApps().get(app0.getApplicationId());
|
||||
Assert.assertEquals(RMAppState.KILLED, recoveredApp.getState());
|
||||
}
|
||||
|
||||
@Test(timeout = 30000)
|
||||
public void testDynamicAutoCreatedQueueRecoveryWithDefaultQueue()
|
||||
throws Exception {
|
||||
|
|
|
@ -83,16 +83,11 @@ public class TestPlacementManager {
|
|||
|
||||
ApplicationSubmissionContext asc = Records.newRecord(
|
||||
ApplicationSubmissionContext.class);
|
||||
asc.setQueue(YarnConfiguration.DEFAULT_QUEUE_NAME);
|
||||
asc.setApplicationName(APP_NAME);
|
||||
|
||||
boolean caughtException = false;
|
||||
try{
|
||||
pm.placeApplication(asc, USER2);
|
||||
} catch (Exception e) {
|
||||
caughtException = true;
|
||||
}
|
||||
Assert.assertTrue(caughtException);
|
||||
|
||||
Assert.assertNull("Placement should be null",
|
||||
pm.placeApplication(asc, USER2));
|
||||
QueueMappingEntity queueMappingEntity = new QueueMappingEntity(APP_NAME,
|
||||
USER1, PARENT_QUEUE);
|
||||
|
||||
|
@ -101,11 +96,12 @@ public class TestPlacementManager {
|
|||
queuePlacementRules.add(anRule);
|
||||
pm.updateRules(queuePlacementRules);
|
||||
try {
|
||||
pm.placeApplication(asc, USER2);
|
||||
ApplicationPlacementContext pc = pm.placeApplication(asc, USER2);
|
||||
Assert.assertNotNull(pc);
|
||||
} catch (Exception e) {
|
||||
caughtException = false;
|
||||
e.printStackTrace();
|
||||
Assert.fail("Exception not expected");
|
||||
}
|
||||
Assert.assertFalse(caughtException);
|
||||
}
|
||||
|
||||
}
|
Loading…
Reference in New Issue