Separate publishing from applying cluster states

Companion commit to elastic/elasticsearch#24236

Original commit: elastic/x-pack-elasticsearch@d685478f5d
This commit is contained in:
Yannick Welsch 2017-04-03 17:46:31 +02:00
parent dd3578aaf0
commit c551bcba5c
3 changed files with 26 additions and 76 deletions

View File

@ -313,16 +313,17 @@ public class LicenseService extends AbstractLifecycleComponent implements Cluste
clusterService.addListener(this); clusterService.addListener(this);
scheduler.start(Collections.emptyList()); scheduler.start(Collections.emptyList());
logger.debug("initializing license state"); logger.debug("initializing license state");
final ClusterState clusterState = clusterService.state(); if (clusterService.lifecycleState() == Lifecycle.State.STARTED) {
if (clusterService.lifecycleState() == Lifecycle.State.STARTED final ClusterState clusterState = clusterService.state();
&& clusterState.blocks().hasGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK) == false if (clusterState.blocks().hasGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK) == false &&
&& clusterState.nodes().getMasterNode() != null) { clusterState.nodes().getMasterNode() != null) {
final LicensesMetaData currentMetaData = clusterState.metaData().custom(LicensesMetaData.TYPE); final LicensesMetaData currentMetaData = clusterState.metaData().custom(LicensesMetaData.TYPE);
if (clusterState.getNodes().isLocalNodeElectedMaster() && if (clusterState.getNodes().isLocalNodeElectedMaster() &&
(currentMetaData == null || currentMetaData.getLicense() == null)) { (currentMetaData == null || currentMetaData.getLicense() == null)) {
// triggers a cluster changed event // triggers a cluster changed event
// eventually notifying the current licensee // eventually notifying the current licensee
registerTrialLicense(); registerTrialLicense();
}
} }
} }
} }

View File

@ -5,18 +5,14 @@
*/ */
package org.elasticsearch.xpack.monitoring.action; package org.elasticsearch.xpack.monitoring.action;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version; import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.LocalClusterUpdateTask;
import org.elasticsearch.cluster.NodeConnectionsService;
import org.elasticsearch.cluster.block.ClusterBlocks; import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.ClusterSettings;
@ -27,6 +23,7 @@ import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.discovery.DiscoverySettings; import org.elasticsearch.discovery.DiscoverySettings;
import org.elasticsearch.license.XPackLicenseState; import org.elasticsearch.license.XPackLicenseState;
import org.elasticsearch.test.ClusterServiceUtils;
import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.transport.CapturingTransport; import org.elasticsearch.test.transport.CapturingTransport;
import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.TestThreadPool;
@ -49,7 +46,6 @@ import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.HashSet; import java.util.HashSet;
import java.util.Set; import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.function.Consumer; import java.util.function.Consumer;
@ -100,25 +96,7 @@ public class TransportMonitoringBulkActionTests extends ESTestCase {
clusterSettings.add(MonitoringSettings.EXPORTERS_SETTINGS); clusterSettings.add(MonitoringSettings.EXPORTERS_SETTINGS);
final DiscoveryNode node = new DiscoveryNode("node", buildNewFakeTransportAddress(), emptyMap(), emptySet(), final DiscoveryNode node = new DiscoveryNode("node", buildNewFakeTransportAddress(), emptyMap(), emptySet(),
Version.CURRENT); Version.CURRENT);
clusterService = new ClusterService(Settings.builder().put("cluster.name", clusterService = ClusterServiceUtils.createClusterService(threadPool, node, new ClusterSettings(Settings.EMPTY, clusterSettings));
TransportMonitoringBulkActionTests.class.getName()).build(),
new ClusterSettings(Settings.EMPTY, clusterSettings), threadPool, () -> node);
clusterService.setNodeConnectionsService(new NodeConnectionsService(Settings.EMPTY, null, null) {
@Override
public void connectToNodes(DiscoveryNodes discoveryNodes) {
// skip
}
@Override
public void disconnectFromNodesExcept(DiscoveryNodes nodesToKeep) {
// skip
}
});
clusterService.setClusterStatePublisher((event, ackListener) -> {});
clusterService.setDiscoverySettings(new DiscoverySettings(Settings.EMPTY,
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)));
clusterService.start();
transportService = new TransportService(clusterService.getSettings(), transport, threadPool, transportService = new TransportService(clusterService.getSettings(), transport, threadPool,
TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> node, null); TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> node, null);
transportService.start(); transportService.start();
@ -147,31 +125,9 @@ public class TransportMonitoringBulkActionTests extends ESTestCase {
expectedException.expect(hasToString(containsString("ClusterBlockException[blocked by: [SERVICE_UNAVAILABLE/2/no master]"))); expectedException.expect(hasToString(containsString("ClusterBlockException[blocked by: [SERVICE_UNAVAILABLE/2/no master]")));
final ClusterBlocks.Builder block = ClusterBlocks.builder().addGlobalBlock(DiscoverySettings.NO_MASTER_BLOCK_ALL); final ClusterBlocks.Builder block = ClusterBlocks.builder().addGlobalBlock(DiscoverySettings.NO_MASTER_BLOCK_ALL);
final CountDownLatch latch = new CountDownLatch(1); ClusterState currentState = clusterService.state();
ClusterServiceUtils.setState(clusterService,
clusterService.submitStateUpdateTask("add blocks to cluster state", new LocalClusterUpdateTask() { ClusterState.builder(currentState).blocks(block).version(currentState.version() + 1).build());
@Override
public ClusterTasksResult<LocalClusterUpdateTask> execute(ClusterState currentState) throws Exception {
// make sure we increment versions as listener may depend on it for change
return newState(ClusterState.builder(currentState).blocks(block).version(currentState.version() + 1).build());
}
@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
latch.countDown();
}
@Override
public void onFailure(String source, Exception e) {
fail("unexpected exception: " + e);
}
});
try {
latch.await();
} catch (InterruptedException e) {
throw new ElasticsearchException("unexpected interruption", e);
}
MonitoringBulkRequest request = randomRequest(); MonitoringBulkRequest request = randomRequest();
action.execute(request).get(); action.execute(request).get();

