YARN-189. Fixed a deadlock between RM's ApplicationMasterService and the dispatcher. Contributed by Thomas Graves.
svn merge --ignore-ancestry -c 1404431 ../../trunk/ git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1404432 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
9d689a9db9
commit
4722d76b4c
|
@ -179,6 +179,9 @@ Release 0.23.5 - UNRELEASED
|
||||||
YARN-166. capacity scheduler doesn't allow capacity < 1.0 (tgraves via
|
YARN-166. capacity scheduler doesn't allow capacity < 1.0 (tgraves via
|
||||||
bobby)
|
bobby)
|
||||||
|
|
||||||
|
YARN-189. Fixed a deadlock between RM's ApplicationMasterService and the
|
||||||
|
dispatcher. (Thomas Graves via vinodkv)
|
||||||
|
|
||||||
Release 0.23.4 - UNRELEASED
|
Release 0.23.4 - UNRELEASED
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
|
|
@ -268,7 +268,7 @@ public class ApplicationMasterService extends AbstractService implements
|
||||||
}
|
}
|
||||||
|
|
||||||
// Allow only one thread in AM to do heartbeat at a time.
|
// Allow only one thread in AM to do heartbeat at a time.
|
||||||
synchronized (lastResponse) { // BUG TODO: Locking order is screwed.
|
synchronized (lastResponse) {
|
||||||
|
|
||||||
// Send the status update to the appAttempt.
|
// Send the status update to the appAttempt.
|
||||||
this.rmContext.getDispatcher().getEventHandler().handle(
|
this.rmContext.getDispatcher().getEventHandler().handle(
|
||||||
|
@ -282,7 +282,8 @@ public class ApplicationMasterService extends AbstractService implements
|
||||||
Allocation allocation =
|
Allocation allocation =
|
||||||
this.rScheduler.allocate(appAttemptId, ask, release);
|
this.rScheduler.allocate(appAttemptId, ask, release);
|
||||||
|
|
||||||
RMApp app = this.rmContext.getRMApps().get(appAttemptId.getApplicationId());
|
RMApp app = this.rmContext.getRMApps().get(
|
||||||
|
appAttemptId.getApplicationId());
|
||||||
RMAppAttempt appAttempt = app.getRMAppAttempt(appAttemptId);
|
RMAppAttempt appAttempt = app.getRMAppAttempt(appAttemptId);
|
||||||
|
|
||||||
AMResponse response = recordFactory.newRecordInstance(AMResponse.class);
|
AMResponse response = recordFactory.newRecordInstance(AMResponse.class);
|
||||||
|
@ -316,7 +317,18 @@ public class ApplicationMasterService extends AbstractService implements
|
||||||
.pullJustFinishedContainers());
|
.pullJustFinishedContainers());
|
||||||
response.setResponseId(lastResponse.getResponseId() + 1);
|
response.setResponseId(lastResponse.getResponseId() + 1);
|
||||||
response.setAvailableResources(allocation.getResourceLimit());
|
response.setAvailableResources(allocation.getResourceLimit());
|
||||||
responseMap.put(appAttemptId, response);
|
|
||||||
|
AMResponse oldResponse = responseMap.put(appAttemptId, response);
|
||||||
|
if (oldResponse == null) {
|
||||||
|
// appAttempt got unregistered, remove it back out
|
||||||
|
responseMap.remove(appAttemptId);
|
||||||
|
String message = "App Attempt removed from the cache during allocate"
|
||||||
|
+ appAttemptId;
|
||||||
|
LOG.error(message);
|
||||||
|
allocateResponse.setAMResponse(reboot);
|
||||||
|
return allocateResponse;
|
||||||
|
}
|
||||||
|
|
||||||
allocateResponse.setAMResponse(response);
|
allocateResponse.setAMResponse(response);
|
||||||
allocateResponse.setNumClusterNodes(this.rScheduler.getNumClusterNodes());
|
allocateResponse.setNumClusterNodes(this.rScheduler.getNumClusterNodes());
|
||||||
return allocateResponse;
|
return allocateResponse;
|
||||||
|
@ -331,13 +343,8 @@ public class ApplicationMasterService extends AbstractService implements
|
||||||
}
|
}
|
||||||
|
|
||||||
public void unregisterAttempt(ApplicationAttemptId attemptId) {
|
public void unregisterAttempt(ApplicationAttemptId attemptId) {
|
||||||
AMResponse lastResponse = responseMap.get(attemptId);
|
|
||||||
if (lastResponse != null) {
|
|
||||||
synchronized (lastResponse) {
|
|
||||||
responseMap.remove(attemptId);
|
responseMap.remove(attemptId);
|
||||||
}
|
}
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public void refreshServiceAcls(Configuration configuration,
|
public void refreshServiceAcls(Configuration configuration,
|
||||||
PolicyProvider policyProvider) {
|
PolicyProvider policyProvider) {
|
||||||
|
|
Loading…
Reference in New Issue