YARN-744. Race condition in ApplicationMasterService.allocate .. It might process same allocate request twice resulting in additional containers getting allocated. (Omkar Vinit Joshi via bikas)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1543707 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Bikas Saha 2013-11-20 05:06:15 +00:00
parent 22b5352b40
commit 8caae1d5a6
2 changed files with 97 additions and 79 deletions

View File

@ -156,6 +156,10 @@ Release 2.3.0 - UNRELEASED
YARN-1419. TestFifoScheduler.testAppAttemptMetrics fails intermittently
under jdk7 (Jonathan Eagles via jlowe)
YARN-744. Race condition in ApplicationMasterService.allocate .. It might
process same allocate request twice resulting in additional containers
getting allocated. (Omkar Vinit Joshi via bikas)
Release 2.2.1 - UNRELEASED
INCOMPATIBLE CHANGES

View File

@ -72,7 +72,6 @@ import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptRegistrationEvent;
@ -81,7 +80,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAt
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.security.authorize.RMPolicyProvider;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
@ -97,8 +95,8 @@ public class ApplicationMasterService extends AbstractService implements
private Server server;
private final RecordFactory recordFactory =
RecordFactoryProvider.getRecordFactory(null);
private final ConcurrentMap<ApplicationAttemptId, AllocateResponse> responseMap =
new ConcurrentHashMap<ApplicationAttemptId, AllocateResponse>();
private final ConcurrentMap<ApplicationAttemptId, AllocateResponseLock> responseMap =
new ConcurrentHashMap<ApplicationAttemptId, AllocateResponseLock>();
private final AllocateResponse resync =
recordFactory.newRecordInstance(AllocateResponse.class);
private final RMContext rmContext;
@ -217,21 +215,19 @@ public class ApplicationMasterService extends AbstractService implements
ApplicationAttemptId applicationAttemptId = authorizeRequest();
ApplicationId appID = applicationAttemptId.getApplicationId();
AllocateResponse lastResponse = responseMap.get(applicationAttemptId);
if (lastResponse == null) {
String message = "Application doesn't exist in cache "
+ applicationAttemptId;
LOG.error(message);
AllocateResponseLock lock = responseMap.get(applicationAttemptId);
if (lock == null) {
RMAuditLogger.logFailure(this.rmContext.getRMApps().get(appID).getUser(),
AuditConstants.REGISTER_AM, message, "ApplicationMasterService",
AuditConstants.REGISTER_AM, "Application doesn't exist in cache "
+ applicationAttemptId, "ApplicationMasterService",
"Error in registering application master", appID,
applicationAttemptId);
throw RPCUtil.getRemoteException(message);
throwApplicationDoesNotExistInCacheException(applicationAttemptId);
}
// Allow only one thread in AM to do registerApp at a time.
synchronized (lastResponse) {
synchronized (lock) {
AllocateResponse lastResponse = lock.getAllocateResponse();
if (hasApplicationMasterRegistered(applicationAttemptId)) {
String message =
"Application Master is already registered : "
@ -251,7 +247,7 @@ public class ApplicationMasterService extends AbstractService implements
// Setting the response id to 0 to identify if the
// application master is register for the respective attemptid
lastResponse.setResponseId(0);
responseMap.put(applicationAttemptId, lastResponse);
lock.setAllocateResponse(lastResponse);
LOG.info("AM registration " + applicationAttemptId);
this.rmContext
.getDispatcher()
@ -286,17 +282,14 @@ public class ApplicationMasterService extends AbstractService implements
ApplicationAttemptId applicationAttemptId = authorizeRequest();
AllocateResponse lastResponse = responseMap.get(applicationAttemptId);
if (lastResponse == null) {
String message = "Application doesn't exist in cache "
+ applicationAttemptId;
LOG.error(message);
throw RPCUtil.getRemoteException(message);
AllocateResponseLock lock = responseMap.get(applicationAttemptId);
if (lock == null) {
throwApplicationDoesNotExistInCacheException(applicationAttemptId);
}
// Allow only one thread in AM to do finishApp at a time.
synchronized (lastResponse) {
synchronized (lock) {
this.amLivelinessMonitor.receivedPing(applicationAttemptId);
rmContext.getDispatcher().getEventHandler().handle(
@ -313,6 +306,15 @@ public class ApplicationMasterService extends AbstractService implements
}
}
private void throwApplicationDoesNotExistInCacheException(
ApplicationAttemptId appAttemptId)
throws InvalidApplicationMasterRequestException {
String message = "Application doesn't exist in cache "
+ appAttemptId;
LOG.error(message);
throw new InvalidApplicationMasterRequestException(message);
}
/**
* @param appAttemptId
* @return true if application is registered for the respective attemptid
@ -320,10 +322,11 @@ public class ApplicationMasterService extends AbstractService implements
public boolean hasApplicationMasterRegistered(
ApplicationAttemptId appAttemptId) {
boolean hasApplicationMasterRegistered = false;
AllocateResponse lastResponse = responseMap.get(appAttemptId);
AllocateResponseLock lastResponse = responseMap.get(appAttemptId);
if (lastResponse != null) {
synchronized (lastResponse) {
if (lastResponse.getResponseId() >= 0) {
if (lastResponse.getAllocateResponse() != null
&& lastResponse.getAllocateResponse().getResponseId() >= 0) {
hasApplicationMasterRegistered = true;
}
}
@ -340,38 +343,38 @@ public class ApplicationMasterService extends AbstractService implements
this.amLivelinessMonitor.receivedPing(appAttemptId);
/* check if its in cache */
AllocateResponse lastResponse = responseMap.get(appAttemptId);
if (lastResponse == null) {
AllocateResponseLock lock = responseMap.get(appAttemptId);
if (lock == null) {
LOG.error("AppAttemptId doesnt exist in cache " + appAttemptId);
return resync;
}
if (!hasApplicationMasterRegistered(appAttemptId)) {
String message =
"Application Master is trying to allocate before registering for: "
+ appAttemptId.getApplicationId();
LOG.error(message);
RMAuditLogger.logFailure(
this.rmContext.getRMApps().get(appAttemptId.getApplicationId())
.getUser(), AuditConstants.REGISTER_AM, "",
"ApplicationMasterService", message, appAttemptId.getApplicationId(),
appAttemptId);
throw new InvalidApplicationMasterRequestException(message);
}
synchronized (lock) {
AllocateResponse lastResponse = lock.getAllocateResponse();
if (!hasApplicationMasterRegistered(appAttemptId)) {
String message =
"Application Master is trying to allocate before registering for: "
+ appAttemptId.getApplicationId();
LOG.error(message);
RMAuditLogger.logFailure(
this.rmContext.getRMApps().get(appAttemptId.getApplicationId())
.getUser(), AuditConstants.REGISTER_AM, "",
"ApplicationMasterService", message,
appAttemptId.getApplicationId(),
appAttemptId);
throw new InvalidApplicationMasterRequestException(message);
}
if ((request.getResponseId() + 1) == lastResponse.getResponseId()) {
/* old heartbeat */
return lastResponse;
} else if (request.getResponseId() + 1 < lastResponse.getResponseId()) {
LOG.error("Invalid responseid from appAttemptId " + appAttemptId);
// Oh damn! Sending reboot isn't enough. RM state is corrupted. TODO:
// Reboot is not useful since after AM reboots, it will send register and
// get an exception. Might as well throw an exception here.
return resync;
}
// Allow only one thread in AM to do heartbeat at a time.
synchronized (lastResponse) {
if ((request.getResponseId() + 1) == lastResponse.getResponseId()) {
/* old heartbeat */
return lastResponse;
} else if (request.getResponseId() + 1 < lastResponse.getResponseId()) {
LOG.error("Invalid responseid from appAttemptId " + appAttemptId);
// Oh damn! Sending reboot isn't enough. RM state is corrupted. TODO:
// Reboot is not useful since after AM reboots, it will send register
// and
// get an exception. Might as well throw an exception here.
return resync;
}
// Send the status update to the appAttempt.
this.rmContext.getDispatcher().getEventHandler().handle(
@ -380,15 +383,16 @@ public class ApplicationMasterService extends AbstractService implements
List<ResourceRequest> ask = request.getAskList();
List<ContainerId> release = request.getReleaseList();
ResourceBlacklistRequest blacklistRequest = request.getResourceBlacklistRequest();
List<String> blacklistAdditions =
(blacklistRequest != null) ?
ResourceBlacklistRequest blacklistRequest =
request.getResourceBlacklistRequest();
List<String> blacklistAdditions =
(blacklistRequest != null) ?
blacklistRequest.getBlacklistAdditions() : null;
List<String> blacklistRemovals =
(blacklistRequest != null) ?
List<String> blacklistRemovals =
(blacklistRequest != null) ?
blacklistRequest.getBlacklistRemovals() : null;
// sanity check
try {
RMServerUtils.validateResourceRequests(ask,
@ -443,7 +447,7 @@ public class ApplicationMasterService extends AbstractService implements
rmNode.getTotalCapability(), numContainers,
rmNode.getHealthReport(),
rmNode.getLastHealthReportTime());
updatedNodeReports.add(report);
}
allocateResponse.setUpdatedNodes(updatedNodeReports);
@ -454,11 +458,12 @@ public class ApplicationMasterService extends AbstractService implements
.pullJustFinishedContainers());
allocateResponse.setResponseId(lastResponse.getResponseId() + 1);
allocateResponse.setAvailableResources(allocation.getResourceLimit());
allocateResponse.setNumClusterNodes(this.rScheduler.getNumClusterNodes());
// add preemption to the allocateResponse message (if any)
allocateResponse.setPreemptionMessage(generatePreemptionMessage(allocation));
allocateResponse
.setPreemptionMessage(generatePreemptionMessage(allocation));
// Adding NMTokens for allocated containers.
if (!allocation.getContainers().isEmpty()) {
@ -466,21 +471,14 @@ public class ApplicationMasterService extends AbstractService implements
.createAndGetNMTokens(app.getUser(), appAttemptId,
allocation.getContainers()));
}
// before returning response, verify in sync
AllocateResponse oldResponse =
responseMap.put(appAttemptId, allocateResponse);
if (oldResponse == null) {
// appAttempt got unregistered, remove it back out
responseMap.remove(appAttemptId);
String message = "App Attempt removed from the cache during allocate"
+ appAttemptId;
LOG.error(message);
return resync;
}
/*
* As we are updating the response inside the lock object so we don't
* need to worry about unregister call occurring in between (which
* removes the lock object).
*/
lock.setAllocateResponse(allocateResponse);
return allocateResponse;
}
}
}
private PreemptionMessage generatePreemptionMessage(Allocation allocation){
@ -542,7 +540,7 @@ public class ApplicationMasterService extends AbstractService implements
// attemptID get registered
response.setResponseId(-1);
LOG.info("Registering app attempt : " + attemptId);
responseMap.put(attemptId, response);
responseMap.put(attemptId, new AllocateResponseLock(response));
rmContext.getNMTokenSecretManager().registerApplicationAttempt(attemptId);
}
@ -564,4 +562,20 @@ public class ApplicationMasterService extends AbstractService implements
}
super.serviceStop();
}
}
public static class AllocateResponseLock {
private AllocateResponse response;
public AllocateResponseLock(AllocateResponse response) {
this.response = response;
}
public synchronized AllocateResponse getAllocateResponse() {
return response;
}
public synchronized void setAllocateResponse(AllocateResponse response) {
this.response = response;
}
}
}