Watcher: Do not start, if index meta data version is wrong (elastic/x-pack-elasticsearch#1770)

If the internal index version of an index is not the right one, do
not start watcher.

Also, add the internal index version of 6 to all our index templates.

Original commit: elastic/x-pack-elasticsearch@20b50aa82b
This commit is contained in:
Alexander Reelsen 2017-06-21 13:51:36 +02:00 committed by GitHub
parent c661ee0934
commit 889ee11c83
12 changed files with 65 additions and 3 deletions

View File

@ -19,6 +19,7 @@ import org.elasticsearch.common.component.LifecycleListener;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.gateway.GatewayService; import org.elasticsearch.gateway.GatewayService;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.watcher.execution.TriggeredWatchStore;
import org.elasticsearch.xpack.watcher.watch.Watch; import org.elasticsearch.xpack.watcher.watch.Watch;
import org.elasticsearch.xpack.watcher.watch.WatchStoreUtils; import org.elasticsearch.xpack.watcher.watch.WatchStoreUtils;
@ -33,6 +34,10 @@ import static org.elasticsearch.cluster.routing.ShardRoutingState.STARTED;
public class WatcherLifeCycleService extends AbstractComponent implements ClusterStateListener { public class WatcherLifeCycleService extends AbstractComponent implements ClusterStateListener {
// this is the required index.format setting for watcher to start up at all
// this index setting is set by the upgrade API or automatically when a 6.0 index template is created
private static final int EXPECTED_INDEX_FORMAT_VERSION = 6;
private final WatcherService watcherService; private final WatcherService watcherService;
private final ClusterService clusterService; private final ClusterService clusterService;
private final ExecutorService executor; private final ExecutorService executor;
@ -92,8 +97,9 @@ public class WatcherLifeCycleService extends AbstractComponent implements Cluste
} }
} }
/* /**
* TODO possible optimization * @param event The event of containing the new cluster state
*
* stop certain parts of watcher, when there are no watcher indices on this node by checking the shardrouting * stop certain parts of watcher, when there are no watcher indices on this node by checking the shardrouting
* note that this is not easily possible, because of the execute watch api, that needs to be able to execute anywhere! * note that this is not easily possible, because of the execute watch api, that needs to be able to execute anywhere!
* this means, only certain components can be stopped * this means, only certain components can be stopped
@ -121,6 +127,7 @@ public class WatcherLifeCycleService extends AbstractComponent implements Cluste
DiscoveryNode localNode = event.state().nodes().getLocalNode(); DiscoveryNode localNode = event.state().nodes().getLocalNode();
RoutingNode routingNode = event.state().getRoutingNodes().node(localNode.getId()); RoutingNode routingNode = event.state().getRoutingNodes().node(localNode.getId());
IndexMetaData watcherIndexMetaData = WatchStoreUtils.getConcreteIndex(Watch.INDEX, event.state().metaData()); IndexMetaData watcherIndexMetaData = WatchStoreUtils.getConcreteIndex(Watch.INDEX, event.state().metaData());
// no watcher index, time to pause, if we currently have shards here // no watcher index, time to pause, if we currently have shards here
if (watcherIndexMetaData == null) { if (watcherIndexMetaData == null) {
if (previousAllocationIds.get().isEmpty() == false) { if (previousAllocationIds.get().isEmpty() == false) {
@ -151,7 +158,19 @@ public class WatcherLifeCycleService extends AbstractComponent implements Cluste
executor.execute(() -> watcherService.reload(event.state(), "different shard allocation ids")); executor.execute(() -> watcherService.reload(event.state(), "different shard allocation ids"));
} }
} else if (watcherService.state() != WatcherState.STARTED && watcherService.state() != WatcherState.STARTING) { } else if (watcherService.state() != WatcherState.STARTED && watcherService.state() != WatcherState.STARTING) {
executor.execute(() -> start(event.state(), false)); IndexMetaData watcherIndexMetaData = WatchStoreUtils.getConcreteIndex(Watch.INDEX, event.state().metaData());
IndexMetaData triggeredWatchesIndexMetaData = WatchStoreUtils.getConcreteIndex(TriggeredWatchStore.INDEX_NAME,
event.state().metaData());
String indexFormatSetting = IndexMetaData.INDEX_FORMAT_SETTING.getKey();
boolean isIndexInternalFormatWatchIndex = watcherIndexMetaData == null ||
watcherIndexMetaData.getSettings().getAsInt(indexFormatSetting, 0) == EXPECTED_INDEX_FORMAT_VERSION;
boolean isIndexInternalFormatTriggeredWatchIndex = triggeredWatchesIndexMetaData == null ||
triggeredWatchesIndexMetaData.getSettings().getAsInt(indexFormatSetting, 0) == EXPECTED_INDEX_FORMAT_VERSION;
if (isIndexInternalFormatTriggeredWatchIndex && isIndexInternalFormatWatchIndex) {
executor.execute(() -> start(event.state(), false));
} else {
logger.warn("Not starting watcher, the indices have not been upgraded yet. Please run the Upgrade API");
}
} }
} }
} }

View File

@ -5,6 +5,7 @@
"index": { "index": {
"number_of_shards": 1, "number_of_shards": 1,
"number_of_replicas": 1, "number_of_replicas": 1,
"format": 6,
"codec": "best_compression" "codec": "best_compression"
} }
}, },

View File

@ -4,6 +4,7 @@
"settings": { "settings": {
"index.number_of_shards": 1, "index.number_of_shards": 1,
"index.number_of_replicas": 1, "index.number_of_replicas": 1,
"index.format": 6,
"index.codec": "best_compression" "index.codec": "best_compression"
}, },
"mappings": { "mappings": {

View File

@ -4,6 +4,7 @@
"settings": { "settings": {
"index.number_of_shards": 1, "index.number_of_shards": 1,
"index.number_of_replicas": 1, "index.number_of_replicas": 1,
"index.format": 6,
"index.codec": "best_compression" "index.codec": "best_compression"
}, },
"mappings": { "mappings": {

View File

@ -4,6 +4,7 @@
"settings": { "settings": {
"index.number_of_shards": 1, "index.number_of_shards": 1,
"index.number_of_replicas": 1, "index.number_of_replicas": 1,
"index.format": 6,
"index.codec": "best_compression" "index.codec": "best_compression"
}, },
"mappings": { "mappings": {

View File

@ -4,6 +4,7 @@
"settings": { "settings": {
"index.number_of_shards": 1, "index.number_of_shards": 1,
"index.number_of_replicas": 1, "index.number_of_replicas": 1,
"index.format": 6,
"index.codec": "best_compression" "index.codec": "best_compression"
}, },
"mappings": { "mappings": {

View File

@ -7,6 +7,7 @@
"number_of_replicas" : 0, "number_of_replicas" : 0,
"auto_expand_replicas" : "0-all", "auto_expand_replicas" : "0-all",
"index.priority": 1000, "index.priority": 1000,
"index.format": 6,
"analysis" : { "analysis" : {
"filter" : { "filter" : {
"email" : { "email" : {

View File

@ -2,6 +2,7 @@
"index_patterns": ".security_audit_log*", "index_patterns": ".security_audit_log*",
"order": 2147483647, "order": 2147483647,
"settings": { "settings": {
"index.format": 6,
"index.mapper.dynamic" : false "index.mapper.dynamic" : false
}, },
"mappings": { "mappings": {

View File

@ -5,6 +5,7 @@
"index.number_of_shards": 1, "index.number_of_shards": 1,
"index.mapper.dynamic" : false, "index.mapper.dynamic" : false,
"index.refresh_interval" : "-1", "index.refresh_interval" : "-1",
"index.format": 6,
"index.priority": 900 "index.priority": 900
}, },
"mappings": { "mappings": {

View File

@ -4,6 +4,7 @@
"settings": { "settings": {
"xpack.watcher.template.version": "${xpack.watcher.template.version}", "xpack.watcher.template.version": "${xpack.watcher.template.version}",
"index.number_of_shards": 1, "index.number_of_shards": 1,
"index.format": 6,
"index.mapper.dynamic": false "index.mapper.dynamic": false
}, },
"mappings": { "mappings": {

View File

@ -4,6 +4,7 @@
"settings": { "settings": {
"index.number_of_shards": 1, "index.number_of_shards": 1,
"index.mapper.dynamic" : false, "index.mapper.dynamic" : false,
"index.format": 6,
"index.priority": 800 "index.priority": 800
}, },
"mappings": { "mappings": {

View File

@ -28,6 +28,7 @@ import org.elasticsearch.index.Index;
import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.watcher.execution.TriggeredWatchStore;
import org.elasticsearch.xpack.watcher.watch.Watch; import org.elasticsearch.xpack.watcher.watch.Watch;
import org.junit.Before; import org.junit.Before;
import org.mockito.stubbing.Answer; import org.mockito.stubbing.Answer;
@ -352,6 +353,38 @@ public class WatcherLifeCycleServiceTests extends ESTestCase {
verify(watcherService, times(1)).pauseExecution(anyObject()); verify(watcherService, times(1)).pauseExecution(anyObject());
} }
public void testWatcherDoesNotStartWithOldIndexFormat() throws Exception {
String index = randomFrom(Watch.INDEX, TriggeredWatchStore.INDEX_NAME);
Index watchIndex = new Index(index, "foo");
ShardId shardId = new ShardId(watchIndex, 0);
IndexRoutingTable watchRoutingTable = IndexRoutingTable.builder(watchIndex)
.addShard(TestShardRouting.newShardRouting(shardId, "node_1", true, STARTED)).build();
DiscoveryNodes nodes = new DiscoveryNodes.Builder().masterNodeId("node_1").localNodeId("node_1").add(newNode("node_1")).build();
Settings.Builder indexSettings = Settings.builder()
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)
.put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT);
// no matter if not set or set to one, watcher should not start
if (randomBoolean()) {
indexSettings.put(IndexMetaData.INDEX_FORMAT_SETTING.getKey(), 1);
}
IndexMetaData.Builder newIndexMetaDataBuilder = IndexMetaData.builder(index).settings(indexSettings);
ClusterState clusterStateWithWatcherIndex = ClusterState.builder(new ClusterName("my-cluster"))
.nodes(nodes)
.routingTable(RoutingTable.builder().add(watchRoutingTable).build())
.metaData(MetaData.builder().put(newIndexMetaDataBuilder))
.build();
ClusterState emptyClusterState = ClusterState.builder(new ClusterName("my-cluster")).nodes(nodes).build();
when(watcherService.state()).thenReturn(WatcherState.STOPPED);
when(watcherService.validate(eq(clusterStateWithWatcherIndex))).thenReturn(true);
lifeCycleService.clusterChanged(new ClusterChangedEvent("any", clusterStateWithWatcherIndex, emptyClusterState));
verify(watcherService, never()).start(any(ClusterState.class));
}
private static DiscoveryNode newNode(String nodeName) { private static DiscoveryNode newNode(String nodeName) {
return new DiscoveryNode(nodeName, ESTestCase.buildNewFakeTransportAddress(), Collections.emptyMap(), return new DiscoveryNode(nodeName, ESTestCase.buildNewFakeTransportAddress(), Collections.emptyMap(),
new HashSet<>(asList(DiscoveryNode.Role.values())), Version.CURRENT); new HashSet<>(asList(DiscoveryNode.Role.values())), Version.CURRENT);