From 411965d392c179b1ce288b19a7233b651c2ee912 Mon Sep 17 00:00:00 2001 From: David Turner Date: Thu, 27 Aug 2020 11:11:32 +0100 Subject: [PATCH] Allow background cluster state update in tests (#61455) Today the `CoordinatorTests` run the publication process as a single atomic action; however in production it appears possible that another master may be elected, publish its state, then fail, then we win another election, all in between the time we sampled our previous cluster state and started to publish the one we first thought of. This violates the `assertClusterStateConsistency()` assertion that verifies the cluster state update event matches the states we actually published and applied. This commit adjusts the tests to run the publication process more asynchronously so as to allow time for this behaviour to occur. This should eventually result in a reproduction of the failure in #61437 that will let us analyse what's really going on there and help us fix it. --- .../coordination/CoordinatorTests.java | 2 +- .../service/FakeThreadPoolMasterService.java | 17 ++++++++++++-- .../FakeThreadPoolMasterServiceTests.java | 22 +++++++++++++++++-- 3 files changed, 36 insertions(+), 5 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java index d5522c90d88..e18485f0c9c 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java @@ -1369,7 +1369,7 @@ public class CoordinatorTests extends AbstractCoordinatorTestCase { assertThat(e.getCause().getMessage(), equalTo(BrokenCustom.EXCEPTION_MESSAGE)); failed.set(true); }); - cluster.runFor(DEFAULT_DELAY_VARIABILITY + 1, "processing broken task"); + cluster.runFor(2 * DEFAULT_DELAY_VARIABILITY + 1, "processing broken task"); assertTrue(failed.get()); cluster.stabilise(); diff 
--git a/test/framework/src/main/java/org/elasticsearch/cluster/service/FakeThreadPoolMasterService.java b/test/framework/src/main/java/org/elasticsearch/cluster/service/FakeThreadPoolMasterService.java index dfa11d8beb0..d4fde097fb0 100644 --- a/test/framework/src/main/java/org/elasticsearch/cluster/service/FakeThreadPoolMasterService.java +++ b/test/framework/src/main/java/org/elasticsearch/cluster/service/FakeThreadPoolMasterService.java @@ -126,7 +126,7 @@ public class FakeThreadPoolMasterService extends MasterService { assert waitForPublish == false; waitForPublish = true; final AckListener ackListener = taskOutputs.createAckListener(threadPool, clusterChangedEvent.state()); - clusterStatePublisher.publish(clusterChangedEvent, new ActionListener<Void>() { + final ActionListener<Void> publishListener = new ActionListener<Void>() { private boolean listenerCalled = false; @@ -157,7 +157,20 @@ public class FakeThreadPoolMasterService extends MasterService { scheduleNextTaskIfNecessary(); } } - }, wrapAckListener(ackListener)); + }; + threadPool.generic().execute(threadPool.getThreadContext().preserveContext(new Runnable() { + @Override + public void run() { + clusterStatePublisher.publish(clusterChangedEvent, publishListener, wrapAckListener(ackListener)); + } + + @Override + public String toString() { + return "publish change of cluster state from version [" + clusterChangedEvent.previousState().version() + "] in term [" + + clusterChangedEvent.previousState().term() + "] to version [" + clusterChangedEvent.state().version() + + "] in term [" + clusterChangedEvent.state().term() + "]"; + } + })); } protected AckListener wrapAckListener(AckListener ackListener) { diff --git a/test/framework/src/test/java/org/elasticsearch/cluster/service/FakeThreadPoolMasterServiceTests.java b/test/framework/src/test/java/org/elasticsearch/cluster/service/FakeThreadPoolMasterServiceTests.java index 3fd7b8bb981..e297be7be0c 100644 --- 
a/test/framework/src/test/java/org/elasticsearch/cluster/service/FakeThreadPoolMasterServiceTests.java +++ b/test/framework/src/test/java/org/elasticsearch/cluster/service/FakeThreadPoolMasterServiceTests.java @@ -36,10 +36,15 @@ import java.util.ArrayList; import java.util.Collections; import java.util.HashSet; import java.util.List; +import java.util.concurrent.ExecutorService; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; +import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasToString; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -56,6 +61,11 @@ public class FakeThreadPoolMasterServiceTests extends ESTestCase { final ThreadContext context = new ThreadContext(Settings.EMPTY); final ThreadPool mockThreadPool = mock(ThreadPool.class); when(mockThreadPool.getThreadContext()).thenReturn(context); + + final ExecutorService executorService = mock(ExecutorService.class); + doAnswer(invocationOnMock -> runnableTasks.add((Runnable) invocationOnMock.getArguments()[0])).when(executorService).execute(any()); + when(mockThreadPool.generic()).thenReturn(executorService); + FakeThreadPoolMasterService masterService = new FakeThreadPoolMasterService("test_node","test", mockThreadPool, runnableTasks::add); masterService.setClusterStateSupplier(lastClusterStateRef::get); masterService.setClusterStatePublisher((event, publishListener, ackListener) -> { @@ -89,7 +99,14 @@ public class FakeThreadPoolMasterServiceTests extends ESTestCase { assertNull(publishingCallback.get()); assertFalse(firstTaskCompleted.get()); - runnableTasks.remove(0).run(); + final Runnable scheduleTask = runnableTasks.remove(0); + assertThat(scheduleTask, hasToString("master service scheduling next task")); + scheduleTask.run(); + + final Runnable 
publishTask = runnableTasks.remove(0); + assertThat(publishTask, hasToString(containsString("publish change of cluster state"))); + publishTask.run(); + assertThat(lastClusterStateRef.get().metadata().indices().size(), equalTo(1)); assertThat(lastClusterStateRef.get().version(), equalTo(firstClusterStateVersion + 1)); assertNotNull(publishingCallback.get()); @@ -121,7 +138,8 @@ public class FakeThreadPoolMasterServiceTests extends ESTestCase { assertTrue(firstTaskCompleted.get()); assertThat(runnableTasks.size(), equalTo(1)); // check that new task gets queued - runnableTasks.remove(0).run(); + runnableTasks.remove(0).run(); // schedule again + runnableTasks.remove(0).run(); // publish again assertThat(lastClusterStateRef.get().metadata().indices().size(), equalTo(2)); assertThat(lastClusterStateRef.get().version(), equalTo(firstClusterStateVersion + 2)); assertNotNull(publishingCallback.get());