From 21aa7f1d8219329c02cd9faa771adc049270ac70 Mon Sep 17 00:00:00 2001 From: Eric E Payne Date: Wed, 29 Aug 2018 21:30:38 +0000 Subject: [PATCH] YARN-8051: TestRMEmbeddedElector#testCallbackSynchronization is flakey. Contributed by Robert Kanter and Jason Lowe. --- .../TestRMEmbeddedElector.java | 112 ++++++++++++++---- 1 file changed, 86 insertions(+), 26 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMEmbeddedElector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMEmbeddedElector.java index 47d18f30b3e..a2b9afd65ad 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMEmbeddedElector.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMEmbeddedElector.java @@ -17,24 +17,31 @@ */ package org.apache.hadoop.yarn.server.resourcemanager; +import com.google.common.base.Supplier; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.ha.ClientBaseWithFixes; +import org.apache.hadoop.ha.HAServiceProtocol.StateChangeRequestInfo; import org.apache.hadoop.ha.ServiceFailedException; +import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.junit.Before; import org.junit.Test; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; import java.io.IOException; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; -import org.apache.hadoop.ha.HAServiceProtocol.StateChangeRequestInfo; +import java.util.concurrent.atomic.AtomicInteger; + import static org.junit.Assert.fail; import static org.mockito.Matchers.any; -import static org.mockito.Mockito.atLeast; -import static org.mockito.Mockito.atMost; +import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -49,6 +56,8 @@ public class TestRMEmbeddedElector extends ClientBaseWithFixes { private Configuration conf; private AtomicBoolean callbackCalled; + private AtomicInteger transitionToActiveCounter; + private AtomicInteger transitionToStandbyCounter; private enum SyncTestType { ACTIVE, @@ -76,6 +85,8 @@ public void setup() throws IOException { conf.setLong(YarnConfiguration.CLIENT_FAILOVER_SLEEPTIME_BASE_MS, 100L); callbackCalled = new AtomicBoolean(false); + transitionToActiveCounter = new AtomicInteger(0); + transitionToStandbyCounter = new AtomicInteger(0); } /** @@ -104,7 +115,7 @@ public void testDeadlockShutdownBecomeActive() throws InterruptedException { */ @Test public void testCallbackSynchronization() - throws IOException, InterruptedException { + throws IOException, InterruptedException, TimeoutException { testCallbackSynchronization(SyncTestType.ACTIVE); testCallbackSynchronization(SyncTestType.STANDBY); testCallbackSynchronization(SyncTestType.NEUTRAL); @@ -118,9 +129,10 @@ public void testCallbackSynchronization() * @param type the type of test to run * @throws IOException if there's an issue transitioning * @throws InterruptedException if interrupted + * @throws TimeoutException if waitFor timeout reached */ private void testCallbackSynchronization(SyncTestType type) - throws IOException, InterruptedException { + throws IOException, InterruptedException, TimeoutException { AdminService as = mock(AdminService.class); RMContext rc = mock(RMContext.class); ResourceManager rm = mock(ResourceManager.class); @@ -130,6 +142,23 @@ private void testCallbackSynchronization(SyncTestType type) when(rm.getRMContext()).thenReturn(rc); when(rc.getRMAdminService()).thenReturn(as); + doAnswer(new Answer() { + @Override + public Object answer(InvocationOnMock invocation) { + transitionToActiveCounter.incrementAndGet(); + return null; + } + }).when(as).transitionToActive((StateChangeRequestInfo) any()); + transitionToActiveCounter.set(0); + doAnswer(new Answer() { + @Override + public Object answer(InvocationOnMock invocation) { + transitionToStandbyCounter.incrementAndGet(); + return null; + } + }).when(as).transitionToStandby((StateChangeRequestInfo) any()); + transitionToStandbyCounter.set(0); + ActiveStandbyElectorBasedElectorService ees = new ActiveStandbyElectorBasedElectorService(rm); ees.init(myConf); @@ -166,16 +195,22 @@ private void testCallbackSynchronization(SyncTestType type) * @param ees the embedded elector service * @throws IOException if there's an issue transitioning * @throws InterruptedException if interrupted + * @throws TimeoutException if waitFor timeout reached */ private void testCallbackSynchronizationActive(AdminService as, ActiveStandbyElectorBasedElectorService ees) - throws IOException, InterruptedException { + throws IOException, InterruptedException, TimeoutException { ees.becomeActive(); - Thread.sleep(100); - - verify(as).transitionToActive((StateChangeRequestInfo)any()); - verify(as, never()).transitionToStandby((StateChangeRequestInfo)any()); + GenericTestUtils.waitFor( + new Supplier() { + @Override + public Boolean get() { + return transitionToActiveCounter.get() >= 1; + } + }, 500, 10 * 1000); + verify(as, times(1)).transitionToActive((StateChangeRequestInfo) any()); + verify(as, never()).transitionToStandby((StateChangeRequestInfo) any()); } /** @@ -186,16 +221,21 @@ private void testCallbackSynchronizationActive(AdminService as, * @param ees the embedded elector service * @throws IOException if there's an issue transitioning * @throws InterruptedException if interrupted + * @throws TimeoutException if waitFor timeout reached */ private void testCallbackSynchronizationStandby(AdminService as, ActiveStandbyElectorBasedElectorService ees) - throws IOException, InterruptedException { + throws IOException, InterruptedException, TimeoutException { ees.becomeStandby(); - Thread.sleep(100); - - verify(as, atLeast(1)).transitionToStandby((StateChangeRequestInfo)any()); - verify(as, atMost(1)).transitionToStandby((StateChangeRequestInfo)any()); + GenericTestUtils.waitFor( + new Supplier() { + @Override + public Boolean get() { + return transitionToStandbyCounter.get() >= 1; + } + }, 500, 10 * 1000); + verify(as, times(1)).transitionToStandby((StateChangeRequestInfo) any()); } /** @@ -205,16 +245,21 @@ private void testCallbackSynchronizationStandby(AdminService as, * @param ees the embedded elector service * @throws IOException if there's an issue transitioning * @throws InterruptedException if interrupted + * @throws TimeoutException if waitFor timeout reached */ private void testCallbackSynchronizationNeutral(AdminService as, ActiveStandbyElectorBasedElectorService ees) - throws IOException, InterruptedException { + throws IOException, InterruptedException, TimeoutException { ees.enterNeutralMode(); - Thread.sleep(100); - - verify(as, atLeast(1)).transitionToStandby((StateChangeRequestInfo)any()); - verify(as, atMost(1)).transitionToStandby((StateChangeRequestInfo)any()); + GenericTestUtils.waitFor( + new Supplier() { + @Override + public Boolean get() { + return transitionToStandbyCounter.get() >= 1; + } + }, 500, 10 * 1000); + verify(as, times(1)).transitionToStandby((StateChangeRequestInfo) any()); } /** @@ -225,10 +270,11 @@ private void testCallbackSynchronizationNeutral(AdminService as, * @param ees the embedded elector service * @throws IOException if there's an issue transitioning * @throws InterruptedException if interrupted + * @throws TimeoutException if waitFor timeout reached */ private void testCallbackSynchronizationTimingActive(AdminService as, ActiveStandbyElectorBasedElectorService ees) - throws IOException, InterruptedException { + throws IOException, InterruptedException, TimeoutException { synchronized (ees.zkDisconnectLock) { // Sleep while holding the lock so that the timer thread can't do // anything when it runs. Sleep until we're pretty sure the timer thread @@ -244,8 +290,15 @@ private void testCallbackSynchronizationTimingActive(AdminService as, // going to do, hopefully nothing. Thread.sleep(50); - verify(as).transitionToActive((StateChangeRequestInfo)any()); - verify(as, never()).transitionToStandby((StateChangeRequestInfo)any()); + GenericTestUtils.waitFor( + new Supplier() { + @Override + public Boolean get() { + return transitionToActiveCounter.get() >= 1; + } + }, 500, 10 * 1000); + verify(as, times(1)).transitionToActive((StateChangeRequestInfo) any()); + verify(as, never()).transitionToStandby((StateChangeRequestInfo) any()); } /** @@ -256,10 +309,11 @@ private void testCallbackSynchronizationTimingActive(AdminService as, * @param ees the embedded elector service * @throws IOException if there's an issue transitioning * @throws InterruptedException if interrupted + * @throws TimeoutException if waitFor timeout reached */ private void testCallbackSynchronizationTimingStandby(AdminService as, ActiveStandbyElectorBasedElectorService ees) - throws IOException, InterruptedException { + throws IOException, InterruptedException, TimeoutException { synchronized (ees.zkDisconnectLock) { // Sleep while holding the lock so that the timer thread can't do // anything when it runs. Sleep until we're pretty sure the timer thread @@ -275,8 +329,14 @@ private void testCallbackSynchronizationTimingStandby(AdminService as, // going to do, hopefully nothing. Thread.sleep(50); - verify(as, atLeast(1)).transitionToStandby((StateChangeRequestInfo)any()); - verify(as, atMost(1)).transitionToStandby((StateChangeRequestInfo)any()); + GenericTestUtils.waitFor( + new Supplier() { + @Override + public Boolean get() { + return transitionToStandbyCounter.get() >= 1; + } + }, 500, 10 * 1000); + verify(as, times(1)).transitionToStandby((StateChangeRequestInfo) any()); } private class MockRMWithElector extends MockRM {