Allow background cluster state update in tests (#61455)

Today the `CoordinatorTests` run the publication process as a single
atomic action; however in production it appears possible that another
master may be elected, publish its state, then fail, then we win another
election, all in between the time we sampled our previous cluster state
and started to publish the one we first thought of.

This violates the `assertClusterStateConsistency()` assertion that
verifies the cluster state update event matches the states we actually
published and applied.

This commit adjusts the tests to run the publication process more
asynchronously so as to allow time for this behaviour to occur. This
should eventually result in a reproduction of the failure in #61437 that
will let us analyse what's really going on there and help us fix it.
This commit is contained in:
David Turner 2020-08-27 11:11:32 +01:00
parent 25e811ced7
commit 411965d392
3 changed files with 36 additions and 5 deletions

View File

@ -1369,7 +1369,7 @@ public class CoordinatorTests extends AbstractCoordinatorTestCase {
assertThat(e.getCause().getMessage(), equalTo(BrokenCustom.EXCEPTION_MESSAGE));
failed.set(true);
});
cluster.runFor(DEFAULT_DELAY_VARIABILITY + 1, "processing broken task");
cluster.runFor(2 * DEFAULT_DELAY_VARIABILITY + 1, "processing broken task");
assertTrue(failed.get());
cluster.stabilise();

View File

@ -126,7 +126,7 @@ public class FakeThreadPoolMasterService extends MasterService {
assert waitForPublish == false;
waitForPublish = true;
final AckListener ackListener = taskOutputs.createAckListener(threadPool, clusterChangedEvent.state());
clusterStatePublisher.publish(clusterChangedEvent, new ActionListener<Void>() {
final ActionListener<Void> publishListener = new ActionListener<Void>() {
private boolean listenerCalled = false;
@ -157,7 +157,20 @@ public class FakeThreadPoolMasterService extends MasterService {
scheduleNextTaskIfNecessary();
}
}
}, wrapAckListener(ackListener));
};
threadPool.generic().execute(threadPool.getThreadContext().preserveContext(new Runnable() {
@Override
public void run() {
clusterStatePublisher.publish(clusterChangedEvent, publishListener, wrapAckListener(ackListener));
}
@Override
public String toString() {
return "publish change of cluster state from version [" + clusterChangedEvent.previousState().version() + "] in term [" +
clusterChangedEvent.previousState().term() + "] to version [" + clusterChangedEvent.state().version()
+ "] in term [" + clusterChangedEvent.state().term() + "]";
}
}));
}
protected AckListener wrapAckListener(AckListener ackListener) {

View File

@ -36,10 +36,15 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasToString;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@ -56,6 +61,11 @@ public class FakeThreadPoolMasterServiceTests extends ESTestCase {
final ThreadContext context = new ThreadContext(Settings.EMPTY);
final ThreadPool mockThreadPool = mock(ThreadPool.class);
when(mockThreadPool.getThreadContext()).thenReturn(context);
final ExecutorService executorService = mock(ExecutorService.class);
doAnswer(invocationOnMock -> runnableTasks.add((Runnable) invocationOnMock.getArguments()[0])).when(executorService).execute(any());
when(mockThreadPool.generic()).thenReturn(executorService);
FakeThreadPoolMasterService masterService = new FakeThreadPoolMasterService("test_node","test", mockThreadPool, runnableTasks::add);
masterService.setClusterStateSupplier(lastClusterStateRef::get);
masterService.setClusterStatePublisher((event, publishListener, ackListener) -> {
@ -89,7 +99,14 @@ public class FakeThreadPoolMasterServiceTests extends ESTestCase {
assertNull(publishingCallback.get());
assertFalse(firstTaskCompleted.get());
runnableTasks.remove(0).run();
final Runnable scheduleTask = runnableTasks.remove(0);
assertThat(scheduleTask, hasToString("master service scheduling next task"));
scheduleTask.run();
final Runnable publishTask = runnableTasks.remove(0);
assertThat(publishTask, hasToString(containsString("publish change of cluster state")));
publishTask.run();
assertThat(lastClusterStateRef.get().metadata().indices().size(), equalTo(1));
assertThat(lastClusterStateRef.get().version(), equalTo(firstClusterStateVersion + 1));
assertNotNull(publishingCallback.get());
@ -121,7 +138,8 @@ public class FakeThreadPoolMasterServiceTests extends ESTestCase {
assertTrue(firstTaskCompleted.get());
assertThat(runnableTasks.size(), equalTo(1)); // check that new task gets queued
runnableTasks.remove(0).run();
runnableTasks.remove(0).run(); // schedule again
runnableTasks.remove(0).run(); // publish again
assertThat(lastClusterStateRef.get().metadata().indices().size(), equalTo(2));
assertThat(lastClusterStateRef.get().version(), equalTo(firstClusterStateVersion + 2));
assertNotNull(publishingCallback.get());