View File

@ -9,13 +9,13 @@ import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksResponse; import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksResponse;
import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.tasks.Task; import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskId; import org.elasticsearch.tasks.TaskId;
@ -45,25 +45,21 @@ import static org.mockito.Mockito.when;
public class PersistentTasksNodeServiceTests extends ESTestCase { public class PersistentTasksNodeServiceTests extends ESTestCase {
private ClusterService createClusterService() { private ClusterState createInitialClusterState(int nonLocalNodesCount, Settings settings) {
ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); ClusterState.Builder state = ClusterState.builder(new ClusterName("PersistentActionExecutorTests"));
return new ClusterService(Settings.builder().put("cluster.name", "PersistentActionExecutorTests").build(), state.metaData(MetaData.builder().generateClusterUuidIfNeeded());
clusterSettings, null, () -> new DiscoveryNode(UUIDs.randomBase64UUID(), buildNewFakeTransportAddress(), state.routingTable(RoutingTable.builder().build());
Version.CURRENT));
}
private DiscoveryNodes createTestNodes(int nonLocalNodesCount, Settings settings) {
DiscoveryNodes.Builder nodes = DiscoveryNodes.builder(); DiscoveryNodes.Builder nodes = DiscoveryNodes.builder();
nodes.add(DiscoveryNode.createLocal(settings, buildNewFakeTransportAddress(), "this_node")); nodes.add(DiscoveryNode.createLocal(settings, buildNewFakeTransportAddress(), "this_node"));
for (int i = 0; i < nonLocalNodesCount; i++) { for (int i = 0; i < nonLocalNodesCount; i++) {
nodes.add(new DiscoveryNode("other_node_" + i, buildNewFakeTransportAddress(), Version.CURRENT)); nodes.add(new DiscoveryNode("other_node_" + i, buildNewFakeTransportAddress(), Version.CURRENT));
} }
nodes.localNodeId("this_node"); nodes.localNodeId("this_node");
return nodes.build(); state.nodes(nodes);
return state.build();
} }
public void testStartTask() throws Exception { public void testStartTask() throws Exception {
ClusterService clusterService = createClusterService();
PersistentTasksService persistentTasksService = mock(PersistentTasksService.class); PersistentTasksService persistentTasksService = mock(PersistentTasksService.class);
@SuppressWarnings("unchecked") PersistentTasksExecutor<TestParams> action = mock(PersistentTasksExecutor.class); @SuppressWarnings("unchecked") PersistentTasksExecutor<TestParams> action = mock(PersistentTasksExecutor.class);
when(action.getExecutor()).thenReturn(ThreadPool.Names.SAME); when(action.getExecutor()).thenReturn(ThreadPool.Names.SAME);
@ -81,8 +77,7 @@ public class PersistentTasksNodeServiceTests extends ESTestCase {
PersistentTasksNodeService coordinator = new PersistentTasksNodeService(Settings.EMPTY, persistentTasksService, PersistentTasksNodeService coordinator = new PersistentTasksNodeService(Settings.EMPTY, persistentTasksService,
registry, new TaskManager(Settings.EMPTY), executor); registry, new TaskManager(Settings.EMPTY), executor);
ClusterState state = ClusterState.builder(clusterService.state()).nodes(createTestNodes(nonLocalNodesCount, Settings.EMPTY)) ClusterState state = createInitialClusterState(nonLocalNodesCount, Settings.EMPTY);
.build();
PersistentTasksCustomMetaData.Builder tasks = PersistentTasksCustomMetaData.builder(); PersistentTasksCustomMetaData.Builder tasks = PersistentTasksCustomMetaData.builder();
boolean added = false; boolean added = false;
@ -159,7 +154,6 @@ public class PersistentTasksNodeServiceTests extends ESTestCase {
} }
public void testTaskCancellation() { public void testTaskCancellation() {
ClusterService clusterService = createClusterService();
AtomicLong capturedTaskId = new AtomicLong(); AtomicLong capturedTaskId = new AtomicLong();
AtomicReference<ActionListener<CancelTasksResponse>> capturedListener = new AtomicReference<>(); AtomicReference<ActionListener<CancelTasksResponse>> capturedListener = new AtomicReference<>();
PersistentTasksService persistentTasksService = new PersistentTasksService(Settings.EMPTY, null, null, null) { PersistentTasksService persistentTasksService = new PersistentTasksService(Settings.EMPTY, null, null, null) {
@ -188,8 +182,7 @@ public class PersistentTasksNodeServiceTests extends ESTestCase {
PersistentTasksNodeService coordinator = new PersistentTasksNodeService(Settings.EMPTY, persistentTasksService, PersistentTasksNodeService coordinator = new PersistentTasksNodeService(Settings.EMPTY, persistentTasksService,
registry, taskManager, executor); registry, taskManager, executor);
ClusterState state = ClusterState.builder(clusterService.state()).nodes(createTestNodes(nonLocalNodesCount, Settings.EMPTY)) ClusterState state = createInitialClusterState(nonLocalNodesCount, Settings.EMPTY);
.build();
ClusterState newClusterState = state; ClusterState newClusterState = state;
// Allocate first task // Allocate first task