Separate publishing from applying cluster states

Companion commit to elastic/elasticsearch#24236
This commit is contained in:
Yannick Welsch 2017-04-03 17:46:31 +02:00 committed by Martijn van Groningen
parent a08e2d9e5e
commit 44ea5d6b3e
No known key found for this signature in database
GPG Key ID: AB236F4FCF2AF12A
1 changed files with 10 additions and 17 deletions

View File

@ -23,13 +23,13 @@ import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksResponse; import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksResponse;
import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.tasks.Task; import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskId; import org.elasticsearch.tasks.TaskId;
@ -59,25 +59,21 @@ import static org.mockito.Mockito.when;
public class PersistentTasksNodeServiceTests extends ESTestCase { public class PersistentTasksNodeServiceTests extends ESTestCase {
private ClusterService createClusterService() { private ClusterState createInitialClusterState(int nonLocalNodesCount, Settings settings) {
ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); ClusterState.Builder state = ClusterState.builder(new ClusterName("PersistentActionExecutorTests"));
return new ClusterService(Settings.builder().put("cluster.name", "PersistentActionExecutorTests").build(), state.metaData(MetaData.builder().generateClusterUuidIfNeeded());
clusterSettings, null, () -> new DiscoveryNode(UUIDs.randomBase64UUID(), buildNewFakeTransportAddress(), state.routingTable(RoutingTable.builder().build());
Version.CURRENT));
}
private DiscoveryNodes createTestNodes(int nonLocalNodesCount, Settings settings) {
DiscoveryNodes.Builder nodes = DiscoveryNodes.builder(); DiscoveryNodes.Builder nodes = DiscoveryNodes.builder();
nodes.add(DiscoveryNode.createLocal(settings, buildNewFakeTransportAddress(), "this_node")); nodes.add(DiscoveryNode.createLocal(settings, buildNewFakeTransportAddress(), "this_node"));
for (int i = 0; i < nonLocalNodesCount; i++) { for (int i = 0; i < nonLocalNodesCount; i++) {
nodes.add(new DiscoveryNode("other_node_" + i, buildNewFakeTransportAddress(), Version.CURRENT)); nodes.add(new DiscoveryNode("other_node_" + i, buildNewFakeTransportAddress(), Version.CURRENT));
} }
nodes.localNodeId("this_node"); nodes.localNodeId("this_node");
return nodes.build(); state.nodes(nodes);
return state.build();
} }
public void testStartTask() throws Exception { public void testStartTask() throws Exception {
ClusterService clusterService = createClusterService();
PersistentTasksService persistentTasksService = mock(PersistentTasksService.class); PersistentTasksService persistentTasksService = mock(PersistentTasksService.class);
@SuppressWarnings("unchecked") PersistentTasksExecutor<TestParams> action = mock(PersistentTasksExecutor.class); @SuppressWarnings("unchecked") PersistentTasksExecutor<TestParams> action = mock(PersistentTasksExecutor.class);
when(action.getExecutor()).thenReturn(ThreadPool.Names.SAME); when(action.getExecutor()).thenReturn(ThreadPool.Names.SAME);
@ -95,8 +91,7 @@ public class PersistentTasksNodeServiceTests extends ESTestCase {
PersistentTasksNodeService coordinator = new PersistentTasksNodeService(Settings.EMPTY, persistentTasksService, PersistentTasksNodeService coordinator = new PersistentTasksNodeService(Settings.EMPTY, persistentTasksService,
registry, new TaskManager(Settings.EMPTY), executor); registry, new TaskManager(Settings.EMPTY), executor);
ClusterState state = ClusterState.builder(clusterService.state()).nodes(createTestNodes(nonLocalNodesCount, Settings.EMPTY)) ClusterState state = createInitialClusterState(nonLocalNodesCount, Settings.EMPTY);
.build();
PersistentTasksCustomMetaData.Builder tasks = PersistentTasksCustomMetaData.builder(); PersistentTasksCustomMetaData.Builder tasks = PersistentTasksCustomMetaData.builder();
boolean added = false; boolean added = false;
@ -173,7 +168,6 @@ public class PersistentTasksNodeServiceTests extends ESTestCase {
} }
public void testTaskCancellation() { public void testTaskCancellation() {
ClusterService clusterService = createClusterService();
AtomicLong capturedTaskId = new AtomicLong(); AtomicLong capturedTaskId = new AtomicLong();
AtomicReference<ActionListener<CancelTasksResponse>> capturedListener = new AtomicReference<>(); AtomicReference<ActionListener<CancelTasksResponse>> capturedListener = new AtomicReference<>();
PersistentTasksService persistentTasksService = new PersistentTasksService(Settings.EMPTY, null, null, null) { PersistentTasksService persistentTasksService = new PersistentTasksService(Settings.EMPTY, null, null, null) {
@ -202,8 +196,7 @@ public class PersistentTasksNodeServiceTests extends ESTestCase {
PersistentTasksNodeService coordinator = new PersistentTasksNodeService(Settings.EMPTY, persistentTasksService, PersistentTasksNodeService coordinator = new PersistentTasksNodeService(Settings.EMPTY, persistentTasksService,
registry, taskManager, executor); registry, taskManager, executor);
ClusterState state = ClusterState.builder(clusterService.state()).nodes(createTestNodes(nonLocalNodesCount, Settings.EMPTY)) ClusterState state = createInitialClusterState(nonLocalNodesCount, Settings.EMPTY);
.build();
ClusterState newClusterState = state; ClusterState newClusterState = state;
// Allocate first task // Allocate first task