YARN-8110. AMRMProxy recover should catch for all throwable to avoid premature exit. (Botong Huang via Subru).
(cherry picked from commit 00905efab2
)
This commit is contained in:
parent
b232dcab33
commit
b9bad94596
|
@ -261,7 +261,7 @@ public class AMRMProxyService extends CompositeService implements
|
|||
// Create the intercepter pipeline for the AM
|
||||
initializePipeline(attemptId, user, amrmToken, localToken,
|
||||
entry.getValue(), true, amCred);
|
||||
} catch (IOException e) {
|
||||
} catch (Throwable e) {
|
||||
LOG.error("Exception when recovering " + attemptId
|
||||
+ ", removing it from NMStateStore and move on", e);
|
||||
this.nmContext.getNMStateStore().removeAMRMProxyAppContext(attemptId);
|
||||
|
|
|
@ -112,6 +112,11 @@ public abstract class BaseAMRMProxyTest {
|
|||
return this.amrmProxyService;
|
||||
}
|
||||
|
||||
protected Context getNMContext() {
|
||||
Assert.assertNotNull(this.nmContext);
|
||||
return this.nmContext;
|
||||
}
|
||||
|
||||
@Before
|
||||
public void setUp() throws IOException {
|
||||
this.conf = createConfiguration();
|
||||
|
|
|
@ -44,6 +44,7 @@ import org.apache.hadoop.yarn.exceptions.YarnException;
|
|||
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
|
||||
import org.apache.hadoop.yarn.server.MockResourceManagerFacade;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.amrmproxy.AMRMProxyService.RequestInterceptorChainWrapper;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredAMRMProxyState;
|
||||
import org.apache.hadoop.yarn.util.Records;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
|
@ -633,6 +634,35 @@ public class TestAMRMProxyService extends BaseAMRMProxyTest {
|
|||
mockRM = null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Test AMRMProxy restart with application recovery failure.
|
||||
*/
|
||||
@Test
|
||||
public void testAppRecoveryFailure() throws YarnException, Exception {
|
||||
Configuration conf = createConfiguration();
|
||||
// Use the MockRequestInterceptorAcrossRestart instead for the chain
|
||||
conf.set(YarnConfiguration.AMRM_PROXY_INTERCEPTOR_CLASS_PIPELINE,
|
||||
BadRequestInterceptorAcrossRestart.class.getName());
|
||||
|
||||
mockRM = new MockResourceManagerFacade(new YarnConfiguration(conf), 0);
|
||||
|
||||
createAndStartAMRMProxyService(conf);
|
||||
|
||||
// Create an app entry in NMSS
|
||||
registerApplicationMaster(1);
|
||||
|
||||
RecoveredAMRMProxyState state =
|
||||
getNMContext().getNMStateStore().loadAMRMProxyState();
|
||||
Assert.assertEquals(1, state.getAppContexts().size());
|
||||
|
||||
// AMRMProxy restarts and recover
|
||||
createAndStartAMRMProxyService(conf);
|
||||
|
||||
state = getNMContext().getNMStateStore().loadAMRMProxyState();
|
||||
// The app that failed to recover should have been removed from NMSS
|
||||
Assert.assertEquals(0, state.getAppContexts().size());
|
||||
}
|
||||
|
||||
/**
|
||||
* A mock intercepter implementation that uses the same mockRM instance across
|
||||
* restart.
|
||||
|
@ -672,4 +702,16 @@ public class TestAMRMProxyService extends BaseAMRMProxyTest {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* A mock intercepter implementation that throws when recovering.
|
||||
*/
|
||||
public static class BadRequestInterceptorAcrossRestart
|
||||
extends MockRequestInterceptorAcrossRestart {
|
||||
|
||||
@Override
|
||||
public void recover(Map<String, byte[]> recoveredDataMap) {
|
||||
throw new RuntimeException("Kaboom");
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue