Merge -r 1166967:1166968 from trunk to branch-0.23 to fix MAPREDUCE-2953.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23@1166969 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Arun Murthy 2011-09-09 02:20:03 +00:00
parent 778e65aa74
commit 1b316b7707
5 changed files with 22 additions and 16 deletions

View File

@ -1234,6 +1234,10 @@ Release 0.23.0 - Unreleased
MAPREDUCE-2937. Ensure reason for application failure is displayed to the MAPREDUCE-2937. Ensure reason for application failure is displayed to the
user. (mahadev via acmurthy) user. (mahadev via acmurthy)
MAPREDUCE-2953. Fix a race condition on submission which caused client to
incorrectly assume application was gone by making submission synchronous
for RMAppManager. (Thomas Graves via acmurthy)
Release 0.22.0 - Unreleased Release 0.22.0 - Unreleased
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -91,6 +91,7 @@ public class ClientRMService extends AbstractService implements
final private YarnScheduler scheduler; final private YarnScheduler scheduler;
final private RMContext rmContext; final private RMContext rmContext;
private final AMLivelinessMonitor amLivelinessMonitor; private final AMLivelinessMonitor amLivelinessMonitor;
private final RMAppManager rmAppManager;
private String clientServiceBindAddress; private String clientServiceBindAddress;
private Server server; private Server server;
@ -100,11 +101,13 @@ public class ClientRMService extends AbstractService implements
private ApplicationACLsManager aclsManager; private ApplicationACLsManager aclsManager;
private Map<ApplicationACL, AccessControlList> applicationACLs; private Map<ApplicationACL, AccessControlList> applicationACLs;
public ClientRMService(RMContext rmContext, YarnScheduler scheduler) { public ClientRMService(RMContext rmContext, YarnScheduler scheduler,
RMAppManager rmAppManager) {
super(ClientRMService.class.getName()); super(ClientRMService.class.getName());
this.scheduler = scheduler; this.scheduler = scheduler;
this.rmContext = rmContext; this.rmContext = rmContext;
this.amLivelinessMonitor = rmContext.getAMLivelinessMonitor(); this.amLivelinessMonitor = rmContext.getAMLivelinessMonitor();
this.rmAppManager = rmAppManager;
} }
@Override @Override
@ -201,8 +204,10 @@ public class ClientRMService extends AbstractService implements
throw new IOException("Application with id " + applicationId throw new IOException("Application with id " + applicationId
+ " is already present! Cannot add a duplicate!"); + " is already present! Cannot add a duplicate!");
} }
this.rmContext.getDispatcher().getEventHandler().handle( // This needs to be synchronous as the client can query
new RMAppManagerSubmitEvent(submissionContext)); // immediately following the submission to get the application status.
// So call handle directly and do not send an event.
rmAppManager.handle(new RMAppManagerSubmitEvent(submissionContext));
LOG.info("Application with id " + applicationId.getId() + LOG.info("Application with id " + applicationId.getId() +
" submitted by user " + user + " with " + submissionContext); " submitted by user " + user + " with " + submissionContext);

View File

@ -210,7 +210,7 @@ public class RMAppManager implements EventHandler<RMAppManagerEvent> {
} }
} }
protected void submitApplication(ApplicationSubmissionContext submissionContext) { protected synchronized void submitApplication(ApplicationSubmissionContext submissionContext) {
ApplicationId applicationId = submissionContext.getApplicationId(); ApplicationId applicationId = submissionContext.getApplicationId();
RMApp application = null; RMApp application = null;
try { try {

View File

@ -99,7 +99,7 @@ public class ResourceManager extends CompositeService implements Recoverable {
protected NMLivelinessMonitor nmLivelinessMonitor; protected NMLivelinessMonitor nmLivelinessMonitor;
protected NodesListManager nodesListManager; protected NodesListManager nodesListManager;
private SchedulerEventDispatcher schedulerDispatcher; private SchedulerEventDispatcher schedulerDispatcher;
private RMAppManager rmAppManager; protected RMAppManager rmAppManager;
private final AtomicBoolean shutdown = new AtomicBoolean(false); private final AtomicBoolean shutdown = new AtomicBoolean(false);
private WebApp webApp; private WebApp webApp;
@ -176,13 +176,13 @@ public class ResourceManager extends CompositeService implements Recoverable {
masterService = createApplicationMasterService(); masterService = createApplicationMasterService();
addService(masterService) ; addService(masterService) ;
clientRM = createClientRMService();
addService(clientRM);
this.rmAppManager = createRMAppManager(); this.rmAppManager = createRMAppManager();
// Register event handler for RMAppManagerEvents // Register event handler for RMAppManagerEvents
this.rmDispatcher.register(RMAppManagerEventType.class, this.rmDispatcher.register(RMAppManagerEventType.class,
this.rmAppManager); this.rmAppManager);
clientRM = createClientRMService();
addService(clientRM);
adminService = createAdminService(); adminService = createAdminService();
addService(adminService); addService(adminService);
@ -441,7 +441,7 @@ public class ResourceManager extends CompositeService implements Recoverable {
} }
protected ClientRMService createClientRMService() { protected ClientRMService createClientRMService() {
return new ClientRMService(this.rmContext, scheduler); return new ClientRMService(this.rmContext, scheduler, this.rmAppManager);
} }
protected ApplicationMasterService createApplicationMasterService() { protected ApplicationMasterService createApplicationMasterService() {

View File

@ -60,13 +60,9 @@ public class MockRM extends ResourceManager {
public void waitForState(ApplicationId appId, RMAppState finalState) public void waitForState(ApplicationId appId, RMAppState finalState)
throws Exception { throws Exception {
RMApp app = getRMContext().getRMApps().get(appId);
Assert.assertNotNull("app shouldn't be null", app);
int timeoutSecs = 0; int timeoutSecs = 0;
RMApp app = null;
while ((app == null) && timeoutSecs++ < 20) {
app = getRMContext().getRMApps().get(appId);
Thread.sleep(500);
}
timeoutSecs = 0;
while (!finalState.equals(app.getState()) && while (!finalState.equals(app.getState()) &&
timeoutSecs++ < 20) { timeoutSecs++ < 20) {
System.out.println("App State is : " + app.getState() + System.out.println("App State is : " + app.getState() +
@ -95,6 +91,7 @@ public class MockRM extends ResourceManager {
req.setApplicationSubmissionContext(sub); req.setApplicationSubmissionContext(sub);
client.submitApplication(req); client.submitApplication(req);
// make sure app is immediately available after submit
waitForState(appId, RMAppState.ACCEPTED); waitForState(appId, RMAppState.ACCEPTED);
return getRMContext().getRMApps().get(appId); return getRMContext().getRMApps().get(appId);
} }
@ -131,7 +128,7 @@ public class MockRM extends ResourceManager {
@Override @Override
protected ClientRMService createClientRMService() { protected ClientRMService createClientRMService() {
return new ClientRMService(getRMContext(), getResourceScheduler()) { return new ClientRMService(getRMContext(), getResourceScheduler(), rmAppManager) {
@Override @Override
public void start() { public void start() {
//override to not start rpc handler //override to not start rpc handler