From 0228a94d805f58ea306db1a5fcb12920180009a9 Mon Sep 17 00:00:00 2001 From: Alexander Reelsen Date: Wed, 19 Oct 2016 10:29:27 +0200 Subject: [PATCH] Watcher: Add support for aliases for watches/triggered watches index (elastic/elasticsearch#3770) As discussed in #elastic/elasticsearch-migration/79 supporting aliases for watcher allows the migration plugin to work. This adds the relevent checks in the WatchStore and the TriggeredWatchStore that aliases are supported, as the current assumption was always to just load an index. Also, this rarely sets those indices as aliases in all the integration tests, so that this case gets tested. Note: The new WatchStoreUtils.getConcreteIndex() method will be put into core, as this is a useful helper for others. Original commit: elastic/x-pack-elasticsearch@4a98af691dd9798d7923183cb6b27b6b2ee3772f --- .../execution/TriggeredWatchStore.java | 32 ++-- .../xpack/watcher/watch/WatchStore.java | 43 +++--- .../xpack/watcher/watch/WatchStoreUtils.java | 37 +++++ .../execution/TriggeredWatchStoreTests.java | 133 +++++++++++------ .../AbstractWatcherIntegrationTestCase.java | 56 +++++++ .../test/integration/BootStrapTests.java | 5 +- .../xpack/watcher/watch/WatchStoreTests.java | 140 ++++++++++++++++-- 7 files changed, 347 insertions(+), 99 deletions(-) create mode 100644 elasticsearch/src/main/java/org/elasticsearch/xpack/watcher/watch/WatchStoreUtils.java diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/watcher/execution/TriggeredWatchStore.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/watcher/execution/TriggeredWatchStore.java index b964d0b74b2..4704b396345 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/watcher/execution/TriggeredWatchStore.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/watcher/execution/TriggeredWatchStore.java @@ -25,11 +25,13 @@ import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.XContentFactory; +import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.search.sort.SortBuilders; import org.elasticsearch.xpack.security.InternalClient; import org.elasticsearch.xpack.watcher.support.init.proxy.WatcherClientProxy; +import org.elasticsearch.xpack.watcher.watch.WatchStoreUtils; import java.io.IOException; import java.util.ArrayList; @@ -77,15 +79,13 @@ public class TriggeredWatchStore extends AbstractComponent { } public boolean validate(ClusterState state) { - IndexMetaData indexMetaData = state.getMetaData().index(INDEX_NAME); - if (indexMetaData != null) { - if (!state.routingTable().index(INDEX_NAME).allPrimaryShardsActive()) { - logger.debug("not all primary shards of the [{}] index are started, so we cannot load previous triggered watches", - INDEX_NAME); - return false; - } - } else { - logger.debug("triggered watch index doesn't exist, so we can load"); + try { + IndexMetaData indexMetaData = WatchStoreUtils.getConcreteIndex(INDEX_NAME, state.metaData()); + return state.routingTable().index(indexMetaData.getIndex()).allPrimaryShardsActive(); + } catch (IndexNotFoundException e) { + } catch (IllegalStateException e) { + logger.trace((Supplier) () -> new ParameterizedMessage("error getting index meta data [{}]: ", INDEX_NAME), e); + return false; } return true; } @@ -235,15 +235,16 @@ public class TriggeredWatchStore extends AbstractComponent { } public Collection loadTriggeredWatches(ClusterState state) { - IndexMetaData indexMetaData = state.getMetaData().index(INDEX_NAME); - if (indexMetaData == null) { - logger.debug("no .triggered_watches indices found. skipping loading awaiting triggered watches"); + IndexMetaData indexMetaData; + try { + indexMetaData = WatchStoreUtils.getConcreteIndex(INDEX_NAME, state.metaData()); + } catch (IndexNotFoundException e) { return Collections.emptySet(); } int numPrimaryShards; - if (!state.routingTable().index(INDEX_NAME).allPrimaryShardsActive()) { - throw illegalState("not all primary shards of the [{}] index are started.", INDEX_NAME); + if (state.routingTable().index(indexMetaData.getIndex()).allPrimaryShardsActive() == false) { + throw illegalState("not all primary shards of the triggered watches index {} are started", indexMetaData.getIndex()); } else { numPrimaryShards = indexMetaData.getNumberOfShards(); } @@ -267,7 +268,7 @@ public class TriggeredWatchStore extends AbstractComponent { String id = sh.getId(); try { TriggeredWatch triggeredWatch = triggeredWatchParser.parse(id, sh.version(), sh.getSourceRef()); - logger.debug("loaded triggered watch [{}/{}/{}]", sh.index(), sh.type(), sh.id()); + logger.trace("loaded triggered watch [{}/{}/{}]", sh.index(), sh.type(), sh.id()); triggeredWatches.add(triggeredWatch); } catch (Exception e) { logger.error( @@ -279,6 +280,7 @@ public class TriggeredWatchStore extends AbstractComponent { } finally { client.clearScroll(response.getScrollId()); } + logger.debug("loaded [{}] triggered watches", triggeredWatches.size()); return triggeredWatches; } diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/watcher/watch/WatchStore.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/watcher/watch/WatchStore.java index fa7009b361c..4cbe7b8ae4e 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/watcher/watch/WatchStore.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/watcher/watch/WatchStore.java @@ -30,6 +30,7 @@ import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.json.JsonXContent; +import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.index.engine.DocumentMissingException; import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.builder.SearchSourceBuilder; @@ -86,35 +87,32 @@ public class WatchStore extends AbstractComponent { return; } - IndexMetaData watchesIndexMetaData = state.getMetaData().index(INDEX); - if (watchesIndexMetaData != null) { - try { - int count = loadWatches(watchesIndexMetaData.getNumberOfShards()); - logger.debug("loaded [{}] watches from the watches index [{}]", count, INDEX); - started.set(true); - } catch (Exception e) { - logger.debug((Supplier) () -> new ParameterizedMessage("failed to load watches for watch index [{}]", INDEX), e); - watches.clear(); - throw e; - } - } else { - started.set(true); + try { + IndexMetaData indexMetaData = WatchStoreUtils.getConcreteIndex(INDEX, state.metaData()); + int count = loadWatches(indexMetaData.getNumberOfShards()); + logger.debug("loaded [{}] watches from the watches index [{}]", count, indexMetaData.getIndex().getName()); + } catch (IndexNotFoundException e) { + } catch (Exception e) { + logger.debug((Supplier) () -> new ParameterizedMessage("failed to load watches for watch index [{}]", INDEX), e); + watches.clear(); + throw e; } + + started.set(true); } public boolean validate(ClusterState state) { - IndexMetaData watchesIndexMetaData = state.getMetaData().index(INDEX); - if (watchesIndexMetaData == null) { - logger.debug("index [{}] doesn't exist, so we can start", INDEX); + IndexMetaData watchesIndexMetaData; + try { + watchesIndexMetaData = WatchStoreUtils.getConcreteIndex(INDEX, state.metaData()); + } catch (IndexNotFoundException e) { return true; - } - if (state.routingTable().index(INDEX).allPrimaryShardsActive()) { - logger.debug("index [{}] exists and all primary shards are started, so we can start", INDEX); - return true; - } else { - logger.debug("not all primary shards active for index [{}], so we cannot start", INDEX); + } catch (IllegalStateException e) { + logger.trace((Supplier) () -> new ParameterizedMessage("error getting index meta data [{}]: ", INDEX), e); return false; } + + return state.routingTable().index(watchesIndexMetaData.getIndex().getName()).allPrimaryShardsActive(); } public boolean started() { @@ -374,5 +372,4 @@ public class WatchStore extends AbstractComponent { return response; } } - } diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/watcher/watch/WatchStoreUtils.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/watcher/watch/WatchStoreUtils.java new file mode 100644 index 00000000000..91230af2ec2 --- /dev/null +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/watcher/watch/WatchStoreUtils.java @@ -0,0 +1,37 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.watcher.watch; + +import org.elasticsearch.cluster.metadata.AliasOrIndex; +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.index.IndexNotFoundException; + +public class WatchStoreUtils { + + /** + * Method to get indexmetadata of a index, that potentially is behind an alias. + * + * @param name Name of the index or the alias + * @param metaData Metadata to search for the name + * @return IndexMetaData of the concrete index + * @throws IllegalStateException If an alias points to two indices + * @throws IndexNotFoundException If no index exists + */ + public static IndexMetaData getConcreteIndex(String name, MetaData metaData) { + AliasOrIndex aliasOrIndex = metaData.getAliasAndIndexLookup().get(name); + if (aliasOrIndex == null) { + throw new IndexNotFoundException(name); + } + + if (aliasOrIndex.isAlias() && aliasOrIndex.getIndices().size() > 1) { + throw new IllegalStateException("Alias [" + name + "] points to more than one index"); + } + + return aliasOrIndex.getIndices().get(0); + } + +} diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/watcher/execution/TriggeredWatchStoreTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/watcher/execution/TriggeredWatchStoreTests.java index af2607849ce..36a938f37b8 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/watcher/execution/TriggeredWatchStoreTests.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/watcher/execution/TriggeredWatchStoreTests.java @@ -5,6 +5,7 @@ */ package org.elasticsearch.xpack.watcher.execution; +import org.elasticsearch.ElasticsearchException; import org.elasticsearch.Version; import org.elasticsearch.action.admin.indices.refresh.RefreshRequest; import org.elasticsearch.action.admin.indices.refresh.RefreshResponse; @@ -13,6 +14,7 @@ import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.AliasMetaData; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.routing.IndexRoutingTable; @@ -53,6 +55,12 @@ import static org.mockito.Mockito.verifyZeroInteractions; import static org.mockito.Mockito.when; public class TriggeredWatchStoreTests extends ESTestCase { + + private Settings indexSettings = settings(Version.CURRENT) + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1) + .build(); + private WatcherClientProxy clientProxy; private TriggeredWatch.Parser parser; private TriggeredWatchStore triggeredWatchStore; @@ -67,8 +75,8 @@ public class TriggeredWatchStoreTests extends ESTestCase { public void testLoadWatchRecordsNoPriorHistoryIndices() throws Exception { ClusterState.Builder csBuilder = new ClusterState.Builder(new ClusterName("name")); - MetaData.Builder metaDateBuilder = MetaData.builder(); - csBuilder.metaData(metaDateBuilder); + MetaData.Builder metaDataBuilder = MetaData.builder(); + csBuilder.metaData(metaDataBuilder); ClusterState cs = csBuilder.build(); assertThat(triggeredWatchStore.validate(cs), is(true)); @@ -82,7 +90,7 @@ public class TriggeredWatchStoreTests extends ESTestCase { ClusterState.Builder csBuilder = new ClusterState.Builder(new ClusterName("name")); RoutingTable.Builder routingTableBuilder = RoutingTable.builder(); - MetaData.Builder metaDateBuilder = MetaData.builder(); + MetaData.Builder metaDataBuilder = MetaData.builder(); int numShards = 2 + randomInt(2); int numStartedShards = 1; @@ -90,9 +98,9 @@ public class TriggeredWatchStoreTests extends ESTestCase { .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, numShards) .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1) .build(); - metaDateBuilder.put(IndexMetaData.builder(TriggeredWatchStore.INDEX_NAME).settings(settings) + metaDataBuilder.put(IndexMetaData.builder(TriggeredWatchStore.INDEX_NAME).settings(settings) .numberOfShards(numShards).numberOfReplicas(1)); - final Index index = metaDateBuilder.get(TriggeredWatchStore.INDEX_NAME).getIndex(); + final Index index = metaDataBuilder.get(TriggeredWatchStore.INDEX_NAME).getIndex(); IndexRoutingTable.Builder indexRoutingTableBuilder = IndexRoutingTable.builder(index); for (int i = 0; i < numShards; i++) { ShardRoutingState state; @@ -110,17 +118,13 @@ public class TriggeredWatchStoreTests extends ESTestCase { } routingTableBuilder.add(indexRoutingTableBuilder.build()); - csBuilder.metaData(metaDateBuilder); + csBuilder.metaData(metaDataBuilder); csBuilder.routingTable(routingTableBuilder.build()); ClusterState cs = csBuilder.build(); assertThat(triggeredWatchStore.validate(cs), is(false)); - try { - triggeredWatchStore.loadTriggeredWatches(cs); - fail("exception expected, because not all primary shards are started"); - } catch (Exception e) { - assertThat(e.getMessage(), equalTo("not all primary shards of the [.triggered_watches] index are started.")); - } + IllegalStateException e = expectThrows(IllegalStateException.class, () -> triggeredWatchStore.loadTriggeredWatches(cs)); + assertThat(e.getMessage(), is("not all primary shards of the triggered watches index [.triggered_watches] are started")); verifyZeroInteractions(clientProxy); } @@ -129,13 +133,9 @@ public class TriggeredWatchStoreTests extends ESTestCase { ClusterState.Builder csBuilder = new ClusterState.Builder(new ClusterName("_name")); RoutingTable.Builder routingTableBuilder = RoutingTable.builder(); - MetaData.Builder metaDateBuilder = MetaData.builder(); - Settings settings = settings(Version.CURRENT) - .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) - .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1) - .build(); - metaDateBuilder.put(IndexMetaData.builder(TriggeredWatchStore.INDEX_NAME).settings(settings).numberOfShards(1).numberOfReplicas(1)); - final Index index = metaDateBuilder.get(TriggeredWatchStore.INDEX_NAME).getIndex(); + MetaData.Builder metaDataBuilder = MetaData.builder(); + metaDataBuilder.put(IndexMetaData.builder(TriggeredWatchStore.INDEX_NAME).settings(indexSettings)); + final Index index = metaDataBuilder.get(TriggeredWatchStore.INDEX_NAME).getIndex(); IndexRoutingTable.Builder indexRoutingTableBuilder = IndexRoutingTable.builder(index); ShardId shardId = new ShardId(index, 0); indexRoutingTableBuilder.addIndexShard(new IndexShardRoutingTable.Builder(shardId) @@ -143,7 +143,7 @@ public class TriggeredWatchStoreTests extends ESTestCase { .build()); indexRoutingTableBuilder.addReplica(); routingTableBuilder.add(indexRoutingTableBuilder.build()); - csBuilder.metaData(metaDateBuilder); + csBuilder.metaData(metaDataBuilder); csBuilder.routingTable(routingTableBuilder.build()); ClusterState cs = csBuilder.build(); @@ -164,13 +164,9 @@ public class TriggeredWatchStoreTests extends ESTestCase { ClusterState.Builder csBuilder = new ClusterState.Builder(new ClusterName("_name")); RoutingTable.Builder routingTableBuilder = RoutingTable.builder(); - MetaData.Builder metaDateBuilder = MetaData.builder(); - Settings settings = settings(Version.CURRENT) - .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) - .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1) - .build(); - metaDateBuilder.put(IndexMetaData.builder(TriggeredWatchStore.INDEX_NAME).settings(settings).numberOfShards(1).numberOfReplicas(1)); - final Index index = metaDateBuilder.get(TriggeredWatchStore.INDEX_NAME).getIndex(); + MetaData.Builder metaDataBuilder = MetaData.builder(); + metaDataBuilder.put(IndexMetaData.builder(TriggeredWatchStore.INDEX_NAME).settings(indexSettings)); + final Index index = metaDataBuilder.get(TriggeredWatchStore.INDEX_NAME).getIndex(); IndexRoutingTable.Builder indexRoutingTableBuilder = IndexRoutingTable.builder(index); ShardId shardId = new ShardId(index, 0); indexRoutingTableBuilder.addIndexShard(new IndexShardRoutingTable.Builder(shardId) @@ -178,7 +174,7 @@ public class TriggeredWatchStoreTests extends ESTestCase { .build()); indexRoutingTableBuilder.addReplica(); routingTableBuilder.add(indexRoutingTableBuilder.build()); - csBuilder.metaData(metaDateBuilder); + csBuilder.metaData(metaDataBuilder); csBuilder.routingTable(routingTableBuilder.build()); ClusterState cs = csBuilder.build(); @@ -208,13 +204,9 @@ public class TriggeredWatchStoreTests extends ESTestCase { ClusterState.Builder csBuilder = new ClusterState.Builder(new ClusterName("_name")); RoutingTable.Builder routingTableBuilder = RoutingTable.builder(); - MetaData.Builder metaDateBuilder = MetaData.builder(); - Settings settings = settings(Version.CURRENT) - .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) - .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1) - .build(); - metaDateBuilder.put(IndexMetaData.builder(TriggeredWatchStore.INDEX_NAME).settings(settings).numberOfShards(1).numberOfReplicas(1)); - final Index index = metaDateBuilder.get(TriggeredWatchStore.INDEX_NAME).getIndex(); + MetaData.Builder metaDataBuilder = MetaData.builder(); + metaDataBuilder.put(IndexMetaData.builder(TriggeredWatchStore.INDEX_NAME).settings(indexSettings)); + final Index index = metaDataBuilder.get(TriggeredWatchStore.INDEX_NAME).getIndex(); IndexRoutingTable.Builder indexRoutingTableBuilder = IndexRoutingTable.builder(index); ShardId shardId = new ShardId(index, 0); indexRoutingTableBuilder.addIndexShard(new IndexShardRoutingTable.Builder(shardId) @@ -222,7 +214,7 @@ public class TriggeredWatchStoreTests extends ESTestCase { .build()); indexRoutingTableBuilder.addReplica(); routingTableBuilder.add(indexRoutingTableBuilder.build()); - csBuilder.metaData(metaDateBuilder); + csBuilder.metaData(metaDataBuilder); csBuilder.routingTable(routingTableBuilder.build()); ClusterState cs = csBuilder.build(); @@ -251,13 +243,9 @@ public class TriggeredWatchStoreTests extends ESTestCase { ClusterState.Builder csBuilder = new ClusterState.Builder(new ClusterName("_name")); RoutingTable.Builder routingTableBuilder = RoutingTable.builder(); - MetaData.Builder metaDateBuilder = MetaData.builder(); - Settings settings = settings(Version.CURRENT) - .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) - .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1) - .build(); - metaDateBuilder.put(IndexMetaData.builder(TriggeredWatchStore.INDEX_NAME).settings(settings).numberOfShards(1).numberOfReplicas(1)); - final Index index = metaDateBuilder.get(TriggeredWatchStore.INDEX_NAME).getIndex(); + MetaData.Builder metaDataBuilder = MetaData.builder(); + metaDataBuilder.put(IndexMetaData.builder(TriggeredWatchStore.INDEX_NAME).settings(indexSettings)); + final Index index = metaDataBuilder.get(TriggeredWatchStore.INDEX_NAME).getIndex(); IndexRoutingTable.Builder indexRoutingTableBuilder = IndexRoutingTable.builder(index); ShardId shardId = new ShardId(index, 0); indexRoutingTableBuilder.addIndexShard(new IndexShardRoutingTable.Builder(shardId) @@ -265,7 +253,7 @@ public class TriggeredWatchStoreTests extends ESTestCase { .build()); indexRoutingTableBuilder.addReplica(); routingTableBuilder.add(indexRoutingTableBuilder.build()); - csBuilder.metaData(metaDateBuilder); + csBuilder.metaData(metaDataBuilder); csBuilder.routingTable(routingTableBuilder.build()); ClusterState cs = csBuilder.build(); @@ -312,6 +300,63 @@ public class TriggeredWatchStoreTests extends ESTestCase { verify(clientProxy, times(1)).clearScroll(anyString()); } + // the elasticsearch migration helper is doing reindex using aliases, so we have to + // make sure that the watch store supports a single alias pointing to the watch index + public void testLoadStoreAsAlias() throws Exception { + ClusterState.Builder csBuilder = new ClusterState.Builder(new ClusterName("_name")); + + RoutingTable.Builder routingTableBuilder = RoutingTable.builder(); + MetaData.Builder metaDataBuilder = MetaData.builder(); + metaDataBuilder.put(IndexMetaData.builder("triggered-watches-alias").settings(indexSettings) + .putAlias(new AliasMetaData.Builder(TriggeredWatchStore.INDEX_NAME).build())); + final Index index = metaDataBuilder.get("triggered-watches-alias").getIndex(); + IndexRoutingTable.Builder indexRoutingTableBuilder = IndexRoutingTable.builder(index); + ShardId shardId = new ShardId(index, 0); + indexRoutingTableBuilder.addIndexShard(new IndexShardRoutingTable.Builder(shardId) + .addShard(TestShardRouting.newShardRouting(shardId, "_node_id", null, true, ShardRoutingState.STARTED)) + .build()); + indexRoutingTableBuilder.addReplica(); + routingTableBuilder.add(indexRoutingTableBuilder.build()); + csBuilder.metaData(metaDataBuilder); + csBuilder.routingTable(routingTableBuilder.build()); + ClusterState cs = csBuilder.build(); + + assertThat(triggeredWatchStore.validate(cs), is(true)); + verifyZeroInteractions(clientProxy); + } + + // the elasticsearch migration helper is doing reindex using aliases, so we have to + // make sure that the watch store supports only a single index in an alias + public void testLoadingFailsWithTwoAliases() throws Exception { + ClusterState.Builder csBuilder = new ClusterState.Builder(new ClusterName("_name")); + + MetaData.Builder metaDataBuilder = MetaData.builder(); + RoutingTable.Builder routingTableBuilder = RoutingTable.builder(); + metaDataBuilder.put(IndexMetaData.builder("triggered-watches-alias").settings(indexSettings) + .putAlias(new AliasMetaData.Builder(TriggeredWatchStore.INDEX_NAME).build())); + metaDataBuilder.put(IndexMetaData.builder("whatever").settings(indexSettings) + .putAlias(new AliasMetaData.Builder(TriggeredWatchStore.INDEX_NAME).build())); + + final Index index = metaDataBuilder.get("triggered-watches-alias").getIndex(); + IndexRoutingTable.Builder indexRoutingTableBuilder = IndexRoutingTable.builder(index); + indexRoutingTableBuilder.addIndexShard(new IndexShardRoutingTable.Builder(new ShardId(index, 0)) + .addShard(TestShardRouting.newShardRouting("triggered-watches-alias", 0, "_node_id", null, true, ShardRoutingState.STARTED)) + .build()); + indexRoutingTableBuilder.addReplica(); + final Index otherIndex = metaDataBuilder.get("whatever").getIndex(); + IndexRoutingTable.Builder otherIndexRoutingTableBuilder = IndexRoutingTable.builder(otherIndex); + otherIndexRoutingTableBuilder.addIndexShard(new IndexShardRoutingTable.Builder(new ShardId(index, 0)) + .addShard(TestShardRouting.newShardRouting("whatever", 0, "_node_id", null, true, ShardRoutingState.STARTED)) + .build()); + + csBuilder.metaData(metaDataBuilder); + csBuilder.routingTable(routingTableBuilder.build()); + ClusterState cs = csBuilder.build(); + + assertThat(triggeredWatchStore.validate(cs), is(false)); + verifyZeroInteractions(clientProxy); + } + private RefreshResponse mockRefreshResponse(int total, int successful) { RefreshResponse refreshResponse = mock(RefreshResponse.class); when(refreshResponse.getTotalShards()).thenReturn(total); diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/watcher/test/AbstractWatcherIntegrationTestCase.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/watcher/test/AbstractWatcherIntegrationTestCase.java index 3c9d54ec43f..3ce44f06a32 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/watcher/test/AbstractWatcherIntegrationTestCase.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/watcher/test/AbstractWatcherIntegrationTestCase.java @@ -6,6 +6,8 @@ package org.elasticsearch.xpack.watcher.test; import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.action.admin.indices.alias.Alias; +import org.elasticsearch.action.admin.indices.create.CreateIndexResponse; import org.elasticsearch.action.admin.indices.template.get.GetIndexTemplatesResponse; import org.elasticsearch.action.search.SearchRequestBuilder; import org.elasticsearch.action.search.SearchResponse; @@ -15,12 +17,15 @@ import org.elasticsearch.client.node.NodeClient; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.routing.IndexRoutingTable; +import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.io.Streams; import org.elasticsearch.common.network.NetworkModule; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.Callback; import org.elasticsearch.common.xcontent.XContentHelper; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.common.xcontent.json.JsonXContent; import org.elasticsearch.common.xcontent.support.XContentMapValues; import org.elasticsearch.env.Environment; import org.elasticsearch.index.query.QueryBuilder; @@ -51,12 +56,14 @@ import org.elasticsearch.xpack.security.authc.support.SecuredString; import org.elasticsearch.xpack.security.crypto.CryptoService; import org.elasticsearch.xpack.support.clock.Clock; import org.elasticsearch.xpack.support.clock.ClockMock; +import org.elasticsearch.xpack.template.TemplateUtils; import org.elasticsearch.xpack.watcher.WatcherLifeCycleService; import org.elasticsearch.xpack.watcher.WatcherService; import org.elasticsearch.xpack.watcher.WatcherState; import org.elasticsearch.xpack.watcher.client.WatcherClient; import org.elasticsearch.xpack.watcher.execution.ExecutionService; import org.elasticsearch.xpack.watcher.execution.ExecutionState; +import org.elasticsearch.xpack.watcher.execution.TriggeredWatchStore; import org.elasticsearch.xpack.watcher.history.HistoryStore; import org.elasticsearch.xpack.watcher.support.WatcherIndexTemplateRegistry; import org.elasticsearch.xpack.watcher.support.xcontent.XContentSource; @@ -64,6 +71,7 @@ import org.elasticsearch.xpack.watcher.trigger.ScheduleTriggerEngineMock; import org.elasticsearch.xpack.watcher.trigger.TriggerService; import org.elasticsearch.xpack.watcher.trigger.schedule.ScheduleModule; import org.elasticsearch.xpack.watcher.watch.Watch; +import org.elasticsearch.xpack.watcher.watch.WatchStore; import org.hamcrest.Matcher; import org.jboss.netty.util.internal.SystemPropertyUtil; import org.junit.After; @@ -90,6 +98,7 @@ import java.util.function.Function; import static org.elasticsearch.index.query.QueryBuilders.boolQuery; import static org.elasticsearch.index.query.QueryBuilders.matchQuery; import static org.elasticsearch.test.ESIntegTestCase.Scope.SUITE; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.elasticsearch.xpack.security.authc.support.UsernamePasswordToken.basicAuthHeaderValue; import static org.elasticsearch.xpack.watcher.support.WatcherIndexTemplateRegistry.HISTORY_TEMPLATE_NAME; import static org.elasticsearch.xpack.watcher.support.WatcherIndexTemplateRegistry.TRIGGERED_TEMPLATE_NAME; @@ -245,6 +254,7 @@ public abstract class AbstractWatcherIntegrationTestCase extends ESIntegTestCase public void _setup() throws Exception { setupTimeWarp(); startWatcherIfNodesExist(); + configureAliasesForWatcherIndices(); } @After @@ -304,6 +314,52 @@ public abstract class AbstractWatcherIntegrationTestCase extends ESIntegTestCase } } + /** + * In order to test, that .watches and .triggered-watches indices can also point to an alias, we will rarely create those + * after starting watcher + * + * The idea behind this is the possible use of the migration helper for upgrades, see + * https://github.com/elastic/elasticsearch-migration/ + * + */ + private void configureAliasesForWatcherIndices() throws Exception { + // alias for .watches, setting the index template to the same as well + if (rarely()) { + String newIndex = ".watches-alias-index"; + BytesReference bytesReference = TemplateUtils.load("/watches.json"); + try (XContentParser parser = JsonXContent.jsonXContent.createParser(bytesReference.toBytesRef().bytes)) { + Map parserMap = parser.map(); + Map allMappings = (Map) parserMap.get("mappings"); + + CreateIndexResponse response = client().admin().indices().prepareCreate(newIndex) + .setCause("Index to test aliases with .watches index") + .addAlias(new Alias(WatchStore.INDEX)) + .setSettings((Map) parserMap.get("settings")) + .addMapping("watch", (Map) allMappings.get("watch")) + .get(); + assertAcked(response); + } + } + + // alias for .triggered-watches, ensuring the index template is set appropriately + if (rarely()) { + String newIndex = ".triggered-watches-alias-index"; + BytesReference bytesReference = TemplateUtils.load("/triggered_watches.json"); + try (XContentParser parser = JsonXContent.jsonXContent.createParser(bytesReference.toBytesRef().bytes)) { + Map parserMap = parser.map(); + Map allMappings = (Map) parserMap.get("mappings"); + + CreateIndexResponse response = client().admin().indices().prepareCreate(newIndex) + .setCause("Index to test aliases with .triggered-watches index") + .addAlias(new Alias(TriggeredWatchStore.INDEX_NAME)) + .setSettings((Map) parserMap.get("settings")) + .addMapping("triggered_watch", (Map) allMappings.get("triggered_watch")) + .get(); + assertAcked(response); + } + } + } + protected TimeWarp timeWarp() { assert timeWarped() : "cannot access TimeWarp when test context is not time warped"; return timeWarp; diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/watcher/test/integration/BootStrapTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/watcher/test/integration/BootStrapTests.java index c308a33c301..6367546bf02 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/watcher/test/integration/BootStrapTests.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/watcher/test/integration/BootStrapTests.java @@ -47,8 +47,8 @@ import static org.hamcrest.Matchers.is; import static org.hamcrest.core.IsEqual.equalTo; import static org.joda.time.DateTimeZone.UTC; -@TestLogging("org.elasticsearch.watcher:TRACE") public class BootStrapTests extends AbstractWatcherIntegrationTestCase { + @Override protected boolean timeWarped() { // timewarping isn't necessary here, because we aren't testing triggering or throttling @@ -188,10 +188,11 @@ public class BootStrapTests extends AbstractWatcherIntegrationTestCase { DateTime now = DateTime.now(UTC); Wid wid = new Wid("_id", 1, now); ScheduleTriggerEvent event = new ScheduleTriggerEvent("_id", now, now); + client().prepareIndex(TriggeredWatchStore.INDEX_NAME, TriggeredWatchStore.DOC_TYPE, wid.value()) .setSource(jsonBuilder().startObject() .startObject(WatchRecord.Field.TRIGGER_EVENT.getPreferredName()) - .field(event.type(), event) + .field(event.type(), event) .endObject() .endObject()) .setWaitForActiveShards(ActiveShardCount.ALL) diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/watcher/watch/WatchStoreTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/watcher/watch/WatchStoreTests.java index 948e0a486f1..8c80ab10ff2 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/watcher/watch/WatchStoreTests.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/watcher/watch/WatchStoreTests.java @@ -5,6 +5,7 @@ */ package org.elasticsearch.xpack.watcher.watch; +import org.elasticsearch.ElasticsearchException; import org.elasticsearch.Version; import org.elasticsearch.action.admin.indices.refresh.RefreshRequest; import org.elasticsearch.action.admin.indices.refresh.RefreshResponse; @@ -13,6 +14,7 @@ import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.AliasMetaData; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.routing.IndexRoutingTable; @@ -80,8 +82,8 @@ public class WatchStoreTests extends ESTestCase { public void testStartNoPreviousWatchesIndex() throws Exception { ClusterState.Builder csBuilder = new ClusterState.Builder(new ClusterName("_name")); - MetaData.Builder metaDateBuilder = MetaData.builder(); - csBuilder.metaData(metaDateBuilder); + MetaData.Builder metaDataBuilder = MetaData.builder(); + csBuilder.metaData(metaDataBuilder); ClusterState cs = csBuilder.build(); assertThat(watchStore.validate(cs), is(true)); @@ -96,14 +98,14 @@ public class WatchStoreTests extends ESTestCase { public void testStartPrimaryShardNotReady() { ClusterState.Builder csBuilder = new ClusterState.Builder(new ClusterName("_name")); - MetaData.Builder metaDateBuilder = MetaData.builder(); + MetaData.Builder metaDataBuilder = MetaData.builder(); RoutingTable.Builder routingTableBuilder = RoutingTable.builder(); Settings settings = settings(Version.CURRENT) .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1) .build(); - metaDateBuilder.put(IndexMetaData.builder(WatchStore.INDEX).settings(settings).numberOfShards(1).numberOfReplicas(1)); - final Index index = metaDateBuilder.get(WatchStore.INDEX).getIndex(); + metaDataBuilder.put(IndexMetaData.builder(WatchStore.INDEX).settings(settings).numberOfShards(1).numberOfReplicas(1)); + final Index index = metaDataBuilder.get(WatchStore.INDEX).getIndex(); IndexRoutingTable.Builder indexRoutingTableBuilder = IndexRoutingTable.builder(index); indexRoutingTableBuilder.addIndexShard(new IndexShardRoutingTable.Builder(new ShardId(index, 0)) .addShard(TestShardRouting.newShardRouting(WatchStore.INDEX, 0, "_node_id", null, true, @@ -111,7 +113,7 @@ public class WatchStoreTests extends ESTestCase { .build()); indexRoutingTableBuilder.addReplica(); routingTableBuilder.add(indexRoutingTableBuilder.build()); - csBuilder.metaData(metaDateBuilder); + csBuilder.metaData(metaDataBuilder); csBuilder.routingTable(routingTableBuilder.build()); ClusterState cs = csBuilder.build(); @@ -378,26 +380,134 @@ public class WatchStoreTests extends ESTestCase { assertThat(watchStore.activeWatches(), hasSize(0)); } - /* - * Creates the standard cluster state metadata for the watches index - * with shards/replicas being marked as started - */ - private void createWatchIndexMetaData(ClusterState.Builder builder) { - MetaData.Builder metaDateBuilder = MetaData.builder(); + // the elasticsearch migration helper is doing reindex using aliases, so we have to + // make sure that the watch store supports a single alias pointing to the watch index + public void testThatStartingWithWatchesIndexAsAliasWorks() throws Exception { + ClusterState.Builder csBuilder = new ClusterState.Builder(new ClusterName("_name")); + + MetaData.Builder metaDataBuilder = MetaData.builder(); RoutingTable.Builder routingTableBuilder = RoutingTable.builder(); Settings settings = settings(Version.CURRENT) .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1) .build(); - metaDateBuilder.put(IndexMetaData.builder(WatchStore.INDEX).settings(settings).numberOfShards(1).numberOfReplicas(1)); - final Index index = metaDateBuilder.get(WatchStore.INDEX).getIndex(); + metaDataBuilder.put(IndexMetaData.builder("watches-alias").settings(settings).numberOfShards(1).numberOfReplicas(1) + .putAlias(new AliasMetaData.Builder(WatchStore.INDEX).build())); + + final Index index = metaDataBuilder.get("watches-alias").getIndex(); + IndexRoutingTable.Builder indexRoutingTableBuilder = IndexRoutingTable.builder(index); + indexRoutingTableBuilder.addIndexShard(new IndexShardRoutingTable.Builder(new ShardId(index, 0)) + .addShard(TestShardRouting.newShardRouting("watches-alias", 0, "_node_id", null, true, ShardRoutingState.STARTED)) + .build()); + indexRoutingTableBuilder.addReplica(); + routingTableBuilder.add(indexRoutingTableBuilder.build()); + csBuilder.metaData(metaDataBuilder); + csBuilder.routingTable(routingTableBuilder.build()); + + RefreshResponse refreshResponse = mockRefreshResponse(1, 1); + when(clientProxy.refresh(any(RefreshRequest.class))).thenReturn(refreshResponse); + + BytesReference source = new BytesArray("{}"); + InternalSearchHit hit1 = new InternalSearchHit(0, "_id1", new Text("type"), Collections.emptyMap()); + hit1.sourceRef(source); + InternalSearchHit hit2 = new InternalSearchHit(1, "_id2", new Text("type"), Collections.emptyMap()); + hit2.sourceRef(source); + SearchResponse searchResponse1 = mockSearchResponse(1, 1, 2, hit1, hit2); + + when(clientProxy.search(any(SearchRequest.class), any(TimeValue.class))).thenReturn(searchResponse1); + + InternalSearchHit hit3 = new InternalSearchHit(2, "_id3", new Text("type"), Collections.emptyMap()); + hit3.sourceRef(source); + InternalSearchHit hit4 = new InternalSearchHit(3, "_id4", new Text("type"), Collections.emptyMap()); + hit4.sourceRef(source); + SearchResponse searchResponse2 = mockSearchResponse(1, 1, 2, hit3, hit4); + SearchResponse searchResponse3 = mockSearchResponse(1, 1, 2); + when(clientProxy.searchScroll(anyString(), any(TimeValue.class))).thenReturn(searchResponse2, searchResponse3); + + Watch watch1 = mock(Watch.class); + WatchStatus status = mock(WatchStatus.class); + when(watch1.status()).thenReturn(status); + Watch watch2 = mock(Watch.class); + when(watch2.status()).thenReturn(status); + Watch watch3 = mock(Watch.class); + when(watch3.status()).thenReturn(status); + Watch watch4 = mock(Watch.class); + when(watch4.status()).thenReturn(status); + when(parser.parse("_id1", true, source, true)).thenReturn(watch1); + when(parser.parse("_id2", true, source, true)).thenReturn(watch2); + when(parser.parse("_id3", true, source, true)).thenReturn(watch3); + when(parser.parse("_id4", true, source, true)).thenReturn(watch4); + + when(clientProxy.clearScroll(anyString())).thenReturn(new ClearScrollResponse(true, 0)); + + ClusterState cs = csBuilder.build(); + assertThat(watchStore.validate(cs), is(true)); + watchStore.start(cs); + assertThat(watchStore.started(), is(true)); + assertThat(watchStore.watches().size(), equalTo(4)); + verify(clientProxy, times(1)).refresh(any(RefreshRequest.class)); + verify(clientProxy, times(1)).search(any(SearchRequest.class), any(TimeValue.class)); + verify(clientProxy, times(1)).clearScroll(anyString()); + } + + // the elasticsearch migration helper is doing reindex using aliases, so we have to + // make sure that the watch store supports only a single index in an alias + public void testThatWatchesIndexWithTwoAliasesFails() throws Exception { + ClusterState.Builder csBuilder = new ClusterState.Builder(new ClusterName("_name")); + + MetaData.Builder metaDataBuilder = MetaData.builder(); + RoutingTable.Builder routingTableBuilder = RoutingTable.builder(); + Settings settings = settings(Version.CURRENT) + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1) + .build(); + metaDataBuilder.put(IndexMetaData.builder("watches-alias").settings(settings).numberOfShards(1).numberOfReplicas(1) + .putAlias(new AliasMetaData.Builder(WatchStore.INDEX).build())); + metaDataBuilder.put(IndexMetaData.builder("whatever").settings(settings).numberOfShards(1).numberOfReplicas(1) + .putAlias(new AliasMetaData.Builder(WatchStore.INDEX).build())); + + final Index index = metaDataBuilder.get("watches-alias").getIndex(); + IndexRoutingTable.Builder indexRoutingTableBuilder = IndexRoutingTable.builder(index); + indexRoutingTableBuilder.addIndexShard(new IndexShardRoutingTable.Builder(new ShardId(index, 0)) + .addShard(TestShardRouting.newShardRouting("watches-alias", 0, "_node_id", null, true, ShardRoutingState.STARTED)) + .build()); + indexRoutingTableBuilder.addReplica(); + final Index otherIndex = metaDataBuilder.get("whatever").getIndex(); + IndexRoutingTable.Builder otherIndexRoutingTableBuilder = IndexRoutingTable.builder(otherIndex); + otherIndexRoutingTableBuilder.addIndexShard(new IndexShardRoutingTable.Builder(new ShardId(index, 0)) + .addShard(TestShardRouting.newShardRouting("whatever", 0, "_node_id", null, true, ShardRoutingState.STARTED)) + .build()); + otherIndexRoutingTableBuilder.addReplica(); + routingTableBuilder.add(otherIndexRoutingTableBuilder.build()); + csBuilder.metaData(metaDataBuilder); + csBuilder.routingTable(routingTableBuilder.build()); + + ClusterState cs = csBuilder.build(); + assertThat(watchStore.validate(cs), is(false)); + IllegalStateException exception = expectThrows(IllegalStateException.class, () -> watchStore.start(cs)); + assertThat(exception.getMessage(), is("Alias [.watches] points to more than one index")); + } + + /* + * Creates the standard cluster state metadata for the watches index + * with shards/replicas being marked as started + */ + private void createWatchIndexMetaData(ClusterState.Builder builder) { + MetaData.Builder metaDataBuilder = MetaData.builder(); + RoutingTable.Builder routingTableBuilder = RoutingTable.builder(); + Settings settings = settings(Version.CURRENT) + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1) + .build(); + metaDataBuilder.put(IndexMetaData.builder(WatchStore.INDEX).settings(settings).numberOfShards(1).numberOfReplicas(1)); + final Index index = metaDataBuilder.get(WatchStore.INDEX).getIndex(); IndexRoutingTable.Builder indexRoutingTableBuilder = IndexRoutingTable.builder(index); indexRoutingTableBuilder.addIndexShard(new IndexShardRoutingTable.Builder(new ShardId(index, 0)) .addShard(TestShardRouting.newShardRouting(WatchStore.INDEX, 0, "_node_id", null, true, ShardRoutingState.STARTED)) .build()); indexRoutingTableBuilder.addReplica(); routingTableBuilder.add(indexRoutingTableBuilder.build()); - builder.metaData(metaDateBuilder); + builder.metaData(metaDataBuilder); builder.routingTable(routingTableBuilder.build()); }