diff --git a/plugin/src/main/java/org/elasticsearch/license/LicenseService.java b/plugin/src/main/java/org/elasticsearch/license/LicenseService.java index faf830054f2..6db60607956 100644 --- a/plugin/src/main/java/org/elasticsearch/license/LicenseService.java +++ b/plugin/src/main/java/org/elasticsearch/license/LicenseService.java @@ -313,16 +313,17 @@ public class LicenseService extends AbstractLifecycleComponent implements Cluste clusterService.addListener(this); scheduler.start(Collections.emptyList()); logger.debug("initializing license state"); - final ClusterState clusterState = clusterService.state(); - if (clusterService.lifecycleState() == Lifecycle.State.STARTED - && clusterState.blocks().hasGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK) == false - && clusterState.nodes().getMasterNode() != null) { - final LicensesMetaData currentMetaData = clusterState.metaData().custom(LicensesMetaData.TYPE); - if (clusterState.getNodes().isLocalNodeElectedMaster() && - (currentMetaData == null || currentMetaData.getLicense() == null)) { - // triggers a cluster changed event - // eventually notifying the current licensee - registerTrialLicense(); + if (clusterService.lifecycleState() == Lifecycle.State.STARTED) { + final ClusterState clusterState = clusterService.state(); + if (clusterState.blocks().hasGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK) == false && + clusterState.nodes().getMasterNode() != null) { + final LicensesMetaData currentMetaData = clusterState.metaData().custom(LicensesMetaData.TYPE); + if (clusterState.getNodes().isLocalNodeElectedMaster() && + (currentMetaData == null || currentMetaData.getLicense() == null)) { + // triggers a cluster changed event + // eventually notifying the current licensee + registerTrialLicense(); + } } } } diff --git a/plugin/src/test/java/org/elasticsearch/xpack/monitoring/action/TransportMonitoringBulkActionTests.java b/plugin/src/test/java/org/elasticsearch/xpack/monitoring/action/TransportMonitoringBulkActionTests.java index 73c724169c5..971a274dc15 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/monitoring/action/TransportMonitoringBulkActionTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/monitoring/action/TransportMonitoringBulkActionTests.java @@ -5,18 +5,14 @@ */ package org.elasticsearch.xpack.monitoring.action; -import org.elasticsearch.ElasticsearchException; import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.LocalClusterUpdateTask; -import org.elasticsearch.cluster.NodeConnectionsService; import org.elasticsearch.cluster.block.ClusterBlocks; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.node.DiscoveryNode; -import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.settings.ClusterSettings; @@ -27,6 +23,7 @@ import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.discovery.DiscoverySettings; import org.elasticsearch.license.XPackLicenseState; +import org.elasticsearch.test.ClusterServiceUtils; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.transport.CapturingTransport; import org.elasticsearch.threadpool.TestThreadPool; @@ -49,7 +46,6 @@ import java.util.Collection; import java.util.Collections; import java.util.HashSet; import java.util.Set; -import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.function.Consumer; @@ -100,25 +96,7 @@ public class TransportMonitoringBulkActionTests extends ESTestCase { clusterSettings.add(MonitoringSettings.EXPORTERS_SETTINGS); final DiscoveryNode node = new DiscoveryNode("node", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT); - clusterService = new ClusterService(Settings.builder().put("cluster.name", - TransportMonitoringBulkActionTests.class.getName()).build(), - new ClusterSettings(Settings.EMPTY, clusterSettings), threadPool, () -> node); - clusterService.setNodeConnectionsService(new NodeConnectionsService(Settings.EMPTY, null, null) { - @Override - public void connectToNodes(DiscoveryNodes discoveryNodes) { - // skip - } - - @Override - public void disconnectFromNodesExcept(DiscoveryNodes nodesToKeep) { - // skip - } - }); - clusterService.setClusterStatePublisher((event, ackListener) -> {}); - clusterService.setDiscoverySettings(new DiscoverySettings(Settings.EMPTY, - new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS))); - clusterService.start(); - + clusterService = ClusterServiceUtils.createClusterService(threadPool, node, new ClusterSettings(Settings.EMPTY, clusterSettings)); transportService = new TransportService(clusterService.getSettings(), transport, threadPool, TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> node, null); transportService.start(); @@ -147,31 +125,9 @@ public class TransportMonitoringBulkActionTests extends ESTestCase { expectedException.expect(hasToString(containsString("ClusterBlockException[blocked by: [SERVICE_UNAVAILABLE/2/no master]"))); final ClusterBlocks.Builder block = ClusterBlocks.builder().addGlobalBlock(DiscoverySettings.NO_MASTER_BLOCK_ALL); - final CountDownLatch latch = new CountDownLatch(1); - - clusterService.submitStateUpdateTask("add blocks to cluster state", new LocalClusterUpdateTask() { - @Override - public ClusterTasksResult execute(ClusterState currentState) throws Exception { - // make sure we increment versions as listener may depend on it for change - return newState(ClusterState.builder(currentState).blocks(block).version(currentState.version() + 1).build()); - } - - @Override - public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { - latch.countDown(); - } - - @Override - public void onFailure(String source, Exception e) { - fail("unexpected exception: " + e); - } - }); - - try { - latch.await(); - } catch (InterruptedException e) { - throw new ElasticsearchException("unexpected interruption", e); - } + ClusterState currentState = clusterService.state(); + ClusterServiceUtils.setState(clusterService, + ClusterState.builder(currentState).blocks(block).version(currentState.version() + 1).build()); MonitoringBulkRequest request = randomRequest(); action.execute(request).get(); diff --git a/plugin/src/test/java/org/elasticsearch/xpack/persistent/PersistentTasksNodeServiceTests.java b/plugin/src/test/java/org/elasticsearch/xpack/persistent/PersistentTasksNodeServiceTests.java index 8f9c9680bbd..8dcd23f193f 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/persistent/PersistentTasksNodeServiceTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/persistent/PersistentTasksNodeServiceTests.java @@ -9,13 +9,13 @@ import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksResponse; import org.elasticsearch.cluster.ClusterChangedEvent; +import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; -import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.cluster.routing.RoutingTable; import org.elasticsearch.common.UUIDs; -import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.tasks.Task; import org.elasticsearch.tasks.TaskId; @@ -45,25 +45,21 @@ import static org.mockito.Mockito.when; public class PersistentTasksNodeServiceTests extends ESTestCase { - private ClusterService createClusterService() { - ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); - return new ClusterService(Settings.builder().put("cluster.name", "PersistentActionExecutorTests").build(), - clusterSettings, null, () -> new DiscoveryNode(UUIDs.randomBase64UUID(), buildNewFakeTransportAddress(), - Version.CURRENT)); - } - - private DiscoveryNodes createTestNodes(int nonLocalNodesCount, Settings settings) { + private ClusterState createInitialClusterState(int nonLocalNodesCount, Settings settings) { + ClusterState.Builder state = ClusterState.builder(new ClusterName("PersistentActionExecutorTests")); + state.metaData(MetaData.builder().generateClusterUuidIfNeeded()); + state.routingTable(RoutingTable.builder().build()); DiscoveryNodes.Builder nodes = DiscoveryNodes.builder(); nodes.add(DiscoveryNode.createLocal(settings, buildNewFakeTransportAddress(), "this_node")); for (int i = 0; i < nonLocalNodesCount; i++) { nodes.add(new DiscoveryNode("other_node_" + i, buildNewFakeTransportAddress(), Version.CURRENT)); } nodes.localNodeId("this_node"); - return nodes.build(); + state.nodes(nodes); + return state.build(); } public void testStartTask() throws Exception { - ClusterService clusterService = createClusterService(); PersistentTasksService persistentTasksService = mock(PersistentTasksService.class); @SuppressWarnings("unchecked") PersistentTasksExecutor action = mock(PersistentTasksExecutor.class); when(action.getExecutor()).thenReturn(ThreadPool.Names.SAME); @@ -81,8 +77,7 @@ public class PersistentTasksNodeServiceTests extends ESTestCase { PersistentTasksNodeService coordinator = new PersistentTasksNodeService(Settings.EMPTY, persistentTasksService, registry, new TaskManager(Settings.EMPTY), executor); - ClusterState state = ClusterState.builder(clusterService.state()).nodes(createTestNodes(nonLocalNodesCount, Settings.EMPTY)) - .build(); + ClusterState state = createInitialClusterState(nonLocalNodesCount, Settings.EMPTY); PersistentTasksCustomMetaData.Builder tasks = PersistentTasksCustomMetaData.builder(); boolean added = false; @@ -159,7 +154,6 @@ public class PersistentTasksNodeServiceTests extends ESTestCase { } public void testTaskCancellation() { - ClusterService clusterService = createClusterService(); AtomicLong capturedTaskId = new AtomicLong(); AtomicReference> capturedListener = new AtomicReference<>(); PersistentTasksService persistentTasksService = new PersistentTasksService(Settings.EMPTY, null, null, null) { @@ -188,8 +182,7 @@ public class PersistentTasksNodeServiceTests extends ESTestCase { PersistentTasksNodeService coordinator = new PersistentTasksNodeService(Settings.EMPTY, persistentTasksService, registry, taskManager, executor); - ClusterState state = ClusterState.builder(clusterService.state()).nodes(createTestNodes(nonLocalNodesCount, Settings.EMPTY)) - .build(); + ClusterState state = createInitialClusterState(nonLocalNodesCount, Settings.EMPTY); ClusterState newClusterState = state; // Allocate first task