diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/AMRMClientUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/AMRMClientUtils.java index 06948bb8d6a..cddb9be5253 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/AMRMClientUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/AMRMClientUtils.java @@ -30,6 +30,7 @@ import org.apache.hadoop.security.SaslRpcServer; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.TokenIdentifier; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.slf4j.Logger; @@ -46,6 +47,10 @@ public final class AMRMClientUtils { public static final String APP_ALREADY_REGISTERED_MESSAGE = "Application Master is already registered : "; + public static final String EXPECTED_HB_RESPONSEID_MESSAGE = + " expect responseId to be "; + public static final String RECEIVED_HB_RESPONSEID_MESSAGE = " but get "; + private AMRMClientUtils() { } @@ -96,4 +101,46 @@ public final class AMRMClientUtils { conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, SaslRpcServer.AuthMethod.TOKEN.toString()); } + + /** + * Generate the exception message when RM receives an AM heartbeat with + * invalid responseId. + * + * @param appAttemptId the app attempt + * @param expected the expected responseId value + * @param received the received responseId value + * @return the assembled exception message + */ + public static String assembleInvalidResponseIdExceptionMessage( + ApplicationAttemptId appAttemptId, int expected, int received) { + return "Invalid responseId in AllocateRequest from application attempt: " + + appAttemptId + EXPECTED_HB_RESPONSEID_MESSAGE + expected + + RECEIVED_HB_RESPONSEID_MESSAGE + received; + } + + /** + * Parse the expected responseId from the exception generated by RM when + * processing AM heartbeat. + * + * @param exceptionMessage the exception message thrown by RM + * @return the parsed expected responseId, -1 if failed + */ + public static int parseExpectedResponseIdFromException( + String exceptionMessage) { + if (exceptionMessage == null) { + return -1; + } + int start = exceptionMessage.indexOf(EXPECTED_HB_RESPONSEID_MESSAGE); + int end = exceptionMessage.indexOf(RECEIVED_HB_RESPONSEID_MESSAGE); + if (start == -1 || end == -1) { + return -1; + } + start += EXPECTED_HB_RESPONSEID_MESSAGE.length(); + + try { + return Integer.parseInt(exceptionMessage.substring(start, end)); + } catch (NumberFormatException ex) { + return -1; + } + } } \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/AMRMClientRelayer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/AMRMClientRelayer.java index 21cb55f7b89..62898eca3a3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/AMRMClientRelayer.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/AMRMClientRelayer.java @@ -37,15 +37,18 @@ import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; +import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.UpdateContainerRequest; import org.apache.hadoop.yarn.api.records.UpdatedContainer; +import org.apache.hadoop.yarn.client.AMRMClientUtils; import org.apache.hadoop.yarn.client.ClientRMProxy; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.ApplicationMasterNotRegisteredException; +import org.apache.hadoop.yarn.exceptions.InvalidApplicationMasterRequestException; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.server.scheduler.ResourceRequestSet; @@ -105,13 +108,22 @@ public class AMRMClientRelayer extends AbstractService new HashMap<>(); private Map change = new HashMap<>(); + private ApplicationId appId; + + // Normally -1, otherwise will override responseId with this value in the next + // heartbeat + private volatile int resetResponseId; + public AMRMClientRelayer() { super(AMRMClientRelayer.class.getName()); + this.resetResponseId = -1; } - public AMRMClientRelayer(ApplicationMasterProtocol rmClient) { + public AMRMClientRelayer(ApplicationMasterProtocol rmClient, + ApplicationId appId) { this(); this.rmClient = rmClient; + this.appId = appId; } @Override @@ -161,51 +173,56 @@ public class AMRMClientRelayer extends AbstractService try { return this.rmClient.finishApplicationMaster(request); } catch (ApplicationMasterNotRegisteredException e) { - LOG.warn("Out of sync with ResourceManager, hence resyncing."); + LOG.warn("Out of sync with RM for " + this.appId + ", hence resyncing."); // re register with RM registerApplicationMaster(this.amRegistrationRequest); return finishApplicationMaster(request); } } + private void addNewAllocateRequest(AllocateRequest allocateRequest) + throws YarnException { + // update the data structures first + addNewAsks(allocateRequest.getAskList()); + + if (allocateRequest.getReleaseList() != null) { + this.remotePendingRelease.addAll(allocateRequest.getReleaseList()); + this.release.addAll(allocateRequest.getReleaseList()); + } + + if (allocateRequest.getResourceBlacklistRequest() != null) { + if (allocateRequest.getResourceBlacklistRequest() + .getBlacklistAdditions() != null) { + this.remoteBlacklistedNodes.addAll(allocateRequest + .getResourceBlacklistRequest().getBlacklistAdditions()); + this.blacklistAdditions.addAll(allocateRequest + .getResourceBlacklistRequest().getBlacklistAdditions()); + } + if (allocateRequest.getResourceBlacklistRequest() + .getBlacklistRemovals() != null) { + this.remoteBlacklistedNodes.removeAll(allocateRequest + .getResourceBlacklistRequest().getBlacklistRemovals()); + this.blacklistRemovals.addAll(allocateRequest + .getResourceBlacklistRequest().getBlacklistRemovals()); + } + } + + if (allocateRequest.getUpdateRequests() != null) { + for (UpdateContainerRequest update : allocateRequest + .getUpdateRequests()) { + this.remotePendingChange.put(update.getContainerId(), update); + this.change.put(update.getContainerId(), update); + } + } + } + @Override public AllocateResponse allocate(AllocateRequest allocateRequest) throws YarnException, IOException { AllocateResponse allocateResponse = null; try { synchronized (this) { - // update the data structures first - addNewAsks(allocateRequest.getAskList()); - - if (allocateRequest.getReleaseList() != null) { - this.remotePendingRelease.addAll(allocateRequest.getReleaseList()); - this.release.addAll(allocateRequest.getReleaseList()); - } - - if (allocateRequest.getResourceBlacklistRequest() != null) { - if (allocateRequest.getResourceBlacklistRequest() - .getBlacklistAdditions() != null) { - this.remoteBlacklistedNodes.addAll(allocateRequest - .getResourceBlacklistRequest().getBlacklistAdditions()); - this.blacklistAdditions.addAll(allocateRequest - .getResourceBlacklistRequest().getBlacklistAdditions()); - } - if (allocateRequest.getResourceBlacklistRequest() - .getBlacklistRemovals() != null) { - this.remoteBlacklistedNodes.removeAll(allocateRequest - .getResourceBlacklistRequest().getBlacklistRemovals()); - this.blacklistRemovals.addAll(allocateRequest - .getResourceBlacklistRequest().getBlacklistRemovals()); - } - } - - if (allocateRequest.getUpdateRequests() != null) { - for (UpdateContainerRequest update : allocateRequest - .getUpdateRequests()) { - this.remotePendingChange.put(update.getContainerId(), update); - this.change.put(update.getContainerId(), update); - } - } + addNewAllocateRequest(allocateRequest); ArrayList askList = new ArrayList<>(ask.size()); for (ResourceRequest r : ask) { @@ -222,13 +239,23 @@ public class AMRMClientRelayer extends AbstractService new ArrayList<>(this.blacklistAdditions), new ArrayList<>(this.blacklistRemovals))) .updateRequests(new ArrayList<>(this.change.values())).build(); + + if (this.resetResponseId != -1) { + LOG.info("Override allocate responseId from " + + allocateRequest.getResponseId() + " to " + this.resetResponseId + + " for " + this.appId); + allocateRequest.setResponseId(this.resetResponseId); + } } // Do the actual allocate call try { allocateResponse = this.rmClient.allocate(allocateRequest); + + // Heartbeat succeeded, wipe out responseId overriding + this.resetResponseId = -1; } catch (ApplicationMasterNotRegisteredException e) { - LOG.warn("ApplicationMaster is out of sync with ResourceManager," + LOG.warn("ApplicationMaster is out of sync with RM for " + this.appId + " hence resyncing."); synchronized (this) { @@ -249,6 +276,25 @@ public class AMRMClientRelayer extends AbstractService // Reset responseId after re-register allocateRequest.setResponseId(0); return allocate(allocateRequest); + } catch (Throwable t) { + + // If RM is complaining about responseId out of sync, force reset next + // time + if (t instanceof InvalidApplicationMasterRequestException) { + int responseId = AMRMClientUtils + .parseExpectedResponseIdFromException(t.getMessage()); + if (responseId != -1) { + this.resetResponseId = responseId; + LOG.info("ResponseId out of sync with RM, expect " + responseId + + " but " + allocateRequest.getResponseId() + " used by " + + this.appId + ". Will override in the next allocate."); + } else { + LOG.warn("Failed to parse expected responseId out of exception for " + + this.appId); + } + } + + throw t; } synchronized (this) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedApplicationManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedApplicationManager.java index 52bde8c0503..bf2a6230532 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedApplicationManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedApplicationManager.java @@ -193,7 +193,7 @@ public class UnmanagedApplicationManager { this.applicationId.toString(), UserGroupInformation.getCurrentUser()); this.rmProxyRelayer = new AMRMClientRelayer(createRMProxy(ApplicationMasterProtocol.class, - this.conf, this.userUgi, amrmToken)); + this.conf, this.userUgi, amrmToken), this.applicationId); } /** diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/TestAMRMClientRelayer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/TestAMRMClientRelayer.java index 22bb1f98586..4c84f0b9210 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/TestAMRMClientRelayer.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/TestAMRMClientRelayer.java @@ -40,7 +40,9 @@ import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest; import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.client.AMRMClientUtils; import org.apache.hadoop.yarn.exceptions.ApplicationMasterNotRegisteredException; +import org.apache.hadoop.yarn.exceptions.InvalidApplicationMasterRequestException; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.scheduler.ResourceRequestSet; import org.apache.hadoop.yarn.util.Records; @@ -62,6 +64,7 @@ public class TestAMRMClientRelayer { // Whether this mockRM will throw failover exception upon next heartbeat // from AM private boolean failover = false; + private int responseIdReset = -1; private List lastAsk; private List lastRelease; private List lastBlacklistAdditions; @@ -92,26 +95,40 @@ public class TestAMRMClientRelayer { this.failover = false; throw new ApplicationMasterNotRegisteredException("Mock RM restarted"); } + if (this.responseIdReset != -1) { + String errorMessage = + AMRMClientUtils.assembleInvalidResponseIdExceptionMessage(null, + this.responseIdReset, request.getResponseId()); + this.responseIdReset = -1; + throw new InvalidApplicationMasterRequestException(errorMessage); + } + this.lastAsk = request.getAskList(); this.lastRelease = request.getReleaseList(); this.lastBlacklistAdditions = request.getResourceBlacklistRequest().getBlacklistAdditions(); this.lastBlacklistRemovals = request.getResourceBlacklistRequest().getBlacklistRemovals(); - return AllocateResponse.newInstance(0, null, null, - new ArrayList(), Resource.newInstance(0, 0), null, 0, - null, null); + return AllocateResponse.newInstance(request.getResponseId() + 1, null, + null, new ArrayList(), Resource.newInstance(0, 0), null, + 0, null, null); } public void setFailoverFlag() { this.failover = true; } + + public void setResponseIdReset(int expectedResponseId) { + this.responseIdReset = expectedResponseId; + } } private Configuration conf; private MockApplicationMasterService mockAMS; private AMRMClientRelayer relayer; + private int responseId = 0; + // Buffer of asks that will be sent to RM in the next AM heartbeat private List asks = new ArrayList<>(); private List releases = new ArrayList<>(); @@ -123,7 +140,7 @@ public class TestAMRMClientRelayer { this.conf = new Configuration(); this.mockAMS = new MockApplicationMasterService(); - this.relayer = new AMRMClientRelayer(this.mockAMS); + this.relayer = new AMRMClientRelayer(this.mockAMS, null); this.relayer.init(conf); this.relayer.start(); @@ -150,7 +167,7 @@ public class TestAMRMClientRelayer { private AllocateRequest getAllocateRequest() { // Need to create a new one every time because rather than directly // referring the lists, the protobuf impl makes a copy of the lists - return AllocateRequest.newInstance(0, 0, asks, releases, + return AllocateRequest.newInstance(responseId, 0, asks, releases, ResourceBlacklistRequest.newInstance(blacklistAdditions, blacklistRemoval)); } @@ -272,4 +289,30 @@ public class TestAMRMClientRelayer { clearAllocateRequestLists(); } + @Test + public void testResponseIdResync() throws YarnException, IOException { + this.responseId = 10; + + AllocateResponse response = this.relayer.allocate(getAllocateRequest()); + Assert.assertEquals(this.responseId + 1, response.getResponseId()); + + int expected = 5; + this.mockAMS.setResponseIdReset(expected); + + try { + this.relayer.allocate(getAllocateRequest()); + Assert.fail("Expecting exception from RM"); + } catch (InvalidApplicationMasterRequestException e) { + // Expected exception + } + + // Verify that the responseId is overridden + response = this.relayer.allocate(getAllocateRequest()); + Assert.assertEquals(expected + 1, response.getResponseId()); + + // Verify it is no longer overriden + this.responseId = response.getResponseId(); + response = this.relayer.allocate(getAllocateRequest()); + Assert.assertEquals(this.responseId + 1, response.getResponseId()); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java index 645e47e5af0..65a22770390 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java @@ -249,8 +249,10 @@ public class FederationInterceptor extends AbstractRequestInterceptor { this.homeSubClusterId = SubClusterId.newInstance(YarnConfiguration.getClusterId(conf)); - this.homeRMRelayer = new AMRMClientRelayer(createHomeRMProxy(appContext, - ApplicationMasterProtocol.class, this.appOwner)); + this.homeRMRelayer = new AMRMClientRelayer( + createHomeRMProxy(appContext, ApplicationMasterProtocol.class, + this.appOwner), + getApplicationContext().getApplicationAttemptId().getApplicationId()); this.federationFacade = FederationStateStoreFacade.getInstance(); this.subClusterResolver = this.federationFacade.getSubClusterResolver(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java index dee90c9107e..b53e9976f31 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java @@ -375,12 +375,9 @@ public class ApplicationMasterService extends AbstractService implements // heartbeat one step old, simply return lastReponse return lastResponse; } else if (request.getResponseId() != lastResponse.getResponseId()) { - String message = - "Invalid responseId in AllocateRequest from application attempt: " - + appAttemptId + ", expect responseId to be " - + lastResponse.getResponseId() + ", but get " - + request.getResponseId(); - throw new InvalidApplicationMasterRequestException(message); + throw new InvalidApplicationMasterRequestException(AMRMClientUtils + .assembleInvalidResponseIdExceptionMessage(appAttemptId, + lastResponse.getResponseId(), request.getResponseId())); } AllocateResponse response =