diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 9dbb6d2b5fe..86b5336d503 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -196,6 +196,9 @@ Release 0.23.5 - UNRELEASED YARN-166. capacity scheduler doesn't allow capacity < 1.0 (tgraves via bobby) + YARN-189. Fixed a deadlock between RM's ApplicationMasterService and the + dispatcher. (Thomas Graves via vinodkv) + Release 0.23.4 - UNRELEASED INCOMPATIBLE CHANGES diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java index 849111e54b5..96ee551e205 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java @@ -265,10 +265,10 @@ public class ApplicationMasterService extends AbstractService implements // Oh damn! Sending reboot isn't enough. RM state is corrupted. TODO: allocateResponse.setAMResponse(reboot); return allocateResponse; - } - + } + // Allow only one thread in AM to do heartbeat at a time. - synchronized (lastResponse) { // BUG TODO: Locking order is screwed. + synchronized (lastResponse) { // Send the status update to the appAttempt. this.rmContext.getDispatcher().getEventHandler().handle( @@ -282,7 +282,8 @@ public class ApplicationMasterService extends AbstractService implements Allocation allocation = this.rScheduler.allocate(appAttemptId, ask, release); - RMApp app = this.rmContext.getRMApps().get(appAttemptId.getApplicationId()); + RMApp app = this.rmContext.getRMApps().get( + appAttemptId.getApplicationId()); RMAppAttempt appAttempt = app.getRMAppAttempt(appAttemptId); AMResponse response = recordFactory.newRecordInstance(AMResponse.class); @@ -316,7 +317,18 @@ public class ApplicationMasterService extends AbstractService implements .pullJustFinishedContainers()); response.setResponseId(lastResponse.getResponseId() + 1); response.setAvailableResources(allocation.getResourceLimit()); - responseMap.put(appAttemptId, response); + + AMResponse oldResponse = responseMap.put(appAttemptId, response); + if (oldResponse == null) { + // appAttempt got unregistered, remove it back out + responseMap.remove(appAttemptId); + String message = "App Attempt removed from the cache during allocate" + + appAttemptId; + LOG.error(message); + allocateResponse.setAMResponse(reboot); + return allocateResponse; + } + allocateResponse.setAMResponse(response); allocateResponse.setNumClusterNodes(this.rScheduler.getNumClusterNodes()); return allocateResponse; @@ -331,12 +343,7 @@ public class ApplicationMasterService extends AbstractService implements } public void unregisterAttempt(ApplicationAttemptId attemptId) { - AMResponse lastResponse = responseMap.get(attemptId); - if (lastResponse != null) { - synchronized (lastResponse) { - responseMap.remove(attemptId); - } - } + responseMap.remove(attemptId); } public void refreshServiceAcls(Configuration configuration,