YARN-8051. TestRMEmbeddedElector#testCallbackSynchronization is flaky. (Robert Kanter via Haibo Chen)

This commit is contained in:
Haibo Chen 2018-04-03 07:58:21 -07:00
parent 2be64eb201
commit 93d47a0ed5
1 changed files with 49 additions and 23 deletions

View File

@ -22,18 +22,22 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ha.ClientBaseWithFixes; import org.apache.hadoop.ha.ClientBaseWithFixes;
import org.apache.hadoop.ha.ServiceFailedException; import org.apache.hadoop.ha.ServiceFailedException;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import java.io.IOException; import java.io.IOException;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import static org.junit.Assert.fail; import static org.junit.Assert.fail;
import static org.mockito.Matchers.any; import static org.mockito.Matchers.any;
import static org.mockito.Mockito.atLeast; import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.atMost;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never; import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when; import static org.mockito.Mockito.when;
@ -48,6 +52,8 @@ public class TestRMEmbeddedElector extends ClientBaseWithFixes {
private Configuration conf; private Configuration conf;
private AtomicBoolean callbackCalled; private AtomicBoolean callbackCalled;
private AtomicInteger transitionToActiveCounter;
private AtomicInteger transitionToStandbyCounter;
private enum SyncTestType { private enum SyncTestType {
ACTIVE, ACTIVE,
@ -75,6 +81,8 @@ public class TestRMEmbeddedElector extends ClientBaseWithFixes {
conf.setLong(YarnConfiguration.CLIENT_FAILOVER_SLEEPTIME_BASE_MS, 100L); conf.setLong(YarnConfiguration.CLIENT_FAILOVER_SLEEPTIME_BASE_MS, 100L);
callbackCalled = new AtomicBoolean(false); callbackCalled = new AtomicBoolean(false);
transitionToActiveCounter = new AtomicInteger(0);
transitionToStandbyCounter = new AtomicInteger(0);
} }
/** /**
@ -103,7 +111,7 @@ public class TestRMEmbeddedElector extends ClientBaseWithFixes {
*/ */
@Test @Test
public void testCallbackSynchronization() public void testCallbackSynchronization()
throws IOException, InterruptedException { throws IOException, InterruptedException, TimeoutException {
testCallbackSynchronization(SyncTestType.ACTIVE); testCallbackSynchronization(SyncTestType.ACTIVE);
testCallbackSynchronization(SyncTestType.STANDBY); testCallbackSynchronization(SyncTestType.STANDBY);
testCallbackSynchronization(SyncTestType.NEUTRAL); testCallbackSynchronization(SyncTestType.NEUTRAL);
@ -117,9 +125,10 @@ public class TestRMEmbeddedElector extends ClientBaseWithFixes {
* @param type the type of test to run * @param type the type of test to run
* @throws IOException if there's an issue transitioning * @throws IOException if there's an issue transitioning
* @throws InterruptedException if interrupted * @throws InterruptedException if interrupted
* @throws TimeoutException if waitFor timeout reached
*/ */
private void testCallbackSynchronization(SyncTestType type) private void testCallbackSynchronization(SyncTestType type)
throws IOException, InterruptedException { throws IOException, InterruptedException, TimeoutException {
AdminService as = mock(AdminService.class); AdminService as = mock(AdminService.class);
RMContext rc = mock(RMContext.class); RMContext rc = mock(RMContext.class);
ResourceManager rm = mock(ResourceManager.class); ResourceManager rm = mock(ResourceManager.class);
@ -129,6 +138,17 @@ public class TestRMEmbeddedElector extends ClientBaseWithFixes {
when(rm.getRMContext()).thenReturn(rc); when(rm.getRMContext()).thenReturn(rc);
when(rc.getRMAdminService()).thenReturn(as); when(rc.getRMAdminService()).thenReturn(as);
doAnswer(invocation -> {
transitionToActiveCounter.incrementAndGet();
return null;
}).when(as).transitionToActive(any());
transitionToActiveCounter.set(0);
doAnswer(invocation -> {
transitionToStandbyCounter.incrementAndGet();
return null;
}).when(as).transitionToStandby(any());
transitionToStandbyCounter.set(0);
ActiveStandbyElectorBasedElectorService ees = ActiveStandbyElectorBasedElectorService ees =
new ActiveStandbyElectorBasedElectorService(rm); new ActiveStandbyElectorBasedElectorService(rm);
ees.init(myConf); ees.init(myConf);
@ -165,15 +185,16 @@ public class TestRMEmbeddedElector extends ClientBaseWithFixes {
* @param ees the embedded elector service * @param ees the embedded elector service
* @throws IOException if there's an issue transitioning * @throws IOException if there's an issue transitioning
* @throws InterruptedException if interrupted * @throws InterruptedException if interrupted
* @throws TimeoutException if waitFor timeout reached
*/ */
private void testCallbackSynchronizationActive(AdminService as, private void testCallbackSynchronizationActive(AdminService as,
ActiveStandbyElectorBasedElectorService ees) ActiveStandbyElectorBasedElectorService ees)
throws IOException, InterruptedException { throws IOException, InterruptedException, TimeoutException {
ees.becomeActive(); ees.becomeActive();
Thread.sleep(100); GenericTestUtils.waitFor(
() -> transitionToActiveCounter.get() >= 1, 500, 10 * 1000);
verify(as).transitionToActive(any()); verify(as, times(1)).transitionToActive(any());
verify(as, never()).transitionToStandby(any()); verify(as, never()).transitionToStandby(any());
} }
@ -185,16 +206,16 @@ public class TestRMEmbeddedElector extends ClientBaseWithFixes {
* @param ees the embedded elector service * @param ees the embedded elector service
* @throws IOException if there's an issue transitioning * @throws IOException if there's an issue transitioning
* @throws InterruptedException if interrupted * @throws InterruptedException if interrupted
* @throws TimeoutException if waitFor timeout reached
*/ */
private void testCallbackSynchronizationStandby(AdminService as, private void testCallbackSynchronizationStandby(AdminService as,
ActiveStandbyElectorBasedElectorService ees) ActiveStandbyElectorBasedElectorService ees)
throws IOException, InterruptedException { throws IOException, InterruptedException, TimeoutException {
ees.becomeStandby(); ees.becomeStandby();
Thread.sleep(100); GenericTestUtils.waitFor(
() -> transitionToStandbyCounter.get() >= 1, 500, 10 * 1000);
verify(as, atLeast(1)).transitionToStandby(any()); verify(as, times(1)).transitionToStandby(any());
verify(as, atMost(1)).transitionToStandby(any());
} }
/** /**
@ -204,16 +225,16 @@ public class TestRMEmbeddedElector extends ClientBaseWithFixes {
* @param ees the embedded elector service * @param ees the embedded elector service
* @throws IOException if there's an issue transitioning * @throws IOException if there's an issue transitioning
* @throws InterruptedException if interrupted * @throws InterruptedException if interrupted
* @throws TimeoutException if waitFor timeout reached
*/ */
private void testCallbackSynchronizationNeutral(AdminService as, private void testCallbackSynchronizationNeutral(AdminService as,
ActiveStandbyElectorBasedElectorService ees) ActiveStandbyElectorBasedElectorService ees)
throws IOException, InterruptedException { throws IOException, InterruptedException, TimeoutException {
ees.enterNeutralMode(); ees.enterNeutralMode();
Thread.sleep(100); GenericTestUtils.waitFor(
() -> transitionToStandbyCounter.get() >= 1, 500, 10 * 1000);
verify(as, atLeast(1)).transitionToStandby(any()); verify(as, times(1)).transitionToStandby(any());
verify(as, atMost(1)).transitionToStandby(any());
} }
/** /**
@ -224,10 +245,11 @@ public class TestRMEmbeddedElector extends ClientBaseWithFixes {
* @param ees the embedded elector service * @param ees the embedded elector service
* @throws IOException if there's an issue transitioning * @throws IOException if there's an issue transitioning
* @throws InterruptedException if interrupted * @throws InterruptedException if interrupted
* @throws TimeoutException if waitFor timeout reached
*/ */
private void testCallbackSynchronizationTimingActive(AdminService as, private void testCallbackSynchronizationTimingActive(AdminService as,
ActiveStandbyElectorBasedElectorService ees) ActiveStandbyElectorBasedElectorService ees)
throws IOException, InterruptedException { throws IOException, InterruptedException, TimeoutException {
synchronized (ees.zkDisconnectLock) { synchronized (ees.zkDisconnectLock) {
// Sleep while holding the lock so that the timer thread can't do // Sleep while holding the lock so that the timer thread can't do
// anything when it runs. Sleep until we're pretty sure the timer thread // anything when it runs. Sleep until we're pretty sure the timer thread
@ -243,7 +265,9 @@ public class TestRMEmbeddedElector extends ClientBaseWithFixes {
// going to do, hopefully nothing. // going to do, hopefully nothing.
Thread.sleep(50); Thread.sleep(50);
verify(as).transitionToActive(any()); GenericTestUtils.waitFor(
() -> transitionToActiveCounter.get() >= 1, 500, 10 * 1000);
verify(as, times(1)).transitionToActive(any());
verify(as, never()).transitionToStandby(any()); verify(as, never()).transitionToStandby(any());
} }
@ -255,10 +279,11 @@ public class TestRMEmbeddedElector extends ClientBaseWithFixes {
* @param ees the embedded elector service * @param ees the embedded elector service
* @throws IOException if there's an issue transitioning * @throws IOException if there's an issue transitioning
* @throws InterruptedException if interrupted * @throws InterruptedException if interrupted
* @throws TimeoutException if waitFor timeout reached
*/ */
private void testCallbackSynchronizationTimingStandby(AdminService as, private void testCallbackSynchronizationTimingStandby(AdminService as,
ActiveStandbyElectorBasedElectorService ees) ActiveStandbyElectorBasedElectorService ees)
throws IOException, InterruptedException { throws IOException, InterruptedException, TimeoutException {
synchronized (ees.zkDisconnectLock) { synchronized (ees.zkDisconnectLock) {
// Sleep while holding the lock so that the timer thread can't do // Sleep while holding the lock so that the timer thread can't do
// anything when it runs. Sleep until we're pretty sure the timer thread // anything when it runs. Sleep until we're pretty sure the timer thread
@ -274,8 +299,9 @@ public class TestRMEmbeddedElector extends ClientBaseWithFixes {
// going to do, hopefully nothing. // going to do, hopefully nothing.
Thread.sleep(50); Thread.sleep(50);
verify(as, atLeast(1)).transitionToStandby(any()); GenericTestUtils.waitFor(
verify(as, atMost(1)).transitionToStandby(any()); () -> transitionToStandbyCounter.get() >= 1, 500, 10 * 1000);
verify(as, times(1)).transitionToStandby(any());
} }
private class MockRMWithElector extends MockRM { private class MockRMWithElector extends MockRM {