YARN-6955. Handle concurrent register AM requests in FederationInterceptor. (Botong Huang via Subru).

This commit is contained in:
Subru Krishnan 2017-08-07 16:58:29 -07:00
parent bc206806da
commit c61f2c4198
4 changed files with 110 additions and 43 deletions

View File

@ -594,11 +594,9 @@
<Bug pattern="UL_UNRELEASED_LOCK_EXCEPTION_PATH" /> <Bug pattern="UL_UNRELEASED_LOCK_EXCEPTION_PATH" />
</Match> </Match>
<!-- Ignore false alert for RCN_REDUNDANT_NULLCHECK_OF_NULL_VALUE -->
<Match> <Match>
<Class name="org.apache.hadoop.yarn.server.nodemanager.amrmproxy.FederationInterceptor" /> <Class name="org.apache.hadoop.yarn.server.nodemanager.amrmproxy.FederationInterceptor" />
<Method name="registerApplicationMaster" /> <Bug pattern="IS2_INCONSISTENT_SYNC" />
<Bug pattern="RCN_REDUNDANT_NULLCHECK_OF_NULL_VALUE" />
</Match> </Match>
</FindBugsFilter> </FindBugsFilter>

View File

@ -246,6 +246,16 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol,
shouldReRegisterNext = false; shouldReRegisterNext = false;
synchronized (applicationContainerIdMap) {
if (applicationContainerIdMap.containsKey(amrmToken)) {
throw new InvalidApplicationMasterRequestException(
AMRMClientUtils.APP_ALREADY_REGISTERED_MESSAGE);
}
// Keep track of the containers that are returned to this application
applicationContainerIdMap.put(amrmToken, new ArrayList<ContainerId>());
}
// Make sure we wait for certain test cases last in the method
synchronized (syncObj) { synchronized (syncObj) {
syncObj.notifyAll(); syncObj.notifyAll();
// We reuse the port number to indicate whether the unit test want us to // We reuse the port number to indicate whether the unit test want us to
@ -261,14 +271,6 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol,
} }
} }
synchronized (applicationContainerIdMap) {
if (applicationContainerIdMap.containsKey(amrmToken)) {
throw new InvalidApplicationMasterRequestException(
AMRMClientUtils.APP_ALREADY_REGISTERED_MESSAGE);
}
// Keep track of the containers that are returned to this application
applicationContainerIdMap.put(amrmToken, new ArrayList<ContainerId>());
}
return RegisterApplicationMasterResponse.newInstance(null, null, null, null, return RegisterApplicationMasterResponse.newInstance(null, null, null, null,
null, request.getHost(), null); null, request.getHost(), null);
} }

View File

@ -208,21 +208,24 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
* requests from AM because of timeout between AM and AMRMProxy, which is * requests from AM because of timeout between AM and AMRMProxy, which is
* shorter than the timeout + failOver between FederationInterceptor * shorter than the timeout + failOver between FederationInterceptor
* (AMRMProxy) and RM. * (AMRMProxy) and RM.
*
* For the same reason, this method needs to be synchronized.
*/ */
@Override @Override
public RegisterApplicationMasterResponse registerApplicationMaster( public synchronized RegisterApplicationMasterResponse
RegisterApplicationMasterRequest request) registerApplicationMaster(RegisterApplicationMasterRequest request)
throws YarnException, IOException { throws YarnException, IOException {
// If AM is calling with a different request, complain // If AM is calling with a different request, complain
if (this.amRegistrationRequest != null if (this.amRegistrationRequest != null) {
&& !this.amRegistrationRequest.equals(request)) { if (!this.amRegistrationRequest.equals(request)) {
throw new YarnException("A different request body recieved. AM should" throw new YarnException("AM should not call "
+ " not call registerApplicationMaster with different request body"); + "registerApplicationMaster with a different request body");
} }
} else {
// Save the registration request. This will be used for registering with // Save the registration request. This will be used for registering with
// secondary sub-clusters using UAMs, as well as re-register later // secondary sub-clusters using UAMs, as well as re-register later
this.amRegistrationRequest = request; this.amRegistrationRequest = request;
}
/* /*
* Present to AM as if we are the RM that never fails over. When actual RM * Present to AM as if we are the RM that never fails over. When actual RM
@ -245,22 +248,8 @@ public class FederationInterceptor extends AbstractRequestInterceptor {
* is running and will breaks the elasticity feature. The registration with * is running and will breaks the elasticity feature. The registration with
* the other sub-cluster RM will be done lazily as needed later. * the other sub-cluster RM will be done lazily as needed later.
*/ */
try {
this.amRegistrationResponse = this.amRegistrationResponse =
this.homeRM.registerApplicationMaster(request); this.homeRM.registerApplicationMaster(request);
} catch (InvalidApplicationMasterRequestException e) {
if (e.getMessage()
.contains(AMRMClientUtils.APP_ALREADY_REGISTERED_MESSAGE)) {
// Some other register thread might have succeeded in the meantime
if (this.amRegistrationResponse != null) {
LOG.info("Other concurrent thread registered successfully, "
+ "simply return the same success register response");
return this.amRegistrationResponse;
}
}
// This is a real issue, throw back to AM
throw e;
}
// the queue this application belongs will be used for getting // the queue this application belongs will be used for getting
// AMRMProxy policy from state store. // AMRMProxy policy from state store.

View File

@ -21,6 +21,11 @@ package org.apache.hadoop.yarn.server.nodemanager.amrmproxy;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
@ -36,6 +41,7 @@ import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.InvalidApplicationMasterRequestException; import org.apache.hadoop.yarn.exceptions.InvalidApplicationMasterRequestException;
import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.MockResourceManagerFacade;
import org.apache.hadoop.yarn.server.federation.policies.manager.UniformBroadcastPolicyManager; import org.apache.hadoop.yarn.server.federation.policies.manager.UniformBroadcastPolicyManager;
import org.apache.hadoop.yarn.server.federation.store.impl.MemoryFederationStateStore; import org.apache.hadoop.yarn.server.federation.store.impl.MemoryFederationStateStore;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterDeregisterRequest; import org.apache.hadoop.yarn.server.federation.store.records.SubClusterDeregisterRequest;
@ -234,7 +240,7 @@ public class TestFederationInterceptor extends BaseAMRMProxyTest {
RegisterApplicationMasterRequest registerReq = RegisterApplicationMasterRequest registerReq =
Records.newRecord(RegisterApplicationMasterRequest.class); Records.newRecord(RegisterApplicationMasterRequest.class);
registerReq.setHost(Integer.toString(testAppId)); registerReq.setHost(Integer.toString(testAppId));
registerReq.setRpcPort(testAppId); registerReq.setRpcPort(0);
registerReq.setTrackingUrl(""); registerReq.setTrackingUrl("");
RegisterApplicationMasterResponse registerResponse = RegisterApplicationMasterResponse registerResponse =
@ -298,7 +304,7 @@ public class TestFederationInterceptor extends BaseAMRMProxyTest {
RegisterApplicationMasterRequest registerReq = RegisterApplicationMasterRequest registerReq =
Records.newRecord(RegisterApplicationMasterRequest.class); Records.newRecord(RegisterApplicationMasterRequest.class);
registerReq.setHost(Integer.toString(testAppId)); registerReq.setHost(Integer.toString(testAppId));
registerReq.setRpcPort(testAppId); registerReq.setRpcPort(0);
registerReq.setTrackingUrl(""); registerReq.setTrackingUrl("");
RegisterApplicationMasterResponse registerResponse = RegisterApplicationMasterResponse registerResponse =
@ -338,6 +344,78 @@ public class TestFederationInterceptor extends BaseAMRMProxyTest {
Assert.assertEquals(true, finshResponse.getIsUnregistered()); Assert.assertEquals(true, finshResponse.getIsUnregistered());
} }
/*
* Test concurrent register threads. This is possible because the timeout
* between AM and AMRMProxy is shorter than the timeout + failOver between
* FederationInterceptor (AMRMProxy) and RM. When first call is blocked due to
* RM failover and AM timeout, it will call us resulting in a second register
* thread.
*/
@Test(timeout = 5000)
public void testConcurrentRegister()
throws InterruptedException, ExecutionException {
ExecutorService threadpool = Executors.newCachedThreadPool();
ExecutorCompletionService<RegisterApplicationMasterResponse> compSvc =
new ExecutorCompletionService<>(threadpool);
Object syncObj = MockResourceManagerFacade.getSyncObj();
// Two register threads
synchronized (syncObj) {
// Make sure first thread will block within RM, before the second thread
// starts
LOG.info("Starting first register thread");
compSvc.submit(new ConcurrentRegisterAMCallable());
try {
LOG.info("Test main starts waiting for the first thread to block");
syncObj.wait();
LOG.info("Test main wait finished");
} catch (Exception e) {
LOG.info("Test main wait interrupted", e);
}
}
// The second thread will get already registered exception from RM.
LOG.info("Starting second register thread");
compSvc.submit(new ConcurrentRegisterAMCallable());
// Notify the first register thread to return
LOG.info("Let first blocked register thread move on");
synchronized (syncObj) {
syncObj.notifyAll();
}
// Both thread should return without exception
RegisterApplicationMasterResponse response = compSvc.take().get();
Assert.assertNotNull(response);
response = compSvc.take().get();
Assert.assertNotNull(response);
threadpool.shutdown();
}
/**
* A callable that calls registerAM to RM with blocking.
*/
public class ConcurrentRegisterAMCallable
implements Callable<RegisterApplicationMasterResponse> {
@Override
public RegisterApplicationMasterResponse call() throws Exception {
RegisterApplicationMasterResponse response = null;
try {
// Use port number 1001 to let mock RM block in the register call
response = interceptor.registerApplicationMaster(
RegisterApplicationMasterRequest.newInstance(null, 1001, null));
} catch (Exception e) {
LOG.info("Register thread exception", e);
response = null;
}
return response;
}
}
@Test @Test
public void testRequestInterceptorChainCreation() throws Exception { public void testRequestInterceptorChainCreation() throws Exception {
RequestInterceptor root = RequestInterceptor root =
@ -381,7 +459,7 @@ public class TestFederationInterceptor extends BaseAMRMProxyTest {
RegisterApplicationMasterRequest registerReq = RegisterApplicationMasterRequest registerReq =
Records.newRecord(RegisterApplicationMasterRequest.class); Records.newRecord(RegisterApplicationMasterRequest.class);
registerReq.setHost(Integer.toString(testAppId)); registerReq.setHost(Integer.toString(testAppId));
registerReq.setRpcPort(testAppId); registerReq.setRpcPort(0);
registerReq.setTrackingUrl(""); registerReq.setTrackingUrl("");
for (int i = 0; i < 2; i++) { for (int i = 0; i < 2; i++) {
@ -397,7 +475,7 @@ public class TestFederationInterceptor extends BaseAMRMProxyTest {
RegisterApplicationMasterRequest registerReq = RegisterApplicationMasterRequest registerReq =
Records.newRecord(RegisterApplicationMasterRequest.class); Records.newRecord(RegisterApplicationMasterRequest.class);
registerReq.setHost(Integer.toString(testAppId)); registerReq.setHost(Integer.toString(testAppId));
registerReq.setRpcPort(testAppId); registerReq.setRpcPort(0);
registerReq.setTrackingUrl(""); registerReq.setTrackingUrl("");
RegisterApplicationMasterResponse registerResponse = RegisterApplicationMasterResponse registerResponse =
@ -407,7 +485,7 @@ public class TestFederationInterceptor extends BaseAMRMProxyTest {
// Register the application second time with a different request obj // Register the application second time with a different request obj
registerReq = Records.newRecord(RegisterApplicationMasterRequest.class); registerReq = Records.newRecord(RegisterApplicationMasterRequest.class);
registerReq.setHost(Integer.toString(testAppId)); registerReq.setHost(Integer.toString(testAppId));
registerReq.setRpcPort(testAppId); registerReq.setRpcPort(0);
registerReq.setTrackingUrl("different"); registerReq.setTrackingUrl("different");
try { try {
registerResponse = interceptor.registerApplicationMaster(registerReq); registerResponse = interceptor.registerApplicationMaster(registerReq);