simplified PublishClusterStateActionTests infra

This commit is contained in:
Boaz Leskes 2015-08-20 15:49:35 +02:00
parent 3815a41626
commit 81e07e81e0
1 changed files with 93 additions and 170 deletions

View File

@ -31,6 +31,7 @@ import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.discovery.Discovery;
import org.elasticsearch.discovery.DiscoverySettings;
@ -65,45 +66,57 @@ public class PublishClusterStateActionTests extends ESTestCase {
protected ThreadPool threadPool;
protected Map<String, MockNode> nodes = newHashMap();
public static class MockNode {
public static class MockNode implements PublishClusterStateAction.NewClusterStateListener {
public final DiscoveryNode discoveryNode;
public final MockTransportService service;
public final PublishClusterStateAction action;
public PublishClusterStateAction action;
public final MockDiscoveryNodesProvider nodesProvider;
public final ClusterStateListener listener;
public MockNode(DiscoveryNode discoveryNode, MockTransportService service, PublishClusterStateAction action, MockDiscoveryNodesProvider nodesProvider) {
public volatile ClusterState clusterState;
private final ESLogger logger;
public MockNode(DiscoveryNode discoveryNode, MockTransportService service, MockDiscoveryNodesProvider nodesProvider, @Nullable ClusterStateListener listener, ESLogger logger) {
this.discoveryNode = discoveryNode;
this.service = service;
this.action = action;
this.nodesProvider = nodesProvider;
this.listener = listener;
this.logger = logger;
this.clusterState = ClusterState.builder(ClusterName.DEFAULT).nodes(DiscoveryNodes.builder().put(discoveryNode).localNodeId(discoveryNode.id()).build()).build();
}
public void connectTo(DiscoveryNode node) {
service.connectToNode(node);
nodesProvider.addNode(node);
}
@Override
public void onNewClusterState(ClusterState newClusterState, NewStateProcessed newStateProcessed) {
logger.debug("[{}] received version [{}], uuid [{}]", discoveryNode.name(), newClusterState.version(), newClusterState.stateUUID());
if (listener != null) {
ClusterChangedEvent event = new ClusterChangedEvent("", newClusterState, clusterState);
listener.clusterChanged(event);
}
clusterState = newClusterState;
newStateProcessed.onNewClusterStateProcessed();
}
}
public MockNode createMockNode(final String name, Settings settings, Version version) throws Exception {
return createMockNode(name, settings, version, new PublishClusterStateAction.NewClusterStateListener() {
@Override
public void onNewClusterState(ClusterState clusterState, NewStateProcessed newStateProcessed) {
logger.debug("Node [{}] onNewClusterState version [{}], uuid [{}]", name, clusterState.version(), clusterState.stateUUID());
newStateProcessed.onNewClusterStateProcessed();
}
});
return createMockNode(name, settings, version, null);
}
public MockNode createMockNode(String name, Settings settings, Version version, PublishClusterStateAction.NewClusterStateListener listener) throws Exception {
public MockNode createMockNode(String name, Settings settings, Version version, @Nullable ClusterStateListener listener) throws Exception {
MockTransportService service = buildTransportService(
Settings.builder().put(settings).put("name", name, TransportService.SETTING_TRACE_LOG_INCLUDE, "", TransportService.SETTING_TRACE_LOG_EXCLUDE, "NOTHING").build(),
version
);
DiscoveryNode discoveryNode = new DiscoveryNode(name, name, service.boundAddress().publishAddress(), ImmutableMap.<String, String>of(), version);
MockDiscoveryNodesProvider nodesProvider = new MockDiscoveryNodesProvider(discoveryNode);
PublishClusterStateAction action = buildPublishClusterStateAction(settings, service, nodesProvider, listener);
MockNode node = new MockNode(discoveryNode, service, action, nodesProvider);
MockNode node = new MockNode(discoveryNode, service, nodesProvider, listener, logger);
nodesProvider.addNode(discoveryNode);
node.action = buildPublishClusterStateAction(settings, service, nodesProvider, node);
final CountDownLatch latch = new CountDownLatch(nodes.size() * 2 + 1);
TransportConnectionListener waitForConnection = new TransportConnectionListener() {
@Override
@ -209,11 +222,8 @@ public class PublishClusterStateActionTests extends ESTestCase {
@Test
@TestLogging("cluster:DEBUG,discovery.zen.publish:DEBUG")
public void testSimpleClusterStatePublishing() throws Exception {
MockNewClusterStateListener mockListenerA = new MockNewClusterStateListener();
MockNode nodeA = createMockNode("nodeA", Settings.EMPTY, Version.CURRENT, mockListenerA);
MockNewClusterStateListener mockListenerB = new MockNewClusterStateListener();
MockNode nodeB = createMockNode("nodeB", Settings.EMPTY, Version.CURRENT, mockListenerB);
MockNode nodeA = createMockNode("nodeA", Settings.EMPTY, Version.CURRENT);
MockNode nodeB = createMockNode("nodeB", Settings.EMPTY, Version.CURRENT);
// Initial cluster state
DiscoveryNodes discoveryNodes = DiscoveryNodes.builder().put(nodeA.discoveryNode).localNodeId(nodeA.discoveryNode.id()).build();
@ -223,103 +233,53 @@ public class PublishClusterStateActionTests extends ESTestCase {
discoveryNodes = DiscoveryNodes.builder(discoveryNodes).put(nodeB.discoveryNode).build();
ClusterState previousClusterState = clusterState;
clusterState = ClusterState.builder(clusterState).nodes(discoveryNodes).incrementVersion().build();
mockListenerB.add(new NewClusterStateExpectation() {
@Override
public void check(ClusterState clusterState, PublishClusterStateAction.NewClusterStateListener.NewStateProcessed newStateProcessed) {
assertFalse(clusterState.wasReadFromDiff());
}
});
publishStateDiffAndWait(nodeA.action, clusterState, previousClusterState);
assertSameStateFromFull(nodeB.clusterState, clusterState);
// cluster state update - add block
previousClusterState = clusterState;
clusterState = ClusterState.builder(clusterState).blocks(ClusterBlocks.builder().addGlobalBlock(MetaData.CLUSTER_READ_ONLY_BLOCK)).incrementVersion().build();
mockListenerB.add(new NewClusterStateExpectation() {
@Override
public void check(ClusterState clusterState, PublishClusterStateAction.NewClusterStateListener.NewStateProcessed newStateProcessed) {
assertTrue(clusterState.wasReadFromDiff());
assertThat(clusterState.blocks().global().size(), equalTo(1));
}
});
publishStateDiffAndWait(nodeA.action, clusterState, previousClusterState);
assertSameStateFromDiff(nodeB.clusterState, clusterState);
assertThat(nodeB.clusterState.blocks().global().size(), equalTo(1));
// cluster state update - remove block
previousClusterState = clusterState;
clusterState = ClusterState.builder(clusterState).blocks(ClusterBlocks.EMPTY_CLUSTER_BLOCK).incrementVersion().build();
mockListenerB.add(new NewClusterStateExpectation() {
@Override
public void check(ClusterState clusterState, PublishClusterStateAction.NewClusterStateListener.NewStateProcessed newStateProcessed) {
assertTrue(clusterState.wasReadFromDiff());
assertThat(clusterState.blocks().global().size(), equalTo(0));
}
});
publishStateDiffAndWait(nodeA.action, clusterState, previousClusterState);
assertSameStateFromDiff(nodeB.clusterState, clusterState);
assertTrue(nodeB.clusterState.wasReadFromDiff());
// Adding new node - this node should get full cluster state while nodeB should still be getting diffs
MockNewClusterStateListener mockListenerC = new MockNewClusterStateListener();
MockNode nodeC = createMockNode("nodeC", Settings.EMPTY, Version.CURRENT, mockListenerC);
MockNode nodeC = createMockNode("nodeC", Settings.EMPTY, Version.CURRENT);
// cluster state update 3 - register node C
previousClusterState = clusterState;
discoveryNodes = DiscoveryNodes.builder(discoveryNodes).put(nodeC.discoveryNode).build();
clusterState = ClusterState.builder(clusterState).nodes(discoveryNodes).incrementVersion().build();
mockListenerB.add(new NewClusterStateExpectation() {
@Override
public void check(ClusterState clusterState, PublishClusterStateAction.NewClusterStateListener.NewStateProcessed newStateProcessed) {
assertTrue(clusterState.wasReadFromDiff());
assertThat(clusterState.blocks().global().size(), equalTo(0));
}
});
mockListenerC.add(new NewClusterStateExpectation() {
@Override
public void check(ClusterState clusterState, PublishClusterStateAction.NewClusterStateListener.NewStateProcessed newStateProcessed) {
// First state
assertFalse(clusterState.wasReadFromDiff());
}
});
publishStateDiffAndWait(nodeA.action, clusterState, previousClusterState);
assertSameStateFromDiff(nodeB.clusterState, clusterState);
// First state
assertSameStateFromFull(nodeC.clusterState, clusterState);
// cluster state update 4 - update settings
previousClusterState = clusterState;
MetaData metaData = MetaData.builder(clusterState.metaData()).transientSettings(Settings.settingsBuilder().put("foo", "bar").build()).build();
clusterState = ClusterState.builder(clusterState).metaData(metaData).incrementVersion().build();
NewClusterStateExpectation expectation = new NewClusterStateExpectation() {
@Override
public void check(ClusterState clusterState, PublishClusterStateAction.NewClusterStateListener.NewStateProcessed newStateProcessed) {
assertTrue(clusterState.wasReadFromDiff());
assertThat(clusterState.blocks().global().size(), equalTo(0));
}
};
mockListenerB.add(expectation);
mockListenerC.add(expectation);
publishStateDiffAndWait(nodeA.action, clusterState, previousClusterState);
assertSameStateFromDiff(nodeB.clusterState, clusterState);
assertThat(nodeB.clusterState.blocks().global().size(), equalTo(0));
assertSameStateFromDiff(nodeC.clusterState, clusterState);
assertThat(nodeC.clusterState.blocks().global().size(), equalTo(0));
// cluster state update - skipping one version change - should request full cluster state
previousClusterState = ClusterState.builder(clusterState).incrementVersion().build();
clusterState = ClusterState.builder(clusterState).incrementVersion().build();
expectation = new NewClusterStateExpectation() {
@Override
public void check(ClusterState clusterState, PublishClusterStateAction.NewClusterStateListener.NewStateProcessed newStateProcessed) {
assertFalse(clusterState.wasReadFromDiff());
}
};
mockListenerB.add(expectation);
mockListenerC.add(expectation);
publishStateDiffAndWait(nodeA.action, clusterState, previousClusterState);
// cluster state update - skipping one version change - should request full cluster state
previousClusterState = ClusterState.builder(clusterState).incrementVersion().build();
clusterState = ClusterState.builder(clusterState).incrementVersion().build();
expectation = new NewClusterStateExpectation() {
@Override
public void check(ClusterState clusterState, PublishClusterStateAction.NewClusterStateListener.NewStateProcessed newStateProcessed) {
assertFalse(clusterState.wasReadFromDiff());
}
};
mockListenerB.add(expectation);
mockListenerC.add(expectation);
publishStateDiffAndWait(nodeA.action, clusterState, previousClusterState);
assertSameStateFromFull(nodeB.clusterState, clusterState);
assertSameStateFromFull(nodeC.clusterState, clusterState);
assertFalse(nodeC.clusterState.wasReadFromDiff());
// node B becomes the master and sends a version of the cluster state that goes back
discoveryNodes = DiscoveryNodes.builder(discoveryNodes)
@ -329,53 +289,36 @@ public class PublishClusterStateActionTests extends ESTestCase {
.build();
previousClusterState = ClusterState.builder(new ClusterName("test")).nodes(discoveryNodes).build();
clusterState = ClusterState.builder(clusterState).nodes(discoveryNodes).incrementVersion().build();
expectation = new NewClusterStateExpectation() {
@Override
public void check(ClusterState clusterState, PublishClusterStateAction.NewClusterStateListener.NewStateProcessed newStateProcessed) {
assertFalse(clusterState.wasReadFromDiff());
}
};
mockListenerA.add(expectation);
mockListenerC.add(expectation);
publishStateDiffAndWait(nodeB.action, clusterState, previousClusterState);
assertSameStateFromFull(nodeA.clusterState, clusterState);
assertSameStateFromFull(nodeC.clusterState, clusterState);
}
@Test
@TestLogging("cluster:DEBUG,discovery.zen.publish:DEBUG")
public void testUnexpectedDiffPublishing() throws Exception {
MockNode nodeA = createMockNode("nodeA", Settings.EMPTY, Version.CURRENT, new PublishClusterStateAction.NewClusterStateListener() {
MockNode nodeA = createMockNode("nodeA", Settings.EMPTY, Version.CURRENT, new ClusterStateListener() {
@Override
public void onNewClusterState(ClusterState clusterState, NewStateProcessed newStateProcessed) {
public void clusterChanged(ClusterChangedEvent event) {
fail("Shouldn't send cluster state to myself");
}
});
MockNewClusterStateListener mockListenerB = new MockNewClusterStateListener();
MockNode nodeB = createMockNode("nodeB", Settings.EMPTY, Version.CURRENT, mockListenerB);
MockNode nodeB = createMockNode("nodeB", Settings.EMPTY, Version.CURRENT);
// Initial cluster state with both states - the second node still shouldn't get diff even though it's present in the previous cluster state
DiscoveryNodes discoveryNodes = DiscoveryNodes.builder().put(nodeA.discoveryNode).put(nodeB.discoveryNode).localNodeId(nodeA.discoveryNode.id()).build();
ClusterState previousClusterState = ClusterState.builder(ClusterName.DEFAULT).nodes(discoveryNodes).build();
ClusterState clusterState = ClusterState.builder(previousClusterState).incrementVersion().build();
mockListenerB.add(new NewClusterStateExpectation() {
@Override
public void check(ClusterState clusterState, PublishClusterStateAction.NewClusterStateListener.NewStateProcessed newStateProcessed) {
assertFalse(clusterState.wasReadFromDiff());
}
});
publishStateDiffAndWait(nodeA.action, clusterState, previousClusterState);
assertSameStateFromFull(nodeB.clusterState, clusterState);
// cluster state update - add block
previousClusterState = clusterState;
clusterState = ClusterState.builder(clusterState).blocks(ClusterBlocks.builder().addGlobalBlock(MetaData.CLUSTER_READ_ONLY_BLOCK)).incrementVersion().build();
mockListenerB.add(new NewClusterStateExpectation() {
@Override
public void check(ClusterState clusterState, PublishClusterStateAction.NewClusterStateListener.NewStateProcessed newStateProcessed) {
assertTrue(clusterState.wasReadFromDiff());
}
});
publishStateDiffAndWait(nodeA.action, clusterState, previousClusterState);
assertSameStateFromDiff(nodeB.clusterState, clusterState);
}
@Test
@ -383,19 +326,17 @@ public class PublishClusterStateActionTests extends ESTestCase {
public void testDisablingDiffPublishing() throws Exception {
Settings noDiffPublishingSettings = Settings.builder().put(DiscoverySettings.PUBLISH_DIFF_ENABLE, false).build();
MockNode nodeA = createMockNode("nodeA", noDiffPublishingSettings, Version.CURRENT, new PublishClusterStateAction.NewClusterStateListener() {
MockNode nodeA = createMockNode("nodeA", noDiffPublishingSettings, Version.CURRENT, new ClusterStateListener() {
@Override
public void onNewClusterState(ClusterState clusterState, NewStateProcessed newStateProcessed) {
public void clusterChanged(ClusterChangedEvent event) {
fail("Shouldn't send cluster state to myself");
}
});
MockNode nodeB = createMockNode("nodeB", noDiffPublishingSettings, Version.CURRENT, new PublishClusterStateAction.NewClusterStateListener() {
MockNode nodeB = createMockNode("nodeB", noDiffPublishingSettings, Version.CURRENT, new ClusterStateListener() {
@Override
public void onNewClusterState(ClusterState clusterState, NewStateProcessed newStateProcessed) {
logger.debug("Got cluster state update, version [{}], guid [{}], from diff [{}]", clusterState.version(), clusterState.stateUUID(), clusterState.wasReadFromDiff());
assertFalse(clusterState.wasReadFromDiff());
newStateProcessed.onNewClusterStateProcessed();
public void clusterChanged(ClusterChangedEvent event) {
assertFalse(event.state().wasReadFromDiff());
}
});
@ -418,33 +359,28 @@ public class PublishClusterStateActionTests extends ESTestCase {
@Test
@TestLogging("cluster:DEBUG,discovery.zen.publish:DEBUG")
/**
* Test concurrent publishing works correctly (although not strictly required, it's a good testamne
*/
public void testSimultaneousClusterStatePublishing() throws Exception {
int numberOfNodes = randomIntBetween(2, 10);
int numberOfIterations = randomIntBetween(50, 200);
Settings settings = Settings.builder().put(DiscoverySettings.PUBLISH_TIMEOUT, "100ms").put(DiscoverySettings.PUBLISH_DIFF_ENABLE, true).build();
Settings settings = Settings.builder().put(DiscoverySettings.PUBLISH_DIFF_ENABLE, randomBoolean()).build();
MockNode[] nodes = new MockNode[numberOfNodes];
DiscoveryNodes.Builder discoveryNodesBuilder = DiscoveryNodes.builder();
for (int i = 0; i < nodes.length; i++) {
final String name = "node" + i;
nodes[i] = createMockNode(name, settings, Version.CURRENT, new PublishClusterStateAction.NewClusterStateListener() {
nodes[i] = createMockNode(name, settings, Version.CURRENT, new ClusterStateListener() {
@Override
public synchronized void onNewClusterState(ClusterState clusterState, NewStateProcessed newStateProcessed) {
assertProperMetaDataForVersion(clusterState.metaData(), clusterState.version());
if (randomInt(10) < 2) {
// Cause timeouts from time to time
try {
Thread.sleep(randomInt(110));
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
}
}
newStateProcessed.onNewClusterStateProcessed();
public void clusterChanged(ClusterChangedEvent event) {
assertProperMetaDataForVersion(event.state().metaData(), event.state().version());
}
});
discoveryNodesBuilder.put(nodes[i].discoveryNode);
}
AssertingAckListener[] listeners = new AssertingAckListener[numberOfIterations];
discoveryNodesBuilder.localNodeId(nodes[0].discoveryNode.id());
DiscoveryNodes discoveryNodes = discoveryNodesBuilder.build();
MetaData metaData = MetaData.EMPTY_META_DATA;
ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT).metaData(metaData).build();
@ -459,43 +395,40 @@ public class PublishClusterStateActionTests extends ESTestCase {
for (int i = 0; i < numberOfIterations; i++) {
listeners[i].await(1, TimeUnit.SECONDS);
}
// fake node[0] - it is the master
nodes[0].clusterState = clusterState;
for (MockNode node : nodes) {
assertThat(node.discoveryNode + " misses a cluster state", node.clusterState, notNullValue());
assertThat(node.discoveryNode + " unexpected cluster state: " + node.clusterState, node.clusterState.version(), equalTo(clusterState.version()));
assertThat(node.clusterState.nodes().localNode(), equalTo(node.discoveryNode));
}
}
@Test
@TestLogging("cluster:DEBUG,discovery.zen.publish:DEBUG")
public void testSerializationFailureDuringDiffPublishing() throws Exception {
MockNode nodeA = createMockNode("nodeA", Settings.EMPTY, Version.CURRENT, new PublishClusterStateAction.NewClusterStateListener() {
MockNode nodeA = createMockNode("nodeA", Settings.EMPTY, Version.CURRENT, new ClusterStateListener() {
@Override
public void onNewClusterState(ClusterState clusterState, NewStateProcessed newStateProcessed) {
public void clusterChanged(ClusterChangedEvent event) {
fail("Shouldn't send cluster state to myself");
}
});
MockNewClusterStateListener mockListenerB = new MockNewClusterStateListener();
MockNode nodeB = createMockNode("nodeB", Settings.EMPTY, Version.CURRENT, mockListenerB);
MockNode nodeB = createMockNode("nodeB", Settings.EMPTY, Version.CURRENT);
// Initial cluster state with both states - the second node still shouldn't get diff even though it's present in the previous cluster state
DiscoveryNodes discoveryNodes = DiscoveryNodes.builder().put(nodeA.discoveryNode).put(nodeB.discoveryNode).localNodeId(nodeA.discoveryNode.id()).build();
ClusterState previousClusterState = ClusterState.builder(ClusterName.DEFAULT).nodes(discoveryNodes).build();
ClusterState clusterState = ClusterState.builder(previousClusterState).incrementVersion().build();
mockListenerB.add(new NewClusterStateExpectation() {
@Override
public void check(ClusterState clusterState, PublishClusterStateAction.NewClusterStateListener.NewStateProcessed newStateProcessed) {
assertFalse(clusterState.wasReadFromDiff());
}
});
publishStateDiffAndWait(nodeA.action, clusterState, previousClusterState);
assertSameStateFromFull(nodeB.clusterState, clusterState);
// cluster state update - add block
previousClusterState = clusterState;
clusterState = ClusterState.builder(clusterState).blocks(ClusterBlocks.builder().addGlobalBlock(MetaData.CLUSTER_READ_ONLY_BLOCK)).incrementVersion().build();
mockListenerB.add(new NewClusterStateExpectation() {
@Override
public void check(ClusterState clusterState, PublishClusterStateAction.NewClusterStateListener.NewStateProcessed newStateProcessed) {
assertTrue(clusterState.wasReadFromDiff());
}
});
ClusterState unserializableClusterState = new ClusterState(clusterState.version(), clusterState.stateUUID(), clusterState) {
@Override
@ -589,31 +522,6 @@ public class PublishClusterStateActionTests extends ESTestCase {
}
public interface NewClusterStateExpectation {
void check(ClusterState clusterState, PublishClusterStateAction.NewClusterStateListener.NewStateProcessed newStateProcessed);
}
public static class MockNewClusterStateListener implements PublishClusterStateAction.NewClusterStateListener {
CopyOnWriteArrayList<NewClusterStateExpectation> expectations = new CopyOnWriteArrayList();
@Override
public void onNewClusterState(ClusterState clusterState, NewStateProcessed newStateProcessed) {
final NewClusterStateExpectation expectation;
try {
expectation = expectations.remove(0);
} catch (ArrayIndexOutOfBoundsException ex) {
fail("Unexpected cluster state update " + clusterState.prettyPrint());
return;
}
expectation.check(clusterState, newStateProcessed);
newStateProcessed.onNewClusterStateProcessed();
}
public void add(NewClusterStateExpectation expectation) {
expectations.add(expectation);
}
}
public static class DelegatingClusterState extends ClusterState {
public DelegatingClusterState(ClusterState clusterState) {
@ -623,4 +531,19 @@ public class PublishClusterStateActionTests extends ESTestCase {
}
void assertSameState(ClusterState actual, ClusterState expected) {
assertThat(actual, notNullValue());
assertThat("\n--> actual ClusterState: " + actual.prettyPrint() + "\n--> expected ClusterState:" + expected.prettyPrint(), actual.stateUUID(), equalTo(expected.stateUUID()));
}
void assertSameStateFromDiff(ClusterState actual, ClusterState expected) {
assertSameState(actual, expected);
assertTrue(actual.wasReadFromDiff());
}
void assertSameStateFromFull(ClusterState actual, ClusterState expected) {
assertSameState(actual, expected);
assertFalse(actual.wasReadFromDiff());
}
}