YARN-8541. RM startup failure on recovery after user deletion. Contributed by Bibin A Chundatt.
(cherry picked from commit 8e65057eb10d03db08781da7a5ad8855155883ed)
This commit is contained in:
parent
23624c9248
commit
37f25fbd08
@ -364,17 +364,9 @@ private RMAppImpl createAndPopulateNewRMApp(
|
|||||||
ApplicationSubmissionContext submissionContext, long submitTime,
|
ApplicationSubmissionContext submissionContext, long submitTime,
|
||||||
String user, boolean isRecovery, long startTime) throws YarnException {
|
String user, boolean isRecovery, long startTime) throws YarnException {
|
||||||
|
|
||||||
ApplicationPlacementContext placementContext = null;
|
ApplicationPlacementContext placementContext =
|
||||||
try {
|
placeApplication(rmContext.getQueuePlacementManager(),
|
||||||
placementContext = placeApplication(rmContext, submissionContext, user);
|
submissionContext, user, isRecovery);
|
||||||
} catch (YarnException e) {
|
|
||||||
String msg =
|
|
||||||
"Failed to place application " + submissionContext.getApplicationId()
|
|
||||||
+ " to queue and specified " + "queue is invalid : "
|
|
||||||
+ submissionContext.getQueue();
|
|
||||||
LOG.error(msg, e);
|
|
||||||
throw e;
|
|
||||||
}
|
|
||||||
|
|
||||||
// We only replace the queue when it's a new application
|
// We only replace the queue when it's a new application
|
||||||
if (!isRecovery) {
|
if (!isRecovery) {
|
||||||
@ -789,23 +781,31 @@ private void updateAppDataToStateStore(String queue, RMApp app,
|
|||||||
}
|
}
|
||||||
|
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
ApplicationPlacementContext placeApplication(RMContext rmContext,
|
ApplicationPlacementContext placeApplication(
|
||||||
ApplicationSubmissionContext context, String user) throws YarnException {
|
PlacementManager placementManager, ApplicationSubmissionContext context,
|
||||||
|
String user, boolean isRecovery) throws YarnException {
|
||||||
ApplicationPlacementContext placementContext = null;
|
ApplicationPlacementContext placementContext = null;
|
||||||
PlacementManager placementManager = rmContext.getQueuePlacementManager();
|
|
||||||
|
|
||||||
if (placementManager != null) {
|
if (placementManager != null) {
|
||||||
placementContext = placementManager.placeApplication(context, user);
|
try {
|
||||||
} else{
|
placementContext = placementManager.placeApplication(context, user);
|
||||||
if ( context.getQueue() == null || context.getQueue().isEmpty()) {
|
} catch (YarnException e) {
|
||||||
final String msg = "Queue Placement Manager is not set. Cannot place "
|
// Placement could also fail if the user doesn't exist in system
|
||||||
+ "application : " + context.getApplicationId() + " to queue and "
|
// skip if the user is not found during recovery.
|
||||||
+ "specified queue is invalid " + context.getQueue();
|
if (isRecovery) {
|
||||||
LOG.error(msg);
|
LOG.warn("PlaceApplication failed,skipping on recovery of rm");
|
||||||
throw new YarnException(msg);
|
return placementContext;
|
||||||
|
}
|
||||||
|
throw e;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
if (placementContext == null && (context.getQueue() == null) || context
|
||||||
|
.getQueue().isEmpty()) {
|
||||||
|
String msg = "Failed to place application " + context.getApplicationId()
|
||||||
|
+ " to queue and specified " + "queue is invalid : " + context
|
||||||
|
.getQueue();
|
||||||
|
LOG.error(msg);
|
||||||
|
throw new YarnException(msg);
|
||||||
|
}
|
||||||
return placementContext;
|
return placementContext;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -70,15 +70,6 @@ public ApplicationPlacementContext placeApplication(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Failed to get where to place application
|
|
||||||
if (null == placement && null == asc.getQueue()) {
|
|
||||||
String msg = "Failed to place application " +
|
|
||||||
asc.getApplicationId() + " to queue and specified "
|
|
||||||
+ "queue is invalid : " + asc.getQueue();
|
|
||||||
LOG.error(msg);
|
|
||||||
throw new YarnException(msg);
|
|
||||||
}
|
|
||||||
|
|
||||||
return placement;
|
return placement;
|
||||||
} finally {
|
} finally {
|
||||||
readLock.unlock();
|
readLock.unlock();
|
||||||
|
@ -39,8 +39,12 @@
|
|||||||
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
||||||
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
|
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
|
||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
|
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||||
import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
|
import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.TestRMRestart.TestSecurityMockRM;
|
import org.apache.hadoop.yarn.server.resourcemanager.TestRMRestart.TestSecurityMockRM;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.placement
|
||||||
|
.ApplicationPlacementContext;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementManager;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore;
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData;
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData;
|
||||||
@ -105,6 +109,8 @@
|
|||||||
import static org.apache.hadoop.yarn.server.resourcemanager.webapp
|
import static org.apache.hadoop.yarn.server.resourcemanager.webapp
|
||||||
.RMWebServices.DEFAULT_QUEUE;
|
.RMWebServices.DEFAULT_QUEUE;
|
||||||
import static org.junit.Assert.*;
|
import static org.junit.Assert.*;
|
||||||
|
import static org.mockito.Matchers.any;
|
||||||
|
import static org.mockito.Mockito.doThrow;
|
||||||
import static org.mockito.Mockito.mock;
|
import static org.mockito.Mockito.mock;
|
||||||
import static org.mockito.Mockito.when;
|
import static org.mockito.Mockito.when;
|
||||||
|
|
||||||
@ -1554,6 +1560,48 @@ public void testUAMRecoveryOnRMWorkPreservingRestart() throws Exception {
|
|||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test(timeout = 30000)
|
||||||
|
public void testUnknownUserOnRecovery() throws Exception {
|
||||||
|
|
||||||
|
MockRM rm1 = new MockRM(conf);
|
||||||
|
rm1.start();
|
||||||
|
MockMemoryRMStateStore memStore =
|
||||||
|
(MockMemoryRMStateStore) rm1.getRMStateStore();
|
||||||
|
MockNM nm1 =
|
||||||
|
new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService());
|
||||||
|
nm1.registerNode();
|
||||||
|
|
||||||
|
// create app and launch the UAM
|
||||||
|
RMApp app0 = rm1.submitApp(200, true);
|
||||||
|
MockAM am0 = MockRM.launchUAM(app0, rm1, nm1);
|
||||||
|
am0.registerAppAttempt();
|
||||||
|
rm1.killApp(app0.getApplicationId());
|
||||||
|
PlacementManager placementMgr = mock(PlacementManager.class);
|
||||||
|
doThrow(new YarnException("No groups for user")).when(placementMgr)
|
||||||
|
.placeApplication(any(ApplicationSubmissionContext.class),
|
||||||
|
any(String.class));
|
||||||
|
MockRM rm2 = new MockRM(conf, memStore) {
|
||||||
|
@Override
|
||||||
|
protected RMAppManager createRMAppManager() {
|
||||||
|
return new RMAppManager(this.rmContext, this.scheduler,
|
||||||
|
this.masterService, this.applicationACLsManager, conf) {
|
||||||
|
@Override
|
||||||
|
ApplicationPlacementContext placeApplication(
|
||||||
|
PlacementManager placementManager,
|
||||||
|
ApplicationSubmissionContext context, String user,
|
||||||
|
boolean isRecovery) throws YarnException {
|
||||||
|
return super
|
||||||
|
.placeApplication(placementMgr, context, user, isRecovery);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
};
|
||||||
|
rm2.start();
|
||||||
|
RMApp recoveredApp =
|
||||||
|
rm2.getRMContext().getRMApps().get(app0.getApplicationId());
|
||||||
|
Assert.assertEquals(RMAppState.KILLED, recoveredApp.getState());
|
||||||
|
}
|
||||||
|
|
||||||
@Test(timeout = 30000)
|
@Test(timeout = 30000)
|
||||||
public void testDynamicAutoCreatedQueueRecoveryWithDefaultQueue()
|
public void testDynamicAutoCreatedQueueRecoveryWithDefaultQueue()
|
||||||
throws Exception {
|
throws Exception {
|
||||||
|
Loading…
x
Reference in New Issue
Block a user