YARN-8760. [AMRMProxy] Fix concurrent re-register due to YarnRM failover in AMRMClientRelayer. Contributed by Botong Huang.

This commit is contained in:
Giovanni Matteo Fumarola 2018-10-01 13:12:38 -07:00 committed by Giovanni Matteo Fumarola
parent d7d0e55e0a
commit 57caab6f9b
3 changed files with 60 additions and 2 deletions

View File

@ -232,6 +232,27 @@ public class AMRMClientRelayer extends AbstractService
return this.rmClient.registerApplicationMaster(request);
}
/**
 * Re-registers the application master with the RM, tolerating the case where
 * a concurrent thread has already completed the re-registration.
 *
 * <p>After an RM failover, there might be more than one
 * allocate/finishApplicationMaster call thread (due to RPC timeout and retry)
 * doing the auto re-register concurrently. As a result, we need to swallow
 * the "already registered" exception thrown by the new RM.
 *
 * @param request the registration request to send
 * @throws YarnException if registration fails for any reason other than the
 *           application already being registered
 * @throws IOException if thrown by the underlying register call
 */
private void reRegisterApplicationMaster(
    RegisterApplicationMasterRequest request)
    throws YarnException, IOException {
  try {
    registerApplicationMaster(request);
  } catch (InvalidApplicationMasterRequestException e) {
    // getMessage() may be null; in that case this is not the
    // "already registered" case, so propagate the original exception
    // instead of failing with a NullPointerException here.
    String message = e.getMessage();
    if (message == null || !message
        .contains(AMRMClientUtils.APP_ALREADY_REGISTERED_MESSAGE)) {
      throw e;
    }
    LOG.info("Concurrent thread successfully re-registered, moving on.");
  }
}
@Override
public FinishApplicationMasterResponse finishApplicationMaster(
FinishApplicationMasterRequest request)
@ -242,7 +263,7 @@ public class AMRMClientRelayer extends AbstractService
LOG.warn("Out of sync with RM " + rmId
+ " for " + this.appId + ", hence resyncing.");
// re register with RM
registerApplicationMaster(this.amRegistrationRequest);
reRegisterApplicationMaster(this.amRegistrationRequest);
return finishApplicationMaster(request);
}
}
@ -363,7 +384,7 @@ public class AMRMClientRelayer extends AbstractService
}
// re-register with RM, then retry allocate recursively
registerApplicationMaster(this.amRegistrationRequest);
reRegisterApplicationMaster(this.amRegistrationRequest);
// Reset responseId after re-register
allocateRequest.setResponseId(0);
allocateResponse = allocate(allocateRequest);

View File

@ -64,6 +64,11 @@ public class TestAMRMClientRelayer {
// Whether this mockRM will throw failover exception upon next heartbeat
// from AM
private boolean failover = false;
// Whether this mockRM will throw application already registered exception
// upon next registerApplicationMaster call. One-shot: the flag is cleared
// again by registerApplicationMaster right before it throws.
private boolean throwAlreadyRegister = false;
// Value installed through setResponseIdReset; starts at -1.
// NOTE(review): -1 presumably means "no reset expected" - confirm against
// the code that reads this field.
private int responseIdReset = -1;
// NOTE(review): presumably the ask/release lists seen in the most recent
// allocate call - confirm against the code that assigns them.
private List<ResourceRequest> lastAsk;
private List<ContainerId> lastRelease;
@ -74,6 +79,11 @@ public class TestAMRMClientRelayer {
/**
 * Mock registration: fails exactly once with an "already registered" error
 * when the throwAlreadyRegister flag has been armed, otherwise succeeds
 * with a null response.
 */
public RegisterApplicationMasterResponse registerApplicationMaster(
    RegisterApplicationMasterRequest request)
    throws YarnException, IOException {
  // Consume the one-shot flag before deciding how to answer this call.
  final boolean rejectAsDuplicate = this.throwAlreadyRegister;
  this.throwAlreadyRegister = false;
  if (!rejectAsDuplicate) {
    return null;
  }
  throw new InvalidApplicationMasterRequestException(
      AMRMClientUtils.APP_ALREADY_REGISTERED_MESSAGE + "appId");
}
@ -118,6 +128,10 @@ public class TestAMRMClientRelayer {
this.failover = true;
}
/**
 * Arms the mock so that the next registerApplicationMaster call throws the
 * application-already-registered exception.
 */
public void setThrowAlreadyRegister() {
  throwAlreadyRegister = true;
}
/**
 * Records the given value in responseIdReset.
 * NOTE(review): presumably the responseId the mock expects after a reset -
 * confirm against the code that reads the field.
 *
 * @param expectedResponseId value to store
 */
public void setResponseIdReset(int expectedResponseId) {
  responseIdReset = expectedResponseId;
}
@ -315,4 +329,15 @@ public class TestAMRMClientRelayer {
response = this.relayer.allocate(getAllocateRequest());
Assert.assertEquals(this.responseId + 1, response.getResponseId());
}
/**
 * Arms both the failover flag and the one-shot "already registered" flag on
 * the mock, then calls finishApplicationMaster. The test passes as long as
 * no exception escapes the relayer.
 */
@Test
public void testConcurrentReregister() throws YarnException, IOException {
  // Make the mock's next registerApplicationMaster call throw
  // "already registered", as if a concurrent thread re-registered first.
  this.mockAMS.setThrowAlreadyRegister();
  // Set RM restart and failover flag
  this.mockAMS.setFailoverFlag();

  // Must complete without propagating the already-registered exception.
  this.relayer.finishApplicationMaster(null);
}
}

View File

@ -184,6 +184,9 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
*/
private volatile boolean justRecovered;
/**
 * If true, allocate will be a no-op, skipping actual processing. Declared
 * volatile so a write by one thread is visible to other threads reading it.
 */
private volatile boolean finishAMCalled;
/**
* Used to keep track of the container Id and the sub cluster RM that created
* the container, so that we know which sub-cluster to forward later requests
@ -230,6 +233,7 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
this.amRegistrationRequest = null;
this.amRegistrationResponse = null;
this.justRecovered = false;
this.finishAMCalled = false;
}
/**
@ -576,6 +580,12 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
+ ". AM should re-register and full re-send pending requests.");
}
// Once finish has been requested, heartbeats are no longer processed;
// answer with an empty response instead.
if (this.finishAMCalled) {
  // attemptId is passed as the logger argument so that it fills the {}
  // placeholder instead of being appended after "response".
  LOG.warn("FinishApplicationMaster already called by {}, skip heartbeat "
      + "processing and return dummy response", this.attemptId);
  return RECORD_FACTORY.newRecordInstance(AllocateResponse.class);
}
// Check responseId and handle duplicate heartbeat exactly same as RM
synchronized (this.lastAllocateResponseLock) {
LOG.info("Heartbeat from " + this.attemptId + " with responseId "
@ -664,6 +674,8 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
FinishApplicationMasterRequest request)
throws YarnException, IOException {
this.finishAMCalled = true;
// TODO: consider adding batchFinishApplicationMaster in UAMPoolManager
boolean failedToUnRegister = false;
ExecutorCompletionService<FinishApplicationMasterResponseInfo> compSvc =