diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java index 1ff69ba27dc..8775553952a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java @@ -107,6 +107,8 @@ import com.google.common.annotations.VisibleForTesting; public class ApplicationMasterService extends AbstractService implements ApplicationMasterProtocol { private static final Log LOG = LogFactory.getLog(ApplicationMasterService.class); + private static final int PRE_REGISTER_RESPONSE_ID = -1; + private final AMLivelinessMonitor amLivelinessMonitor; private YarnScheduler rScheduler; private InetSocketAddress masterServiceAddress; @@ -373,6 +375,11 @@ public class ApplicationMasterService extends AbstractService implements protected static final Allocation EMPTY_ALLOCATION = new Allocation( EMPTY_CONTAINER_LIST, Resources.createResource(0), null, null, null); + private int getNextResponseId(int responseId) { + // Loop between 0 to Integer.MAX_VALUE + return (responseId + 1) & Integer.MAX_VALUE; + } + @Override public AllocateResponse allocate(AllocateRequest request) throws YarnException, IOException { @@ -409,14 +416,17 @@ public class ApplicationMasterService extends AbstractService implements throw new ApplicationMasterNotRegisteredException(message); } - if ((request.getResponseId() + 1) == lastResponse.getResponseId()) { - /* old heartbeat */ + // Normally request.getResponseId() == lastResponse.getResponseId() + if (getNextResponseId(request.getResponseId()) == lastResponse + .getResponseId()) { + // heartbeat one step old, simply return lastReponse return lastResponse; - } else if (request.getResponseId() + 1 < lastResponse.getResponseId()) { + } else if (request.getResponseId() != lastResponse.getResponseId()) { String message = "Invalid responseId in AllocateRequest from application attempt: " + appAttemptId + ", expect responseId to be " - + (lastResponse.getResponseId() + 1); + + lastResponse.getResponseId() + ", but get " + + request.getResponseId(); throw new InvalidApplicationMasterRequestException(message); } @@ -561,7 +571,7 @@ public class ApplicationMasterService extends AbstractService implements allocateResponse.setAllocatedContainers(allocation.getContainers()); allocateResponse.setCompletedContainersStatuses(appAttempt .pullJustFinishedContainers()); - allocateResponse.setResponseId(lastResponse.getResponseId() + 1); + allocateResponse.setResponseId(getNextResponseId(lastResponse.getResponseId())); allocateResponse.setAvailableResources(allocation.getResourceLimit()); // Handling increased/decreased containers @@ -683,12 +693,23 @@ public class ApplicationMasterService extends AbstractService implements recordFactory.newRecordInstance(AllocateResponse.class); // set response id to -1 before application master for the following // attemptID get registered - response.setResponseId(-1); + response.setResponseId(PRE_REGISTER_RESPONSE_ID); LOG.info("Registering app attempt : " + attemptId); responseMap.put(attemptId, new AllocateResponseLock(response)); rmContext.getNMTokenSecretManager().registerApplicationAttempt(attemptId); } + @VisibleForTesting + protected boolean setAttemptLastResponseId(ApplicationAttemptId attemptId, + int lastResponseId) { + AllocateResponseLock lock = responseMap.get(attemptId); + if (lock == null || lock.getAllocateResponse() == null) { + return false; + } + lock.getAllocateResponse().setResponseId(lastResponseId); + return true; + } + public void unregisterAttempt(ApplicationAttemptId attemptId) { LOG.info("Unregistering app attempt : " + attemptId); responseMap.remove(attemptId); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java index 49a6aec6ce9..bdad3d616bb 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java @@ -139,6 +139,18 @@ public class MockAM { } } + public boolean setApplicationLastResponseId(int newLastResponseId) { + ApplicationMasterService applicationMasterService = + (ApplicationMasterService) amRMProtocol; + responseId = newLastResponseId; + return applicationMasterService.setAttemptLastResponseId(attemptId, + newLastResponseId); + } + + public int getResponseId() { + return responseId; + } + public void addRequests(String[] hosts, int memory, int priority, int containers) throws Exception { requests.addAll(createReq(hosts, memory, priority, containers)); @@ -258,19 +270,22 @@ public class MockAM { public AllocateResponse doAllocateAs(UserGroupInformation ugi, final AllocateRequest req) throws Exception { - req.setResponseId(++responseId); + req.setResponseId(responseId); try { - return ugi.doAs(new PrivilegedExceptionAction() { - @Override - public AllocateResponse run() throws Exception { - return amRMProtocol.allocate(req); - } - }); + AllocateResponse response = + ugi.doAs(new PrivilegedExceptionAction() { + @Override + public AllocateResponse run() throws Exception { + return amRMProtocol.allocate(req); + } + }); + responseId = response.getResponseId(); + return response; } catch (UndeclaredThrowableException e) { throw (Exception) e.getCause(); } } - + public AllocateResponse doHeartbeat() throws Exception { return allocate(null, null); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java index d042817efa8..2b53fcf1e24 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java @@ -116,7 +116,42 @@ public class TestApplicationMasterService { Assert.assertEquals(MockRM.getClusterTimeStamp(), tokenId.getRMIdentifier()); rm.stop(); } - + + @Test(timeout = 3000000) + public void testAllocateResponseIdOverflow() throws Exception { + MockRM rm = new MockRM(conf); + try { + rm.start(); + + // Register node1 + MockNM nm1 = rm.registerNode("127.0.0.1:1234", 6 * GB); + + // Submit an application + RMApp app1 = rm.submitApp(2048); + + // kick the scheduling + nm1.nodeHeartbeat(true); + RMAppAttempt attempt1 = app1.getCurrentAppAttempt(); + MockAM am1 = rm.sendAMLaunched(attempt1.getAppAttemptId()); + am1.registerAppAttempt(); + + // Set the last reponseId to be MAX_INT + Assert.assertTrue(am1.setApplicationLastResponseId(Integer.MAX_VALUE)); + + // Both allocate should succeed + am1.schedule(); // send allocate with reponseId = MAX_INT + Assert.assertEquals(0, am1.getResponseId()); + + am1.schedule(); // send allocate with reponseId = 0 + Assert.assertEquals(1, am1.getResponseId()); + + } finally { + if (rm != null) { + rm.stop(); + } + } + } + @Test(timeout=600000) public void testInvalidContainerReleaseRequest() throws Exception { MockRM rm = new MockRM(conf);