diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMEmbeddedElector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMEmbeddedElector.java index 140483a265f..9d38149937f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMEmbeddedElector.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMEmbeddedElector.java @@ -22,18 +22,22 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.ha.ClientBaseWithFixes; import org.apache.hadoop.ha.ServiceFailedException; +import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.junit.Before; import org.junit.Test; import java.io.IOException; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + import static org.junit.Assert.fail; import static org.mockito.Matchers.any; -import static org.mockito.Mockito.atLeast; -import static org.mockito.Mockito.atMost; +import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -48,6 +52,8 @@ public class TestRMEmbeddedElector extends ClientBaseWithFixes { private Configuration conf; private AtomicBoolean callbackCalled; + private AtomicInteger transitionToActiveCounter; + private AtomicInteger transitionToStandbyCounter; private enum SyncTestType { ACTIVE, @@ -75,6 +81,8 @@ public class TestRMEmbeddedElector extends ClientBaseWithFixes { conf.setLong(YarnConfiguration.CLIENT_FAILOVER_SLEEPTIME_BASE_MS, 100L); callbackCalled = new AtomicBoolean(false); + transitionToActiveCounter = new AtomicInteger(0); + transitionToStandbyCounter = new AtomicInteger(0); } /** @@ -103,7 +111,7 @@ public class TestRMEmbeddedElector extends ClientBaseWithFixes { */ @Test public void testCallbackSynchronization() - throws IOException, InterruptedException { + throws IOException, InterruptedException, TimeoutException { testCallbackSynchronization(SyncTestType.ACTIVE); testCallbackSynchronization(SyncTestType.STANDBY); testCallbackSynchronization(SyncTestType.NEUTRAL); @@ -117,9 +125,10 @@ public class TestRMEmbeddedElector extends ClientBaseWithFixes { * @param type the type of test to run * @throws IOException if there's an issue transitioning * @throws InterruptedException if interrupted + * @throws TimeoutException if waitFor timeout reached */ private void testCallbackSynchronization(SyncTestType type) - throws IOException, InterruptedException { + throws IOException, InterruptedException, TimeoutException { AdminService as = mock(AdminService.class); RMContext rc = mock(RMContext.class); ResourceManager rm = mock(ResourceManager.class); @@ -129,6 +138,17 @@ public class TestRMEmbeddedElector extends ClientBaseWithFixes { when(rm.getRMContext()).thenReturn(rc); when(rc.getRMAdminService()).thenReturn(as); + doAnswer(invocation -> { + transitionToActiveCounter.incrementAndGet(); + return null; + }).when(as).transitionToActive(any()); + transitionToActiveCounter.set(0); + doAnswer(invocation -> { + transitionToStandbyCounter.incrementAndGet(); + return null; + }).when(as).transitionToStandby(any()); + transitionToStandbyCounter.set(0); + ActiveStandbyElectorBasedElectorService ees = new ActiveStandbyElectorBasedElectorService(rm); ees.init(myConf); @@ -165,15 +185,16 @@ public class TestRMEmbeddedElector extends ClientBaseWithFixes { * @param ees the embedded elector service * @throws IOException if there's an issue transitioning * @throws InterruptedException if interrupted + * @throws TimeoutException if waitFor timeout reached */ private void testCallbackSynchronizationActive(AdminService as, ActiveStandbyElectorBasedElectorService ees) - throws IOException, InterruptedException { + throws IOException, InterruptedException, TimeoutException { ees.becomeActive(); - Thread.sleep(100); - - verify(as).transitionToActive(any()); + GenericTestUtils.waitFor( + () -> transitionToActiveCounter.get() >= 1, 500, 10 * 1000); + verify(as, times(1)).transitionToActive(any()); verify(as, never()).transitionToStandby(any()); } @@ -185,16 +206,16 @@ public class TestRMEmbeddedElector extends ClientBaseWithFixes { * @param ees the embedded elector service * @throws IOException if there's an issue transitioning * @throws InterruptedException if interrupted + * @throws TimeoutException if waitFor timeout reached */ private void testCallbackSynchronizationStandby(AdminService as, ActiveStandbyElectorBasedElectorService ees) - throws IOException, InterruptedException { + throws IOException, InterruptedException, TimeoutException { ees.becomeStandby(); - Thread.sleep(100); - - verify(as, atLeast(1)).transitionToStandby(any()); - verify(as, atMost(1)).transitionToStandby(any()); + GenericTestUtils.waitFor( + () -> transitionToStandbyCounter.get() >= 1, 500, 10 * 1000); + verify(as, times(1)).transitionToStandby(any()); } /** @@ -204,16 +225,16 @@ public class TestRMEmbeddedElector extends ClientBaseWithFixes { * @param ees the embedded elector service * @throws IOException if there's an issue transitioning * @throws InterruptedException if interrupted + * @throws TimeoutException if waitFor timeout reached */ private void testCallbackSynchronizationNeutral(AdminService as, ActiveStandbyElectorBasedElectorService ees) - throws IOException, InterruptedException { + throws IOException, InterruptedException, TimeoutException { ees.enterNeutralMode(); - Thread.sleep(100); - - verify(as, atLeast(1)).transitionToStandby(any()); - verify(as, atMost(1)).transitionToStandby(any()); + GenericTestUtils.waitFor( + () -> transitionToStandbyCounter.get() >= 1, 500, 10 * 1000); + verify(as, times(1)).transitionToStandby(any()); } /** @@ -224,10 +245,11 @@ public class TestRMEmbeddedElector extends ClientBaseWithFixes { * @param ees the embedded elector service * @throws IOException if there's an issue transitioning * @throws InterruptedException if interrupted + * @throws TimeoutException if waitFor timeout reached */ private void testCallbackSynchronizationTimingActive(AdminService as, ActiveStandbyElectorBasedElectorService ees) - throws IOException, InterruptedException { + throws IOException, InterruptedException, TimeoutException { synchronized (ees.zkDisconnectLock) { // Sleep while holding the lock so that the timer thread can't do // anything when it runs. Sleep until we're pretty sure the timer thread @@ -243,7 +265,9 @@ public class TestRMEmbeddedElector extends ClientBaseWithFixes { // going to do, hopefully nothing. Thread.sleep(50); - verify(as).transitionToActive(any()); + GenericTestUtils.waitFor( + () -> transitionToActiveCounter.get() >= 1, 500, 10 * 1000); + verify(as, times(1)).transitionToActive(any()); verify(as, never()).transitionToStandby(any()); } @@ -255,10 +279,11 @@ public class TestRMEmbeddedElector extends ClientBaseWithFixes { * @param ees the embedded elector service * @throws IOException if there's an issue transitioning * @throws InterruptedException if interrupted + * @throws TimeoutException if waitFor timeout reached */ private void testCallbackSynchronizationTimingStandby(AdminService as, ActiveStandbyElectorBasedElectorService ees) - throws IOException, InterruptedException { + throws IOException, InterruptedException, TimeoutException { synchronized (ees.zkDisconnectLock) { // Sleep while holding the lock so that the timer thread can't do // anything when it runs. Sleep until we're pretty sure the timer thread @@ -274,8 +299,9 @@ public class TestRMEmbeddedElector extends ClientBaseWithFixes { // going to do, hopefully nothing. Thread.sleep(50); - verify(as, atLeast(1)).transitionToStandby(any()); - verify(as, atMost(1)).transitionToStandby(any()); + GenericTestUtils.waitFor( + () -> transitionToStandbyCounter.get() >= 1, 500, 10 * 1000); + verify(as, times(1)).transitionToStandby(any()); } private class MockRMWithElector extends MockRM {