YARN-6640. AM heartbeat stuck when responseId overflows MAX_INT. Contributed by Botong Huang
(cherry picked from commit 3a4e861169
)
Conflicts:
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
This commit is contained in:
parent
832b0b82e2
commit
2d548759ad
|
@ -107,6 +107,8 @@ import com.google.common.annotations.VisibleForTesting;
|
|||
public class ApplicationMasterService extends AbstractService implements
|
||||
ApplicationMasterProtocol {
|
||||
private static final Log LOG = LogFactory.getLog(ApplicationMasterService.class);
|
||||
private static final int PRE_REGISTER_RESPONSE_ID = -1;
|
||||
|
||||
private final AMLivelinessMonitor amLivelinessMonitor;
|
||||
private YarnScheduler rScheduler;
|
||||
private InetSocketAddress masterServiceAddress;
|
||||
|
@ -373,6 +375,11 @@ public class ApplicationMasterService extends AbstractService implements
|
|||
protected static final Allocation EMPTY_ALLOCATION = new Allocation(
|
||||
EMPTY_CONTAINER_LIST, Resources.createResource(0), null, null, null);
|
||||
|
||||
private int getNextResponseId(int responseId) {
|
||||
// Loop between 0 to Integer.MAX_VALUE
|
||||
return (responseId + 1) & Integer.MAX_VALUE;
|
||||
}
|
||||
|
||||
@Override
|
||||
public AllocateResponse allocate(AllocateRequest request)
|
||||
throws YarnException, IOException {
|
||||
|
@ -409,14 +416,17 @@ public class ApplicationMasterService extends AbstractService implements
|
|||
throw new ApplicationMasterNotRegisteredException(message);
|
||||
}
|
||||
|
||||
if ((request.getResponseId() + 1) == lastResponse.getResponseId()) {
|
||||
/* old heartbeat */
|
||||
// Normally request.getResponseId() == lastResponse.getResponseId()
|
||||
if (getNextResponseId(request.getResponseId()) == lastResponse
|
||||
.getResponseId()) {
|
||||
// heartbeat one step old, simply return lastReponse
|
||||
return lastResponse;
|
||||
} else if (request.getResponseId() + 1 < lastResponse.getResponseId()) {
|
||||
} else if (request.getResponseId() != lastResponse.getResponseId()) {
|
||||
String message =
|
||||
"Invalid responseId in AllocateRequest from application attempt: "
|
||||
+ appAttemptId + ", expect responseId to be "
|
||||
+ (lastResponse.getResponseId() + 1);
|
||||
+ lastResponse.getResponseId() + ", but get "
|
||||
+ request.getResponseId();
|
||||
throw new InvalidApplicationMasterRequestException(message);
|
||||
}
|
||||
|
||||
|
@ -561,7 +571,7 @@ public class ApplicationMasterService extends AbstractService implements
|
|||
allocateResponse.setAllocatedContainers(allocation.getContainers());
|
||||
allocateResponse.setCompletedContainersStatuses(appAttempt
|
||||
.pullJustFinishedContainers());
|
||||
allocateResponse.setResponseId(lastResponse.getResponseId() + 1);
|
||||
allocateResponse.setResponseId(getNextResponseId(lastResponse.getResponseId()));
|
||||
allocateResponse.setAvailableResources(allocation.getResourceLimit());
|
||||
|
||||
// Handling increased/decreased containers
|
||||
|
@ -683,12 +693,23 @@ public class ApplicationMasterService extends AbstractService implements
|
|||
recordFactory.newRecordInstance(AllocateResponse.class);
|
||||
// set response id to -1 before application master for the following
|
||||
// attemptID get registered
|
||||
response.setResponseId(-1);
|
||||
response.setResponseId(PRE_REGISTER_RESPONSE_ID);
|
||||
LOG.info("Registering app attempt : " + attemptId);
|
||||
responseMap.put(attemptId, new AllocateResponseLock(response));
|
||||
rmContext.getNMTokenSecretManager().registerApplicationAttempt(attemptId);
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
protected boolean setAttemptLastResponseId(ApplicationAttemptId attemptId,
|
||||
int lastResponseId) {
|
||||
AllocateResponseLock lock = responseMap.get(attemptId);
|
||||
if (lock == null || lock.getAllocateResponse() == null) {
|
||||
return false;
|
||||
}
|
||||
lock.getAllocateResponse().setResponseId(lastResponseId);
|
||||
return true;
|
||||
}
|
||||
|
||||
public void unregisterAttempt(ApplicationAttemptId attemptId) {
|
||||
LOG.info("Unregistering app attempt : " + attemptId);
|
||||
responseMap.remove(attemptId);
|
||||
|
|
|
@ -139,6 +139,18 @@ public class MockAM {
|
|||
}
|
||||
}
|
||||
|
||||
public boolean setApplicationLastResponseId(int newLastResponseId) {
|
||||
ApplicationMasterService applicationMasterService =
|
||||
(ApplicationMasterService) amRMProtocol;
|
||||
responseId = newLastResponseId;
|
||||
return applicationMasterService.setAttemptLastResponseId(attemptId,
|
||||
newLastResponseId);
|
||||
}
|
||||
|
||||
public int getResponseId() {
|
||||
return responseId;
|
||||
}
|
||||
|
||||
public void addRequests(String[] hosts, int memory, int priority,
|
||||
int containers) throws Exception {
|
||||
requests.addAll(createReq(hosts, memory, priority, containers));
|
||||
|
@ -258,19 +270,22 @@ public class MockAM {
|
|||
|
||||
public AllocateResponse doAllocateAs(UserGroupInformation ugi,
|
||||
final AllocateRequest req) throws Exception {
|
||||
req.setResponseId(++responseId);
|
||||
req.setResponseId(responseId);
|
||||
try {
|
||||
return ugi.doAs(new PrivilegedExceptionAction<AllocateResponse>() {
|
||||
@Override
|
||||
public AllocateResponse run() throws Exception {
|
||||
return amRMProtocol.allocate(req);
|
||||
}
|
||||
});
|
||||
AllocateResponse response =
|
||||
ugi.doAs(new PrivilegedExceptionAction<AllocateResponse>() {
|
||||
@Override
|
||||
public AllocateResponse run() throws Exception {
|
||||
return amRMProtocol.allocate(req);
|
||||
}
|
||||
});
|
||||
responseId = response.getResponseId();
|
||||
return response;
|
||||
} catch (UndeclaredThrowableException e) {
|
||||
throw (Exception) e.getCause();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
public AllocateResponse doHeartbeat() throws Exception {
|
||||
return allocate(null, null);
|
||||
}
|
||||
|
|
|
@ -116,7 +116,42 @@ public class TestApplicationMasterService {
|
|||
Assert.assertEquals(MockRM.getClusterTimeStamp(), tokenId.getRMIdentifier());
|
||||
rm.stop();
|
||||
}
|
||||
|
||||
|
||||
@Test(timeout = 3000000)
|
||||
public void testAllocateResponseIdOverflow() throws Exception {
|
||||
MockRM rm = new MockRM(conf);
|
||||
try {
|
||||
rm.start();
|
||||
|
||||
// Register node1
|
||||
MockNM nm1 = rm.registerNode("127.0.0.1:1234", 6 * GB);
|
||||
|
||||
// Submit an application
|
||||
RMApp app1 = rm.submitApp(2048);
|
||||
|
||||
// kick the scheduling
|
||||
nm1.nodeHeartbeat(true);
|
||||
RMAppAttempt attempt1 = app1.getCurrentAppAttempt();
|
||||
MockAM am1 = rm.sendAMLaunched(attempt1.getAppAttemptId());
|
||||
am1.registerAppAttempt();
|
||||
|
||||
// Set the last reponseId to be MAX_INT
|
||||
Assert.assertTrue(am1.setApplicationLastResponseId(Integer.MAX_VALUE));
|
||||
|
||||
// Both allocate should succeed
|
||||
am1.schedule(); // send allocate with reponseId = MAX_INT
|
||||
Assert.assertEquals(0, am1.getResponseId());
|
||||
|
||||
am1.schedule(); // send allocate with reponseId = 0
|
||||
Assert.assertEquals(1, am1.getResponseId());
|
||||
|
||||
} finally {
|
||||
if (rm != null) {
|
||||
rm.stop();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Test(timeout=600000)
|
||||
public void testInvalidContainerReleaseRequest() throws Exception {
|
||||
MockRM rm = new MockRM(conf);
|
||||
|
|
Loading…
Reference in New Issue