YARN-8673. [AMRMProxy] More robust responseId resync after an YarnRM master slave switch. Contributed by Botong Huang.
This commit is contained in:
parent
89da0e9901
commit
8e6807ef4a
|
@ -30,6 +30,7 @@ import org.apache.hadoop.security.SaslRpcServer;
|
|||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.security.token.Token;
|
||||
import org.apache.hadoop.security.token.TokenIdentifier;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
||||
import org.slf4j.Logger;
|
||||
|
@ -46,6 +47,10 @@ public final class AMRMClientUtils {
|
|||
public static final String APP_ALREADY_REGISTERED_MESSAGE =
|
||||
"Application Master is already registered : ";
|
||||
|
||||
public static final String EXPECTED_HB_RESPONSEID_MESSAGE =
|
||||
" expect responseId to be ";
|
||||
public static final String RECEIVED_HB_RESPONSEID_MESSAGE = " but get ";
|
||||
|
||||
private AMRMClientUtils() {
|
||||
}
|
||||
|
||||
|
@ -96,4 +101,46 @@ public final class AMRMClientUtils {
|
|||
conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
|
||||
SaslRpcServer.AuthMethod.TOKEN.toString());
|
||||
}
|
||||
|
||||
/**
|
||||
* Generate the exception message when RM receives an AM heartbeat with
|
||||
* invalid responseId.
|
||||
*
|
||||
* @param appAttemptId the app attempt
|
||||
* @param expected the expected responseId value
|
||||
* @param received the received responseId value
|
||||
* @return the assembled exception message
|
||||
*/
|
||||
public static String assembleInvalidResponseIdExceptionMessage(
|
||||
ApplicationAttemptId appAttemptId, int expected, int received) {
|
||||
return "Invalid responseId in AllocateRequest from application attempt: "
|
||||
+ appAttemptId + EXPECTED_HB_RESPONSEID_MESSAGE + expected
|
||||
+ RECEIVED_HB_RESPONSEID_MESSAGE + received;
|
||||
}
|
||||
|
||||
/**
|
||||
* Parse the expected responseId from the exception generated by RM when
|
||||
* processing AM heartbeat.
|
||||
*
|
||||
* @param exceptionMessage the exception message thrown by RM
|
||||
* @return the parsed expected responseId, -1 if failed
|
||||
*/
|
||||
public static int parseExpectedResponseIdFromException(
|
||||
String exceptionMessage) {
|
||||
if (exceptionMessage == null) {
|
||||
return -1;
|
||||
}
|
||||
int start = exceptionMessage.indexOf(EXPECTED_HB_RESPONSEID_MESSAGE);
|
||||
int end = exceptionMessage.indexOf(RECEIVED_HB_RESPONSEID_MESSAGE);
|
||||
if (start == -1 || end == -1) {
|
||||
return -1;
|
||||
}
|
||||
start += EXPECTED_HB_RESPONSEID_MESSAGE.length();
|
||||
|
||||
try {
|
||||
return Integer.parseInt(exceptionMessage.substring(start, end));
|
||||
} catch (NumberFormatException ex) {
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
}
|
|
@ -37,15 +37,18 @@ import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest
|
|||
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerStatus;
|
||||
import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest;
|
||||
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
||||
import org.apache.hadoop.yarn.api.records.UpdateContainerRequest;
|
||||
import org.apache.hadoop.yarn.api.records.UpdatedContainer;
|
||||
import org.apache.hadoop.yarn.client.AMRMClientUtils;
|
||||
import org.apache.hadoop.yarn.client.ClientRMProxy;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.exceptions.ApplicationMasterNotRegisteredException;
|
||||
import org.apache.hadoop.yarn.exceptions.InvalidApplicationMasterRequestException;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
||||
import org.apache.hadoop.yarn.server.scheduler.ResourceRequestSet;
|
||||
|
@ -105,13 +108,22 @@ public class AMRMClientRelayer extends AbstractService
|
|||
new HashMap<>();
|
||||
private Map<ContainerId, UpdateContainerRequest> change = new HashMap<>();
|
||||
|
||||
private ApplicationId appId;
|
||||
|
||||
// Normally -1, otherwise will override responseId with this value in the next
|
||||
// heartbeat
|
||||
private volatile int resetResponseId;
|
||||
|
||||
public AMRMClientRelayer() {
|
||||
super(AMRMClientRelayer.class.getName());
|
||||
this.resetResponseId = -1;
|
||||
}
|
||||
|
||||
public AMRMClientRelayer(ApplicationMasterProtocol rmClient) {
|
||||
public AMRMClientRelayer(ApplicationMasterProtocol rmClient,
|
||||
ApplicationId appId) {
|
||||
this();
|
||||
this.rmClient = rmClient;
|
||||
this.appId = appId;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -161,51 +173,56 @@ public class AMRMClientRelayer extends AbstractService
|
|||
try {
|
||||
return this.rmClient.finishApplicationMaster(request);
|
||||
} catch (ApplicationMasterNotRegisteredException e) {
|
||||
LOG.warn("Out of sync with ResourceManager, hence resyncing.");
|
||||
LOG.warn("Out of sync with RM for " + this.appId + ", hence resyncing.");
|
||||
// re register with RM
|
||||
registerApplicationMaster(this.amRegistrationRequest);
|
||||
return finishApplicationMaster(request);
|
||||
}
|
||||
}
|
||||
|
||||
private void addNewAllocateRequest(AllocateRequest allocateRequest)
|
||||
throws YarnException {
|
||||
// update the data structures first
|
||||
addNewAsks(allocateRequest.getAskList());
|
||||
|
||||
if (allocateRequest.getReleaseList() != null) {
|
||||
this.remotePendingRelease.addAll(allocateRequest.getReleaseList());
|
||||
this.release.addAll(allocateRequest.getReleaseList());
|
||||
}
|
||||
|
||||
if (allocateRequest.getResourceBlacklistRequest() != null) {
|
||||
if (allocateRequest.getResourceBlacklistRequest()
|
||||
.getBlacklistAdditions() != null) {
|
||||
this.remoteBlacklistedNodes.addAll(allocateRequest
|
||||
.getResourceBlacklistRequest().getBlacklistAdditions());
|
||||
this.blacklistAdditions.addAll(allocateRequest
|
||||
.getResourceBlacklistRequest().getBlacklistAdditions());
|
||||
}
|
||||
if (allocateRequest.getResourceBlacklistRequest()
|
||||
.getBlacklistRemovals() != null) {
|
||||
this.remoteBlacklistedNodes.removeAll(allocateRequest
|
||||
.getResourceBlacklistRequest().getBlacklistRemovals());
|
||||
this.blacklistRemovals.addAll(allocateRequest
|
||||
.getResourceBlacklistRequest().getBlacklistRemovals());
|
||||
}
|
||||
}
|
||||
|
||||
if (allocateRequest.getUpdateRequests() != null) {
|
||||
for (UpdateContainerRequest update : allocateRequest
|
||||
.getUpdateRequests()) {
|
||||
this.remotePendingChange.put(update.getContainerId(), update);
|
||||
this.change.put(update.getContainerId(), update);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public AllocateResponse allocate(AllocateRequest allocateRequest)
|
||||
throws YarnException, IOException {
|
||||
AllocateResponse allocateResponse = null;
|
||||
try {
|
||||
synchronized (this) {
|
||||
// update the data structures first
|
||||
addNewAsks(allocateRequest.getAskList());
|
||||
|
||||
if (allocateRequest.getReleaseList() != null) {
|
||||
this.remotePendingRelease.addAll(allocateRequest.getReleaseList());
|
||||
this.release.addAll(allocateRequest.getReleaseList());
|
||||
}
|
||||
|
||||
if (allocateRequest.getResourceBlacklistRequest() != null) {
|
||||
if (allocateRequest.getResourceBlacklistRequest()
|
||||
.getBlacklistAdditions() != null) {
|
||||
this.remoteBlacklistedNodes.addAll(allocateRequest
|
||||
.getResourceBlacklistRequest().getBlacklistAdditions());
|
||||
this.blacklistAdditions.addAll(allocateRequest
|
||||
.getResourceBlacklistRequest().getBlacklistAdditions());
|
||||
}
|
||||
if (allocateRequest.getResourceBlacklistRequest()
|
||||
.getBlacklistRemovals() != null) {
|
||||
this.remoteBlacklistedNodes.removeAll(allocateRequest
|
||||
.getResourceBlacklistRequest().getBlacklistRemovals());
|
||||
this.blacklistRemovals.addAll(allocateRequest
|
||||
.getResourceBlacklistRequest().getBlacklistRemovals());
|
||||
}
|
||||
}
|
||||
|
||||
if (allocateRequest.getUpdateRequests() != null) {
|
||||
for (UpdateContainerRequest update : allocateRequest
|
||||
.getUpdateRequests()) {
|
||||
this.remotePendingChange.put(update.getContainerId(), update);
|
||||
this.change.put(update.getContainerId(), update);
|
||||
}
|
||||
}
|
||||
addNewAllocateRequest(allocateRequest);
|
||||
|
||||
ArrayList<ResourceRequest> askList = new ArrayList<>(ask.size());
|
||||
for (ResourceRequest r : ask) {
|
||||
|
@ -222,13 +239,23 @@ public class AMRMClientRelayer extends AbstractService
|
|||
new ArrayList<>(this.blacklistAdditions),
|
||||
new ArrayList<>(this.blacklistRemovals)))
|
||||
.updateRequests(new ArrayList<>(this.change.values())).build();
|
||||
|
||||
if (this.resetResponseId != -1) {
|
||||
LOG.info("Override allocate responseId from "
|
||||
+ allocateRequest.getResponseId() + " to " + this.resetResponseId
|
||||
+ " for " + this.appId);
|
||||
allocateRequest.setResponseId(this.resetResponseId);
|
||||
}
|
||||
}
|
||||
|
||||
// Do the actual allocate call
|
||||
try {
|
||||
allocateResponse = this.rmClient.allocate(allocateRequest);
|
||||
|
||||
// Heartbeat succeeded, wipe out responseId overriding
|
||||
this.resetResponseId = -1;
|
||||
} catch (ApplicationMasterNotRegisteredException e) {
|
||||
LOG.warn("ApplicationMaster is out of sync with ResourceManager,"
|
||||
LOG.warn("ApplicationMaster is out of sync with RM for " + this.appId
|
||||
+ " hence resyncing.");
|
||||
|
||||
synchronized (this) {
|
||||
|
@ -249,6 +276,25 @@ public class AMRMClientRelayer extends AbstractService
|
|||
// Reset responseId after re-register
|
||||
allocateRequest.setResponseId(0);
|
||||
return allocate(allocateRequest);
|
||||
} catch (Throwable t) {
|
||||
|
||||
// If RM is complaining about responseId out of sync, force reset next
|
||||
// time
|
||||
if (t instanceof InvalidApplicationMasterRequestException) {
|
||||
int responseId = AMRMClientUtils
|
||||
.parseExpectedResponseIdFromException(t.getMessage());
|
||||
if (responseId != -1) {
|
||||
this.resetResponseId = responseId;
|
||||
LOG.info("ResponseId out of sync with RM, expect " + responseId
|
||||
+ " but " + allocateRequest.getResponseId() + " used by "
|
||||
+ this.appId + ". Will override in the next allocate.");
|
||||
} else {
|
||||
LOG.warn("Failed to parse expected responseId out of exception for "
|
||||
+ this.appId);
|
||||
}
|
||||
}
|
||||
|
||||
throw t;
|
||||
}
|
||||
|
||||
synchronized (this) {
|
||||
|
|
|
@ -193,7 +193,7 @@ public class UnmanagedApplicationManager {
|
|||
this.applicationId.toString(), UserGroupInformation.getCurrentUser());
|
||||
this.rmProxyRelayer =
|
||||
new AMRMClientRelayer(createRMProxy(ApplicationMasterProtocol.class,
|
||||
this.conf, this.userUgi, amrmToken));
|
||||
this.conf, this.userUgi, amrmToken), this.applicationId);
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -40,7 +40,9 @@ import org.apache.hadoop.yarn.api.records.Priority;
|
|||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest;
|
||||
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
||||
import org.apache.hadoop.yarn.client.AMRMClientUtils;
|
||||
import org.apache.hadoop.yarn.exceptions.ApplicationMasterNotRegisteredException;
|
||||
import org.apache.hadoop.yarn.exceptions.InvalidApplicationMasterRequestException;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||
import org.apache.hadoop.yarn.server.scheduler.ResourceRequestSet;
|
||||
import org.apache.hadoop.yarn.util.Records;
|
||||
|
@ -62,6 +64,7 @@ public class TestAMRMClientRelayer {
|
|||
// Whether this mockRM will throw failover exception upon next heartbeat
|
||||
// from AM
|
||||
private boolean failover = false;
|
||||
private int responseIdReset = -1;
|
||||
private List<ResourceRequest> lastAsk;
|
||||
private List<ContainerId> lastRelease;
|
||||
private List<String> lastBlacklistAdditions;
|
||||
|
@ -92,26 +95,40 @@ public class TestAMRMClientRelayer {
|
|||
this.failover = false;
|
||||
throw new ApplicationMasterNotRegisteredException("Mock RM restarted");
|
||||
}
|
||||
if (this.responseIdReset != -1) {
|
||||
String errorMessage =
|
||||
AMRMClientUtils.assembleInvalidResponseIdExceptionMessage(null,
|
||||
this.responseIdReset, request.getResponseId());
|
||||
this.responseIdReset = -1;
|
||||
throw new InvalidApplicationMasterRequestException(errorMessage);
|
||||
}
|
||||
|
||||
this.lastAsk = request.getAskList();
|
||||
this.lastRelease = request.getReleaseList();
|
||||
this.lastBlacklistAdditions =
|
||||
request.getResourceBlacklistRequest().getBlacklistAdditions();
|
||||
this.lastBlacklistRemovals =
|
||||
request.getResourceBlacklistRequest().getBlacklistRemovals();
|
||||
return AllocateResponse.newInstance(0, null, null,
|
||||
new ArrayList<NodeReport>(), Resource.newInstance(0, 0), null, 0,
|
||||
null, null);
|
||||
return AllocateResponse.newInstance(request.getResponseId() + 1, null,
|
||||
null, new ArrayList<NodeReport>(), Resource.newInstance(0, 0), null,
|
||||
0, null, null);
|
||||
}
|
||||
|
||||
public void setFailoverFlag() {
|
||||
this.failover = true;
|
||||
}
|
||||
|
||||
public void setResponseIdReset(int expectedResponseId) {
|
||||
this.responseIdReset = expectedResponseId;
|
||||
}
|
||||
}
|
||||
|
||||
private Configuration conf;
|
||||
private MockApplicationMasterService mockAMS;
|
||||
private AMRMClientRelayer relayer;
|
||||
|
||||
private int responseId = 0;
|
||||
|
||||
// Buffer of asks that will be sent to RM in the next AM heartbeat
|
||||
private List<ResourceRequest> asks = new ArrayList<>();
|
||||
private List<ContainerId> releases = new ArrayList<>();
|
||||
|
@ -123,7 +140,7 @@ public class TestAMRMClientRelayer {
|
|||
this.conf = new Configuration();
|
||||
|
||||
this.mockAMS = new MockApplicationMasterService();
|
||||
this.relayer = new AMRMClientRelayer(this.mockAMS);
|
||||
this.relayer = new AMRMClientRelayer(this.mockAMS, null);
|
||||
|
||||
this.relayer.init(conf);
|
||||
this.relayer.start();
|
||||
|
@ -150,7 +167,7 @@ public class TestAMRMClientRelayer {
|
|||
private AllocateRequest getAllocateRequest() {
|
||||
// Need to create a new one every time because rather than directly
|
||||
// referring the lists, the protobuf impl makes a copy of the lists
|
||||
return AllocateRequest.newInstance(0, 0, asks, releases,
|
||||
return AllocateRequest.newInstance(responseId, 0, asks, releases,
|
||||
ResourceBlacklistRequest.newInstance(blacklistAdditions,
|
||||
blacklistRemoval));
|
||||
}
|
||||
|
@ -272,4 +289,30 @@ public class TestAMRMClientRelayer {
|
|||
clearAllocateRequestLists();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testResponseIdResync() throws YarnException, IOException {
|
||||
this.responseId = 10;
|
||||
|
||||
AllocateResponse response = this.relayer.allocate(getAllocateRequest());
|
||||
Assert.assertEquals(this.responseId + 1, response.getResponseId());
|
||||
|
||||
int expected = 5;
|
||||
this.mockAMS.setResponseIdReset(expected);
|
||||
|
||||
try {
|
||||
this.relayer.allocate(getAllocateRequest());
|
||||
Assert.fail("Expecting exception from RM");
|
||||
} catch (InvalidApplicationMasterRequestException e) {
|
||||
// Expected exception
|
||||
}
|
||||
|
||||
// Verify that the responseId is overridden
|
||||
response = this.relayer.allocate(getAllocateRequest());
|
||||
Assert.assertEquals(expected + 1, response.getResponseId());
|
||||
|
||||
// Verify it is no longer overriden
|
||||
this.responseId = response.getResponseId();
|
||||
response = this.relayer.allocate(getAllocateRequest());
|
||||
Assert.assertEquals(this.responseId + 1, response.getResponseId());
|
||||
}
|
||||
}
|
||||
|
|
|
@ -249,8 +249,10 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
|
|||
|
||||
this.homeSubClusterId =
|
||||
SubClusterId.newInstance(YarnConfiguration.getClusterId(conf));
|
||||
this.homeRMRelayer = new AMRMClientRelayer(createHomeRMProxy(appContext,
|
||||
ApplicationMasterProtocol.class, this.appOwner));
|
||||
this.homeRMRelayer = new AMRMClientRelayer(
|
||||
createHomeRMProxy(appContext, ApplicationMasterProtocol.class,
|
||||
this.appOwner),
|
||||
getApplicationContext().getApplicationAttemptId().getApplicationId());
|
||||
|
||||
this.federationFacade = FederationStateStoreFacade.getInstance();
|
||||
this.subClusterResolver = this.federationFacade.getSubClusterResolver();
|
||||
|
|
|
@ -375,12 +375,9 @@ public class ApplicationMasterService extends AbstractService implements
|
|||
// heartbeat one step old, simply return lastReponse
|
||||
return lastResponse;
|
||||
} else if (request.getResponseId() != lastResponse.getResponseId()) {
|
||||
String message =
|
||||
"Invalid responseId in AllocateRequest from application attempt: "
|
||||
+ appAttemptId + ", expect responseId to be "
|
||||
+ lastResponse.getResponseId() + ", but get "
|
||||
+ request.getResponseId();
|
||||
throw new InvalidApplicationMasterRequestException(message);
|
||||
throw new InvalidApplicationMasterRequestException(AMRMClientUtils
|
||||
.assembleInvalidResponseIdExceptionMessage(appAttemptId,
|
||||
lastResponse.getResponseId(), request.getResponseId()));
|
||||
}
|
||||
|
||||
AllocateResponse response =
|
||||
|
|
Loading…
Reference in New Issue