diff --git a/plugin/src/main/java/org/elasticsearch/xpack/watcher/WatcherLifeCycleService.java b/plugin/src/main/java/org/elasticsearch/xpack/watcher/WatcherLifeCycleService.java index a142cfe4984..cf1de3ab9c6 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/watcher/WatcherLifeCycleService.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/watcher/WatcherLifeCycleService.java @@ -19,6 +19,7 @@ import org.elasticsearch.common.component.LifecycleListener; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.gateway.GatewayService; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.xpack.watcher.execution.TriggeredWatchStore; import org.elasticsearch.xpack.watcher.watch.Watch; import org.elasticsearch.xpack.watcher.watch.WatchStoreUtils; @@ -33,6 +34,10 @@ import static org.elasticsearch.cluster.routing.ShardRoutingState.STARTED; public class WatcherLifeCycleService extends AbstractComponent implements ClusterStateListener { + // this is the required index.format setting for watcher to start up at all + // this index setting is set by the upgrade API or automatically when a 6.0 index template is created + private static final int EXPECTED_INDEX_FORMAT_VERSION = 6; + private final WatcherService watcherService; private final ClusterService clusterService; private final ExecutorService executor; @@ -92,8 +97,9 @@ public class WatcherLifeCycleService extends AbstractComponent implements Cluste } } - /* - * TODO possible optimization + /** + * @param event The event of containing the new cluster state + * * stop certain parts of watcher, when there are no watcher indices on this node by checking the shardrouting * note that this is not easily possible, because of the execute watch api, that needs to be able to execute anywhere! * this means, only certain components can be stopped @@ -121,6 +127,7 @@ public class WatcherLifeCycleService extends AbstractComponent implements Cluste DiscoveryNode localNode = event.state().nodes().getLocalNode(); RoutingNode routingNode = event.state().getRoutingNodes().node(localNode.getId()); IndexMetaData watcherIndexMetaData = WatchStoreUtils.getConcreteIndex(Watch.INDEX, event.state().metaData()); + // no watcher index, time to pause, if we currently have shards here if (watcherIndexMetaData == null) { if (previousAllocationIds.get().isEmpty() == false) { @@ -151,7 +158,19 @@ public class WatcherLifeCycleService extends AbstractComponent implements Cluste executor.execute(() -> watcherService.reload(event.state(), "different shard allocation ids")); } } else if (watcherService.state() != WatcherState.STARTED && watcherService.state() != WatcherState.STARTING) { - executor.execute(() -> start(event.state(), false)); + IndexMetaData watcherIndexMetaData = WatchStoreUtils.getConcreteIndex(Watch.INDEX, event.state().metaData()); + IndexMetaData triggeredWatchesIndexMetaData = WatchStoreUtils.getConcreteIndex(TriggeredWatchStore.INDEX_NAME, + event.state().metaData()); + String indexFormatSetting = IndexMetaData.INDEX_FORMAT_SETTING.getKey(); + boolean isIndexInternalFormatWatchIndex = watcherIndexMetaData == null || + watcherIndexMetaData.getSettings().getAsInt(indexFormatSetting, 0) == EXPECTED_INDEX_FORMAT_VERSION; + boolean isIndexInternalFormatTriggeredWatchIndex = triggeredWatchesIndexMetaData == null || + triggeredWatchesIndexMetaData.getSettings().getAsInt(indexFormatSetting, 0) == EXPECTED_INDEX_FORMAT_VERSION; + if (isIndexInternalFormatTriggeredWatchIndex && isIndexInternalFormatWatchIndex) { + executor.execute(() -> start(event.state(), false)); + } else { + logger.warn("Not starting watcher, the indices have not been upgraded yet. Please run the Upgrade API"); + } } } } diff --git a/plugin/src/main/resources/monitoring-alerts.json b/plugin/src/main/resources/monitoring-alerts.json index 8d05d89d079..5f6f2a4cb67 100644 --- a/plugin/src/main/resources/monitoring-alerts.json +++ b/plugin/src/main/resources/monitoring-alerts.json @@ -5,6 +5,7 @@ "index": { "number_of_shards": 1, "number_of_replicas": 1, + "format": 6, "codec": "best_compression" } }, diff --git a/plugin/src/main/resources/monitoring-beats.json b/plugin/src/main/resources/monitoring-beats.json index a8c9410e8a1..7283e4e45cd 100644 --- a/plugin/src/main/resources/monitoring-beats.json +++ b/plugin/src/main/resources/monitoring-beats.json @@ -4,6 +4,7 @@ "settings": { "index.number_of_shards": 1, "index.number_of_replicas": 1, + "index.format": 6, "index.codec": "best_compression" }, "mappings": { diff --git a/plugin/src/main/resources/monitoring-es.json b/plugin/src/main/resources/monitoring-es.json index d91ba561a9c..dddb89c268b 100644 --- a/plugin/src/main/resources/monitoring-es.json +++ b/plugin/src/main/resources/monitoring-es.json @@ -4,6 +4,7 @@ "settings": { "index.number_of_shards": 1, "index.number_of_replicas": 1, + "index.format": 6, "index.codec": "best_compression" }, "mappings": { diff --git a/plugin/src/main/resources/monitoring-kibana.json b/plugin/src/main/resources/monitoring-kibana.json index 72e64e15ef9..66cc418b7eb 100644 --- a/plugin/src/main/resources/monitoring-kibana.json +++ b/plugin/src/main/resources/monitoring-kibana.json @@ -4,6 +4,7 @@ "settings": { "index.number_of_shards": 1, "index.number_of_replicas": 1, + "index.format": 6, "index.codec": "best_compression" }, "mappings": { diff --git a/plugin/src/main/resources/monitoring-logstash.json b/plugin/src/main/resources/monitoring-logstash.json index c263c935503..095de0d41ea 100644 --- a/plugin/src/main/resources/monitoring-logstash.json +++ b/plugin/src/main/resources/monitoring-logstash.json @@ -4,6 +4,7 @@ "settings": { "index.number_of_shards": 1, "index.number_of_replicas": 1, + "index.format": 6, "index.codec": "best_compression" }, "mappings": { diff --git a/plugin/src/main/resources/security-index-template.json b/plugin/src/main/resources/security-index-template.json index 6acac33b688..12be24a1199 100644 --- a/plugin/src/main/resources/security-index-template.json +++ b/plugin/src/main/resources/security-index-template.json @@ -7,6 +7,7 @@ "number_of_replicas" : 0, "auto_expand_replicas" : "0-all", "index.priority": 1000, + "index.format": 6, "analysis" : { "filter" : { "email" : { diff --git a/plugin/src/main/resources/security_audit_log.json b/plugin/src/main/resources/security_audit_log.json index 2614e4e32e2..b91b5194c4b 100644 --- a/plugin/src/main/resources/security_audit_log.json +++ b/plugin/src/main/resources/security_audit_log.json @@ -2,6 +2,7 @@ "index_patterns": ".security_audit_log*", "order": 2147483647, "settings": { + "index.format": 6, "index.mapper.dynamic" : false }, "mappings": { diff --git a/plugin/src/main/resources/triggered-watches.json b/plugin/src/main/resources/triggered-watches.json index c8b5db73060..bd15d9fb902 100644 --- a/plugin/src/main/resources/triggered-watches.json +++ b/plugin/src/main/resources/triggered-watches.json @@ -5,6 +5,7 @@ "index.number_of_shards": 1, "index.mapper.dynamic" : false, "index.refresh_interval" : "-1", + "index.format": 6, "index.priority": 900 }, "mappings": { diff --git a/plugin/src/main/resources/watch-history.json b/plugin/src/main/resources/watch-history.json index 57fbee0c463..dfad3960b78 100644 --- a/plugin/src/main/resources/watch-history.json +++ b/plugin/src/main/resources/watch-history.json @@ -4,6 +4,7 @@ "settings": { "xpack.watcher.template.version": "${xpack.watcher.template.version}", "index.number_of_shards": 1, + "index.format": 6, "index.mapper.dynamic": false }, "mappings": { diff --git a/plugin/src/main/resources/watches.json b/plugin/src/main/resources/watches.json index fe26b1a1988..b422955fb6e 100644 --- a/plugin/src/main/resources/watches.json +++ b/plugin/src/main/resources/watches.json @@ -4,6 +4,7 @@ "settings": { "index.number_of_shards": 1, "index.mapper.dynamic" : false, + "index.format": 6, "index.priority": 800 }, "mappings": { diff --git a/plugin/src/test/java/org/elasticsearch/xpack/watcher/WatcherLifeCycleServiceTests.java b/plugin/src/test/java/org/elasticsearch/xpack/watcher/WatcherLifeCycleServiceTests.java index 809731071d8..7cf8a16b6a6 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/watcher/WatcherLifeCycleServiceTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/watcher/WatcherLifeCycleServiceTests.java @@ -28,6 +28,7 @@ import org.elasticsearch.index.Index; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.xpack.watcher.execution.TriggeredWatchStore; import org.elasticsearch.xpack.watcher.watch.Watch; import org.junit.Before; import org.mockito.stubbing.Answer; @@ -352,6 +353,38 @@ public class WatcherLifeCycleServiceTests extends ESTestCase { verify(watcherService, times(1)).pauseExecution(anyObject()); } + public void testWatcherDoesNotStartWithOldIndexFormat() throws Exception { + String index = randomFrom(Watch.INDEX, TriggeredWatchStore.INDEX_NAME); + Index watchIndex = new Index(index, "foo"); + ShardId shardId = new ShardId(watchIndex, 0); + IndexRoutingTable watchRoutingTable = IndexRoutingTable.builder(watchIndex) + .addShard(TestShardRouting.newShardRouting(shardId, "node_1", true, STARTED)).build(); + DiscoveryNodes nodes = new DiscoveryNodes.Builder().masterNodeId("node_1").localNodeId("node_1").add(newNode("node_1")).build(); + + Settings.Builder indexSettings = Settings.builder() + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0) + .put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT); + // no matter if not set or set to one, watcher should not start + if (randomBoolean()) { + indexSettings.put(IndexMetaData.INDEX_FORMAT_SETTING.getKey(), 1); + } + IndexMetaData.Builder newIndexMetaDataBuilder = IndexMetaData.builder(index).settings(indexSettings); + + ClusterState clusterStateWithWatcherIndex = ClusterState.builder(new ClusterName("my-cluster")) + .nodes(nodes) + .routingTable(RoutingTable.builder().add(watchRoutingTable).build()) + .metaData(MetaData.builder().put(newIndexMetaDataBuilder)) + .build(); + + ClusterState emptyClusterState = ClusterState.builder(new ClusterName("my-cluster")).nodes(nodes).build(); + + when(watcherService.state()).thenReturn(WatcherState.STOPPED); + when(watcherService.validate(eq(clusterStateWithWatcherIndex))).thenReturn(true); + lifeCycleService.clusterChanged(new ClusterChangedEvent("any", clusterStateWithWatcherIndex, emptyClusterState)); + verify(watcherService, never()).start(any(ClusterState.class)); + } + private static DiscoveryNode newNode(String nodeName) { return new DiscoveryNode(nodeName, ESTestCase.buildNewFakeTransportAddress(), Collections.emptyMap(), new HashSet<>(asList(DiscoveryNode.Role.values())), Version.CURRENT);