YARN-8051. TestRMEmbeddedElector#testCallbackSynchronization is flaky. (Robert Kanter via Haibo Chen)
(cherry picked from commit 93d47a0ed5
)
Conflicts:
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMEmbeddedElector.java
This commit is contained in:
parent
246086984e
commit
e4282c077b
|
@ -30,13 +30,16 @@ import org.junit.Before;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.util.concurrent.TimeoutException;
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
|
||||||
import static org.junit.Assert.fail;
|
import static org.junit.Assert.fail;
|
||||||
import static org.mockito.Matchers.any;
|
import static org.mockito.Matchers.any;
|
||||||
import static org.mockito.Mockito.atLeast;
|
import static org.mockito.Mockito.doAnswer;
|
||||||
import static org.mockito.Mockito.atMost;
|
|
||||||
import static org.mockito.Mockito.mock;
|
import static org.mockito.Mockito.mock;
|
||||||
import static org.mockito.Mockito.never;
|
import static org.mockito.Mockito.never;
|
||||||
|
import static org.mockito.Mockito.times;
|
||||||
import static org.mockito.Mockito.verify;
|
import static org.mockito.Mockito.verify;
|
||||||
import static org.mockito.Mockito.when;
|
import static org.mockito.Mockito.when;
|
||||||
|
|
||||||
|
@ -51,6 +54,8 @@ public class TestRMEmbeddedElector extends ClientBaseWithFixes {
|
||||||
|
|
||||||
private Configuration conf;
|
private Configuration conf;
|
||||||
private AtomicBoolean callbackCalled;
|
private AtomicBoolean callbackCalled;
|
||||||
|
private AtomicInteger transitionToActiveCounter;
|
||||||
|
private AtomicInteger transitionToStandbyCounter;
|
||||||
|
|
||||||
private enum SyncTestType {
|
private enum SyncTestType {
|
||||||
ACTIVE,
|
ACTIVE,
|
||||||
|
@ -78,6 +83,8 @@ public class TestRMEmbeddedElector extends ClientBaseWithFixes {
|
||||||
conf.setLong(YarnConfiguration.CLIENT_FAILOVER_SLEEPTIME_BASE_MS, 100L);
|
conf.setLong(YarnConfiguration.CLIENT_FAILOVER_SLEEPTIME_BASE_MS, 100L);
|
||||||
|
|
||||||
callbackCalled = new AtomicBoolean(false);
|
callbackCalled = new AtomicBoolean(false);
|
||||||
|
transitionToActiveCounter = new AtomicInteger(0);
|
||||||
|
transitionToStandbyCounter = new AtomicInteger(0);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -106,7 +113,7 @@ public class TestRMEmbeddedElector extends ClientBaseWithFixes {
|
||||||
*/
|
*/
|
||||||
@Test
|
@Test
|
||||||
public void testCallbackSynchronization()
|
public void testCallbackSynchronization()
|
||||||
throws IOException, InterruptedException {
|
throws IOException, InterruptedException, TimeoutException {
|
||||||
testCallbackSynchronization(SyncTestType.ACTIVE);
|
testCallbackSynchronization(SyncTestType.ACTIVE);
|
||||||
testCallbackSynchronization(SyncTestType.STANDBY);
|
testCallbackSynchronization(SyncTestType.STANDBY);
|
||||||
testCallbackSynchronization(SyncTestType.NEUTRAL);
|
testCallbackSynchronization(SyncTestType.NEUTRAL);
|
||||||
|
@ -120,9 +127,10 @@ public class TestRMEmbeddedElector extends ClientBaseWithFixes {
|
||||||
* @param type the type of test to run
|
* @param type the type of test to run
|
||||||
* @throws IOException if there's an issue transitioning
|
* @throws IOException if there's an issue transitioning
|
||||||
* @throws InterruptedException if interrupted
|
* @throws InterruptedException if interrupted
|
||||||
|
* @throws TimeoutException if waitFor timeout reached
|
||||||
*/
|
*/
|
||||||
private void testCallbackSynchronization(SyncTestType type)
|
private void testCallbackSynchronization(SyncTestType type)
|
||||||
throws IOException, InterruptedException {
|
throws IOException, InterruptedException, TimeoutException {
|
||||||
AdminService as = mock(AdminService.class);
|
AdminService as = mock(AdminService.class);
|
||||||
RMContext rc = mock(RMContext.class);
|
RMContext rc = mock(RMContext.class);
|
||||||
ResourceManager rm = mock(ResourceManager.class);
|
ResourceManager rm = mock(ResourceManager.class);
|
||||||
|
@ -132,6 +140,17 @@ public class TestRMEmbeddedElector extends ClientBaseWithFixes {
|
||||||
when(rm.getRMContext()).thenReturn(rc);
|
when(rm.getRMContext()).thenReturn(rc);
|
||||||
when(rc.getRMAdminService()).thenReturn(as);
|
when(rc.getRMAdminService()).thenReturn(as);
|
||||||
|
|
||||||
|
doAnswer(invocation -> {
|
||||||
|
transitionToActiveCounter.incrementAndGet();
|
||||||
|
return null;
|
||||||
|
}).when(as).transitionToActive(any());
|
||||||
|
transitionToActiveCounter.set(0);
|
||||||
|
doAnswer(invocation -> {
|
||||||
|
transitionToStandbyCounter.incrementAndGet();
|
||||||
|
return null;
|
||||||
|
}).when(as).transitionToStandby(any());
|
||||||
|
transitionToStandbyCounter.set(0);
|
||||||
|
|
||||||
ActiveStandbyElectorBasedElectorService ees =
|
ActiveStandbyElectorBasedElectorService ees =
|
||||||
new ActiveStandbyElectorBasedElectorService(rm);
|
new ActiveStandbyElectorBasedElectorService(rm);
|
||||||
ees.init(myConf);
|
ees.init(myConf);
|
||||||
|
@ -168,15 +187,16 @@ public class TestRMEmbeddedElector extends ClientBaseWithFixes {
|
||||||
* @param ees the embedded elector service
|
* @param ees the embedded elector service
|
||||||
* @throws IOException if there's an issue transitioning
|
* @throws IOException if there's an issue transitioning
|
||||||
* @throws InterruptedException if interrupted
|
* @throws InterruptedException if interrupted
|
||||||
|
* @throws TimeoutException if waitFor timeout reached
|
||||||
*/
|
*/
|
||||||
private void testCallbackSynchronizationActive(AdminService as,
|
private void testCallbackSynchronizationActive(AdminService as,
|
||||||
ActiveStandbyElectorBasedElectorService ees)
|
ActiveStandbyElectorBasedElectorService ees)
|
||||||
throws IOException, InterruptedException {
|
throws IOException, InterruptedException, TimeoutException {
|
||||||
ees.becomeActive();
|
ees.becomeActive();
|
||||||
|
|
||||||
Thread.sleep(100);
|
GenericTestUtils.waitFor(
|
||||||
|
() -> transitionToActiveCounter.get() >= 1, 500, 10 * 1000);
|
||||||
verify(as).transitionToActive(any());
|
verify(as, times(1)).transitionToActive(any());
|
||||||
verify(as, never()).transitionToStandby(any());
|
verify(as, never()).transitionToStandby(any());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -188,16 +208,16 @@ public class TestRMEmbeddedElector extends ClientBaseWithFixes {
|
||||||
* @param ees the embedded elector service
|
* @param ees the embedded elector service
|
||||||
* @throws IOException if there's an issue transitioning
|
* @throws IOException if there's an issue transitioning
|
||||||
* @throws InterruptedException if interrupted
|
* @throws InterruptedException if interrupted
|
||||||
|
* @throws TimeoutException if waitFor timeout reached
|
||||||
*/
|
*/
|
||||||
private void testCallbackSynchronizationStandby(AdminService as,
|
private void testCallbackSynchronizationStandby(AdminService as,
|
||||||
ActiveStandbyElectorBasedElectorService ees)
|
ActiveStandbyElectorBasedElectorService ees)
|
||||||
throws IOException, InterruptedException {
|
throws IOException, InterruptedException, TimeoutException {
|
||||||
ees.becomeStandby();
|
ees.becomeStandby();
|
||||||
|
|
||||||
Thread.sleep(100);
|
GenericTestUtils.waitFor(
|
||||||
|
() -> transitionToStandbyCounter.get() >= 1, 500, 10 * 1000);
|
||||||
verify(as, atLeast(1)).transitionToStandby(any());
|
verify(as, times(1)).transitionToStandby(any());
|
||||||
verify(as, atMost(1)).transitionToStandby(any());
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -207,16 +227,16 @@ public class TestRMEmbeddedElector extends ClientBaseWithFixes {
|
||||||
* @param ees the embedded elector service
|
* @param ees the embedded elector service
|
||||||
* @throws IOException if there's an issue transitioning
|
* @throws IOException if there's an issue transitioning
|
||||||
* @throws InterruptedException if interrupted
|
* @throws InterruptedException if interrupted
|
||||||
|
* @throws TimeoutException if waitFor timeout reached
|
||||||
*/
|
*/
|
||||||
private void testCallbackSynchronizationNeutral(AdminService as,
|
private void testCallbackSynchronizationNeutral(AdminService as,
|
||||||
ActiveStandbyElectorBasedElectorService ees)
|
ActiveStandbyElectorBasedElectorService ees)
|
||||||
throws IOException, InterruptedException {
|
throws IOException, InterruptedException, TimeoutException {
|
||||||
ees.enterNeutralMode();
|
ees.enterNeutralMode();
|
||||||
|
|
||||||
Thread.sleep(100);
|
GenericTestUtils.waitFor(
|
||||||
|
() -> transitionToStandbyCounter.get() >= 1, 500, 10 * 1000);
|
||||||
verify(as, atLeast(1)).transitionToStandby(any());
|
verify(as, times(1)).transitionToStandby(any());
|
||||||
verify(as, atMost(1)).transitionToStandby(any());
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -227,10 +247,11 @@ public class TestRMEmbeddedElector extends ClientBaseWithFixes {
|
||||||
* @param ees the embedded elector service
|
* @param ees the embedded elector service
|
||||||
* @throws IOException if there's an issue transitioning
|
* @throws IOException if there's an issue transitioning
|
||||||
* @throws InterruptedException if interrupted
|
* @throws InterruptedException if interrupted
|
||||||
|
* @throws TimeoutException if waitFor timeout reached
|
||||||
*/
|
*/
|
||||||
private void testCallbackSynchronizationTimingActive(AdminService as,
|
private void testCallbackSynchronizationTimingActive(AdminService as,
|
||||||
ActiveStandbyElectorBasedElectorService ees)
|
ActiveStandbyElectorBasedElectorService ees)
|
||||||
throws IOException, InterruptedException {
|
throws IOException, InterruptedException, TimeoutException {
|
||||||
synchronized (ees.zkDisconnectLock) {
|
synchronized (ees.zkDisconnectLock) {
|
||||||
// Sleep while holding the lock so that the timer thread can't do
|
// Sleep while holding the lock so that the timer thread can't do
|
||||||
// anything when it runs. Sleep until we're pretty sure the timer thread
|
// anything when it runs. Sleep until we're pretty sure the timer thread
|
||||||
|
@ -246,7 +267,9 @@ public class TestRMEmbeddedElector extends ClientBaseWithFixes {
|
||||||
// going to do, hopefully nothing.
|
// going to do, hopefully nothing.
|
||||||
Thread.sleep(50);
|
Thread.sleep(50);
|
||||||
|
|
||||||
verify(as).transitionToActive(any());
|
GenericTestUtils.waitFor(
|
||||||
|
() -> transitionToActiveCounter.get() >= 1, 500, 10 * 1000);
|
||||||
|
verify(as, times(1)).transitionToActive(any());
|
||||||
verify(as, never()).transitionToStandby(any());
|
verify(as, never()).transitionToStandby(any());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -258,10 +281,11 @@ public class TestRMEmbeddedElector extends ClientBaseWithFixes {
|
||||||
* @param ees the embedded elector service
|
* @param ees the embedded elector service
|
||||||
* @throws IOException if there's an issue transitioning
|
* @throws IOException if there's an issue transitioning
|
||||||
* @throws InterruptedException if interrupted
|
* @throws InterruptedException if interrupted
|
||||||
|
* @throws TimeoutException if waitFor timeout reached
|
||||||
*/
|
*/
|
||||||
private void testCallbackSynchronizationTimingStandby(AdminService as,
|
private void testCallbackSynchronizationTimingStandby(AdminService as,
|
||||||
ActiveStandbyElectorBasedElectorService ees)
|
ActiveStandbyElectorBasedElectorService ees)
|
||||||
throws IOException, InterruptedException {
|
throws IOException, InterruptedException, TimeoutException {
|
||||||
synchronized (ees.zkDisconnectLock) {
|
synchronized (ees.zkDisconnectLock) {
|
||||||
// Sleep while holding the lock so that the timer thread can't do
|
// Sleep while holding the lock so that the timer thread can't do
|
||||||
// anything when it runs. Sleep until we're pretty sure the timer thread
|
// anything when it runs. Sleep until we're pretty sure the timer thread
|
||||||
|
@ -277,8 +301,9 @@ public class TestRMEmbeddedElector extends ClientBaseWithFixes {
|
||||||
// going to do, hopefully nothing.
|
// going to do, hopefully nothing.
|
||||||
Thread.sleep(50);
|
Thread.sleep(50);
|
||||||
|
|
||||||
verify(as, atLeast(1)).transitionToStandby(any());
|
GenericTestUtils.waitFor(
|
||||||
verify(as, atMost(1)).transitionToStandby(any());
|
() -> transitionToStandbyCounter.get() >= 1, 500, 10 * 1000);
|
||||||
|
verify(as, times(1)).transitionToStandby(any());
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
Loading…
Reference in New Issue