YARN-6640. AM heartbeat stuck when responseId overflows MAX_INT. Contributed by Botong Huang

This commit is contained in:
Jason Lowe 2017-08-25 09:16:17 -05:00
parent 9e2699ac2c
commit 3a4e861169
3 changed files with 86 additions and 15 deletions

View File

@ -81,6 +81,8 @@ import com.google.common.annotations.VisibleForTesting;
public class ApplicationMasterService extends AbstractService implements public class ApplicationMasterService extends AbstractService implements
ApplicationMasterProtocol { ApplicationMasterProtocol {
private static final Log LOG = LogFactory.getLog(ApplicationMasterService.class); private static final Log LOG = LogFactory.getLog(ApplicationMasterService.class);
private static final int PRE_REGISTER_RESPONSE_ID = -1;
private final AMLivelinessMonitor amLivelinessMonitor; private final AMLivelinessMonitor amLivelinessMonitor;
private YarnScheduler rScheduler; private YarnScheduler rScheduler;
protected InetSocketAddress masterServiceAddress; protected InetSocketAddress masterServiceAddress;
@ -325,6 +327,11 @@ public class ApplicationMasterService extends AbstractService implements
protected static final Allocation EMPTY_ALLOCATION = new Allocation( protected static final Allocation EMPTY_ALLOCATION = new Allocation(
EMPTY_CONTAINER_LIST, Resources.createResource(0), null, null, null); EMPTY_CONTAINER_LIST, Resources.createResource(0), null, null, null);
private int getNextResponseId(int responseId) {
// Loop between 0 to Integer.MAX_VALUE
return (responseId + 1) & Integer.MAX_VALUE;
}
@Override @Override
public AllocateResponse allocate(AllocateRequest request) public AllocateResponse allocate(AllocateRequest request)
throws YarnException, IOException { throws YarnException, IOException {
@ -357,14 +364,17 @@ public class ApplicationMasterService extends AbstractService implements
throw new ApplicationMasterNotRegisteredException(message); throw new ApplicationMasterNotRegisteredException(message);
} }
if ((request.getResponseId() + 1) == lastResponse.getResponseId()) { // Normally request.getResponseId() == lastResponse.getResponseId()
/* old heartbeat */ if (getNextResponseId(request.getResponseId()) == lastResponse
.getResponseId()) {
// heartbeat one step old, simply return lastReponse
return lastResponse; return lastResponse;
} else if (request.getResponseId() + 1 < lastResponse.getResponseId()) { } else if (request.getResponseId() != lastResponse.getResponseId()) {
String message = String message =
"Invalid responseId in AllocateRequest from application attempt: " "Invalid responseId in AllocateRequest from application attempt: "
+ appAttemptId + ", expect responseId to be " + appAttemptId + ", expect responseId to be "
+ (lastResponse.getResponseId() + 1); + lastResponse.getResponseId() + ", but get "
+ request.getResponseId();
throw new InvalidApplicationMasterRequestException(message); throw new InvalidApplicationMasterRequestException(message);
} }
@ -404,7 +414,7 @@ public class ApplicationMasterService extends AbstractService implements
* need to worry about unregister call occurring in between (which * need to worry about unregister call occurring in between (which
* removes the lock object). * removes the lock object).
*/ */
response.setResponseId(lastResponse.getResponseId() + 1); response.setResponseId(getNextResponseId(lastResponse.getResponseId()));
lock.setAllocateResponse(response); lock.setAllocateResponse(response);
return response; return response;
} }
@ -415,12 +425,23 @@ public class ApplicationMasterService extends AbstractService implements
recordFactory.newRecordInstance(AllocateResponse.class); recordFactory.newRecordInstance(AllocateResponse.class);
// set response id to -1 before application master for the following // set response id to -1 before application master for the following
// attemptID get registered // attemptID get registered
response.setResponseId(-1); response.setResponseId(PRE_REGISTER_RESPONSE_ID);
LOG.info("Registering app attempt : " + attemptId); LOG.info("Registering app attempt : " + attemptId);
responseMap.put(attemptId, new AllocateResponseLock(response)); responseMap.put(attemptId, new AllocateResponseLock(response));
rmContext.getNMTokenSecretManager().registerApplicationAttempt(attemptId); rmContext.getNMTokenSecretManager().registerApplicationAttempt(attemptId);
} }
@VisibleForTesting
protected boolean setAttemptLastResponseId(ApplicationAttemptId attemptId,
int lastResponseId) {
AllocateResponseLock lock = responseMap.get(attemptId);
if (lock == null || lock.getAllocateResponse() == null) {
return false;
}
lock.getAllocateResponse().setResponseId(lastResponseId);
return true;
}
public void unregisterAttempt(ApplicationAttemptId attemptId) { public void unregisterAttempt(ApplicationAttemptId attemptId) {
LOG.info("Unregistering app attempt : " + attemptId); LOG.info("Unregistering app attempt : " + attemptId);
responseMap.remove(attemptId); responseMap.remove(attemptId);

View File

@ -126,6 +126,18 @@ public class MockAM {
} }
} }
public boolean setApplicationLastResponseId(int newLastResponseId) {
ApplicationMasterService applicationMasterService =
(ApplicationMasterService) amRMProtocol;
responseId = newLastResponseId;
return applicationMasterService.setAttemptLastResponseId(attemptId,
newLastResponseId);
}
public int getResponseId() {
return responseId;
}
public void addRequests(String[] hosts, int memory, int priority, public void addRequests(String[] hosts, int memory, int priority,
int containers) throws Exception { int containers) throws Exception {
addRequests(hosts, memory, priority, containers, 0L); addRequests(hosts, memory, priority, containers, 0L);
@ -272,14 +284,17 @@ public class MockAM {
public AllocateResponse doAllocateAs(UserGroupInformation ugi, public AllocateResponse doAllocateAs(UserGroupInformation ugi,
final AllocateRequest req) throws Exception { final AllocateRequest req) throws Exception {
req.setResponseId(++responseId); req.setResponseId(responseId);
try { try {
return ugi.doAs(new PrivilegedExceptionAction<AllocateResponse>() { AllocateResponse response =
@Override ugi.doAs(new PrivilegedExceptionAction<AllocateResponse>() {
public AllocateResponse run() throws Exception { @Override
return amRMProtocol.allocate(req); public AllocateResponse run() throws Exception {
} return amRMProtocol.allocate(req);
}); }
});
responseId = response.getResponseId();
return response;
} catch (UndeclaredThrowableException e) { } catch (UndeclaredThrowableException e) {
throw (Exception) e.getCause(); throw (Exception) e.getCause();
} }

View File

@ -273,6 +273,41 @@ public class TestApplicationMasterService {
rm.stop(); rm.stop();
} }
@Test(timeout = 3000000)
public void testAllocateResponseIdOverflow() throws Exception {
MockRM rm = new MockRM(conf);
try {
rm.start();
// Register node1
MockNM nm1 = rm.registerNode("127.0.0.1:1234", 6 * GB);
// Submit an application
RMApp app1 = rm.submitApp(2048);
// kick the scheduling
nm1.nodeHeartbeat(true);
RMAppAttempt attempt1 = app1.getCurrentAppAttempt();
MockAM am1 = rm.sendAMLaunched(attempt1.getAppAttemptId());
am1.registerAppAttempt();
// Set the last reponseId to be MAX_INT
Assert.assertTrue(am1.setApplicationLastResponseId(Integer.MAX_VALUE));
// Both allocate should succeed
am1.schedule(); // send allocate with reponseId = MAX_INT
Assert.assertEquals(0, am1.getResponseId());
am1.schedule(); // send allocate with reponseId = 0
Assert.assertEquals(1, am1.getResponseId());
} finally {
if (rm != null) {
rm.stop();
}
}
}
@Test(timeout=600000) @Test(timeout=600000)
public void testInvalidContainerReleaseRequest() throws Exception { public void testInvalidContainerReleaseRequest() throws Exception {
MockRM rm = new MockRM(conf); MockRM rm = new MockRM(conf);