diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/watcher/execution/TriggeredWatchStore.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/watcher/execution/TriggeredWatchStore.java index b964d0b74b2..4704b396345 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/watcher/execution/TriggeredWatchStore.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/watcher/execution/TriggeredWatchStore.java @@ -25,11 +25,13 @@ import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.XContentFactory; +import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.search.sort.SortBuilders; import org.elasticsearch.xpack.security.InternalClient; import org.elasticsearch.xpack.watcher.support.init.proxy.WatcherClientProxy; +import org.elasticsearch.xpack.watcher.watch.WatchStoreUtils; import java.io.IOException; import java.util.ArrayList; @@ -77,15 +79,13 @@ public class TriggeredWatchStore extends AbstractComponent { } public boolean validate(ClusterState state) { - IndexMetaData indexMetaData = state.getMetaData().index(INDEX_NAME); - if (indexMetaData != null) { - if (!state.routingTable().index(INDEX_NAME).allPrimaryShardsActive()) { - logger.debug("not all primary shards of the [{}] index are started, so we cannot load previous triggered watches", - INDEX_NAME); - return false; - } - } else { - logger.debug("triggered watch index doesn't exist, so we can load"); + try { + IndexMetaData indexMetaData = WatchStoreUtils.getConcreteIndex(INDEX_NAME, state.metaData()); + return state.routingTable().index(indexMetaData.getIndex()).allPrimaryShardsActive(); + } catch (IndexNotFoundException e) { + } catch (IllegalStateException e) { + logger.trace((Supplier) () -> new ParameterizedMessage("error getting index meta data [{}]: ", INDEX_NAME), e); + return false; } return true; } @@ -235,15 +235,16 @@ public class TriggeredWatchStore extends AbstractComponent { } public Collection loadTriggeredWatches(ClusterState state) { - IndexMetaData indexMetaData = state.getMetaData().index(INDEX_NAME); - if (indexMetaData == null) { - logger.debug("no .triggered_watches indices found. skipping loading awaiting triggered watches"); + IndexMetaData indexMetaData; + try { + indexMetaData = WatchStoreUtils.getConcreteIndex(INDEX_NAME, state.metaData()); + } catch (IndexNotFoundException e) { return Collections.emptySet(); } int numPrimaryShards; - if (!state.routingTable().index(INDEX_NAME).allPrimaryShardsActive()) { - throw illegalState("not all primary shards of the [{}] index are started.", INDEX_NAME); + if (state.routingTable().index(indexMetaData.getIndex()).allPrimaryShardsActive() == false) { + throw illegalState("not all primary shards of the triggered watches index {} are started", indexMetaData.getIndex()); } else { numPrimaryShards = indexMetaData.getNumberOfShards(); } @@ -267,7 +268,7 @@ public class TriggeredWatchStore extends AbstractComponent { String id = sh.getId(); try { TriggeredWatch triggeredWatch = triggeredWatchParser.parse(id, sh.version(), sh.getSourceRef()); - logger.debug("loaded triggered watch [{}/{}/{}]", sh.index(), sh.type(), sh.id()); + logger.trace("loaded triggered watch [{}/{}/{}]", sh.index(), sh.type(), sh.id()); triggeredWatches.add(triggeredWatch); } catch (Exception e) { logger.error( @@ -279,6 +280,7 @@ public class TriggeredWatchStore extends AbstractComponent { } finally { client.clearScroll(response.getScrollId()); } + logger.debug("loaded [{}] triggered watches", triggeredWatches.size()); return triggeredWatches; } diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/watcher/watch/WatchStore.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/watcher/watch/WatchStore.java index fa7009b361c..4cbe7b8ae4e 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/watcher/watch/WatchStore.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/watcher/watch/WatchStore.java @@ -30,6 +30,7 @@ import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.json.JsonXContent; +import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.index.engine.DocumentMissingException; import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.builder.SearchSourceBuilder; @@ -86,35 +87,32 @@ public class WatchStore extends AbstractComponent { return; } - IndexMetaData watchesIndexMetaData = state.getMetaData().index(INDEX); - if (watchesIndexMetaData != null) { - try { - int count = loadWatches(watchesIndexMetaData.getNumberOfShards()); - logger.debug("loaded [{}] watches from the watches index [{}]", count, INDEX); - started.set(true); - } catch (Exception e) { - logger.debug((Supplier) () -> new ParameterizedMessage("failed to load watches for watch index [{}]", INDEX), e); - watches.clear(); - throw e; - } - } else { - started.set(true); + try { + IndexMetaData indexMetaData = WatchStoreUtils.getConcreteIndex(INDEX, state.metaData()); + int count = loadWatches(indexMetaData.getNumberOfShards()); + logger.debug("loaded [{}] watches from the watches index [{}]", count, indexMetaData.getIndex().getName()); + } catch (IndexNotFoundException e) { + } catch (Exception e) { + logger.debug((Supplier) () -> new ParameterizedMessage("failed to load watches for watch index [{}]", INDEX), e); + watches.clear(); + throw e; } + + started.set(true); } public boolean validate(ClusterState state) { - IndexMetaData watchesIndexMetaData = state.getMetaData().index(INDEX); - if (watchesIndexMetaData == null) { - logger.debug("index [{}] doesn't exist, so we can start", INDEX); + IndexMetaData watchesIndexMetaData; + try { + watchesIndexMetaData = WatchStoreUtils.getConcreteIndex(INDEX, state.metaData()); + } catch (IndexNotFoundException e) { return true; - } - if (state.routingTable().index(INDEX).allPrimaryShardsActive()) { - logger.debug("index [{}] exists and all primary shards are started, so we can start", INDEX); - return true; - } else { - logger.debug("not all primary shards active for index [{}], so we cannot start", INDEX); + } catch (IllegalStateException e) { + logger.trace((Supplier) () -> new ParameterizedMessage("error getting index meta data [{}]: ", INDEX), e); return false; } + + return state.routingTable().index(watchesIndexMetaData.getIndex().getName()).allPrimaryShardsActive(); } public boolean started() { @@ -374,5 +372,4 @@ public class WatchStore extends AbstractComponent { return response; } } - } diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/watcher/watch/WatchStoreUtils.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/watcher/watch/WatchStoreUtils.java new file mode 100644 index 00000000000..91230af2ec2 --- /dev/null +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/watcher/watch/WatchStoreUtils.java @@ -0,0 +1,37 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.watcher.watch; + +import org.elasticsearch.cluster.metadata.AliasOrIndex; +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.index.IndexNotFoundException; + +public class WatchStoreUtils { + + /** + * Method to get indexmetadata of a index, that potentially is behind an alias. + * + * @param name Name of the index or the alias + * @param metaData Metadata to search for the name + * @return IndexMetaData of the concrete index + * @throws IllegalStateException If an alias points to two indices + * @throws IndexNotFoundException If no index exists + */ + public static IndexMetaData getConcreteIndex(String name, MetaData metaData) { + AliasOrIndex aliasOrIndex = metaData.getAliasAndIndexLookup().get(name); + if (aliasOrIndex == null) { + throw new IndexNotFoundException(name); + } + + if (aliasOrIndex.isAlias() && aliasOrIndex.getIndices().size() > 1) { + throw new IllegalStateException("Alias [" + name + "] points to more than one index"); + } + + return aliasOrIndex.getIndices().get(0); + } + +} diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/watcher/execution/TriggeredWatchStoreTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/watcher/execution/TriggeredWatchStoreTests.java index af2607849ce..36a938f37b8 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/watcher/execution/TriggeredWatchStoreTests.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/watcher/execution/TriggeredWatchStoreTests.java @@ -5,6 +5,7 @@ */ package org.elasticsearch.xpack.watcher.execution; +import org.elasticsearch.ElasticsearchException; import org.elasticsearch.Version; import org.elasticsearch.action.admin.indices.refresh.RefreshRequest; import org.elasticsearch.action.admin.indices.refresh.RefreshResponse; @@ -13,6 +14,7 @@ import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.AliasMetaData; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.routing.IndexRoutingTable; @@ -53,6 +55,12 @@ import static org.mockito.Mockito.verifyZeroInteractions; import static org.mockito.Mockito.when; public class TriggeredWatchStoreTests extends ESTestCase { + + private Settings indexSettings = settings(Version.CURRENT) + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1) + .build(); + private WatcherClientProxy clientProxy; private TriggeredWatch.Parser parser; private TriggeredWatchStore triggeredWatchStore; @@ -67,8 +75,8 @@ public class TriggeredWatchStoreTests extends ESTestCase { public void testLoadWatchRecordsNoPriorHistoryIndices() throws Exception { ClusterState.Builder csBuilder = new ClusterState.Builder(new ClusterName("name")); - MetaData.Builder metaDateBuilder = MetaData.builder(); - csBuilder.metaData(metaDateBuilder); + MetaData.Builder metaDataBuilder = MetaData.builder(); + csBuilder.metaData(metaDataBuilder); ClusterState cs = csBuilder.build(); assertThat(triggeredWatchStore.validate(cs), is(true)); @@ -82,7 +90,7 @@ public class TriggeredWatchStoreTests extends ESTestCase { ClusterState.Builder csBuilder = new ClusterState.Builder(new ClusterName("name")); RoutingTable.Builder routingTableBuilder = RoutingTable.builder(); - MetaData.Builder metaDateBuilder = MetaData.builder(); + MetaData.Builder metaDataBuilder = MetaData.builder(); int numShards = 2 + randomInt(2); int numStartedShards = 1; @@ -90,9 +98,9 @@ public class TriggeredWatchStoreTests extends ESTestCase { .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, numShards) .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1) .build(); - metaDateBuilder.put(IndexMetaData.builder(TriggeredWatchStore.INDEX_NAME).settings(settings) + metaDataBuilder.put(IndexMetaData.builder(TriggeredWatchStore.INDEX_NAME).settings(settings) .numberOfShards(numShards).numberOfReplicas(1)); - final Index index = metaDateBuilder.get(TriggeredWatchStore.INDEX_NAME).getIndex(); + final Index index = metaDataBuilder.get(TriggeredWatchStore.INDEX_NAME).getIndex(); IndexRoutingTable.Builder indexRoutingTableBuilder = IndexRoutingTable.builder(index); for (int i = 0; i < numShards; i++) { ShardRoutingState state; @@ -110,17 +118,13 @@ public class TriggeredWatchStoreTests extends ESTestCase { } routingTableBuilder.add(indexRoutingTableBuilder.build()); - csBuilder.metaData(metaDateBuilder); + csBuilder.metaData(metaDataBuilder); csBuilder.routingTable(routingTableBuilder.build()); ClusterState cs = csBuilder.build(); assertThat(triggeredWatchStore.validate(cs), is(false)); - try { - triggeredWatchStore.loadTriggeredWatches(cs); - fail("exception expected, because not all primary shards are started"); - } catch (Exception e) { - assertThat(e.getMessage(), equalTo("not all primary shards of the [.triggered_watches] index are started.")); - } + IllegalStateException e = expectThrows(IllegalStateException.class, () -> triggeredWatchStore.loadTriggeredWatches(cs)); + assertThat(e.getMessage(), is("not all primary shards of the triggered watches index [.triggered_watches] are started")); verifyZeroInteractions(clientProxy); } @@ -129,13 +133,9 @@ public class TriggeredWatchStoreTests extends ESTestCase { ClusterState.Builder csBuilder = new ClusterState.Builder(new ClusterName("_name")); RoutingTable.Builder routingTableBuilder = RoutingTable.builder(); - MetaData.Builder metaDateBuilder = MetaData.builder(); - Settings settings = settings(Version.CURRENT) - .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) - .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1) - .build(); - metaDateBuilder.put(IndexMetaData.builder(TriggeredWatchStore.INDEX_NAME).settings(settings).numberOfShards(1).numberOfReplicas(1)); - final Index index = metaDateBuilder.get(TriggeredWatchStore.INDEX_NAME).getIndex(); + MetaData.Builder metaDataBuilder = MetaData.builder(); + metaDataBuilder.put(IndexMetaData.builder(TriggeredWatchStore.INDEX_NAME).settings(indexSettings)); + final Index index = metaDataBuilder.get(TriggeredWatchStore.INDEX_NAME).getIndex(); IndexRoutingTable.Builder indexRoutingTableBuilder = IndexRoutingTable.builder(index); ShardId shardId = new ShardId(index, 0); indexRoutingTableBuilder.addIndexShard(new IndexShardRoutingTable.Builder(shardId) @@ -143,7 +143,7 @@ public class TriggeredWatchStoreTests extends ESTestCase { .build()); indexRoutingTableBuilder.addReplica(); routingTableBuilder.add(indexRoutingTableBuilder.build()); - csBuilder.metaData(metaDateBuilder); + csBuilder.metaData(metaDataBuilder); csBuilder.routingTable(routingTableBuilder.build()); ClusterState cs = csBuilder.build(); @@ -164,13 +164,9 @@ public class TriggeredWatchStoreTests extends ESTestCase { ClusterState.Builder csBuilder = new ClusterState.Builder(new ClusterName("_name")); RoutingTable.Builder routingTableBuilder = RoutingTable.builder(); - MetaData.Builder metaDateBuilder = MetaData.builder(); - Settings settings = settings(Version.CURRENT) - .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) - .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1) - .build(); - metaDateBuilder.put(IndexMetaData.builder(TriggeredWatchStore.INDEX_NAME).settings(settings).numberOfShards(1).numberOfReplicas(1)); - final Index index = metaDateBuilder.get(TriggeredWatchStore.INDEX_NAME).getIndex(); + MetaData.Builder metaDataBuilder = MetaData.builder(); + metaDataBuilder.put(IndexMetaData.builder(TriggeredWatchStore.INDEX_NAME).settings(indexSettings)); + final Index index = metaDataBuilder.get(TriggeredWatchStore.INDEX_NAME).getIndex(); IndexRoutingTable.Builder indexRoutingTableBuilder = IndexRoutingTable.builder(index); ShardId shardId = new ShardId(index, 0); indexRoutingTableBuilder.addIndexShard(new IndexShardRoutingTable.Builder(shardId) @@ -178,7 +174,7 @@ public class TriggeredWatchStoreTests extends ESTestCase { .build()); indexRoutingTableBuilder.addReplica(); routingTableBuilder.add(indexRoutingTableBuilder.build()); - csBuilder.metaData(metaDateBuilder); + csBuilder.metaData(metaDataBuilder); csBuilder.routingTable(routingTableBuilder.build()); ClusterState cs = csBuilder.build(); @@ -208,13 +204,9 @@ public class TriggeredWatchStoreTests extends ESTestCase { ClusterState.Builder csBuilder = new ClusterState.Builder(new ClusterName("_name")); RoutingTable.Builder routingTableBuilder = RoutingTable.builder(); - MetaData.Builder metaDateBuilder = MetaData.builder(); - Settings settings = settings(Version.CURRENT) - .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) - .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1) - .build(); - metaDateBuilder.put(IndexMetaData.builder(TriggeredWatchStore.INDEX_NAME).settings(settings).numberOfShards(1).numberOfReplicas(1)); - final Index index = metaDateBuilder.get(TriggeredWatchStore.INDEX_NAME).getIndex(); + MetaData.Builder metaDataBuilder = MetaData.builder(); + metaDataBuilder.put(IndexMetaData.builder(TriggeredWatchStore.INDEX_NAME).settings(indexSettings)); + final Index index = metaDataBuilder.get(TriggeredWatchStore.INDEX_NAME).getIndex(); IndexRoutingTable.Builder indexRoutingTableBuilder = IndexRoutingTable.builder(index); ShardId shardId = new ShardId(index, 0); indexRoutingTableBuilder.addIndexShard(new IndexShardRoutingTable.Builder(shardId) @@ -222,7 +214,7 @@ public class TriggeredWatchStoreTests extends ESTestCase { .build()); indexRoutingTableBuilder.addReplica(); routingTableBuilder.add(indexRoutingTableBuilder.build()); - csBuilder.metaData(metaDateBuilder); + csBuilder.metaData(metaDataBuilder); csBuilder.routingTable(routingTableBuilder.build()); ClusterState cs = csBuilder.build(); @@ -251,13 +243,9 @@ public class TriggeredWatchStoreTests extends ESTestCase { ClusterState.Builder csBuilder = new ClusterState.Builder(new ClusterName("_name")); RoutingTable.Builder routingTableBuilder = RoutingTable.builder(); - MetaData.Builder metaDateBuilder = MetaData.builder(); - Settings settings = settings(Version.CURRENT) - .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) - .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1) - .build(); - metaDateBuilder.put(IndexMetaData.builder(TriggeredWatchStore.INDEX_NAME).settings(settings).numberOfShards(1).numberOfReplicas(1)); - final Index index = metaDateBuilder.get(TriggeredWatchStore.INDEX_NAME).getIndex(); + MetaData.Builder metaDataBuilder = MetaData.builder(); + metaDataBuilder.put(IndexMetaData.builder(TriggeredWatchStore.INDEX_NAME).settings(indexSettings)); + final Index index = metaDataBuilder.get(TriggeredWatchStore.INDEX_NAME).getIndex(); IndexRoutingTable.Builder indexRoutingTableBuilder = IndexRoutingTable.builder(index); ShardId shardId = new ShardId(index, 0); indexRoutingTableBuilder.addIndexShard(new IndexShardRoutingTable.Builder(shardId) @@ -265,7 +253,7 @@ public class TriggeredWatchStoreTests extends ESTestCase { .build()); indexRoutingTableBuilder.addReplica(); routingTableBuilder.add(indexRoutingTableBuilder.build()); - csBuilder.metaData(metaDateBuilder); + csBuilder.metaData(metaDataBuilder); csBuilder.routingTable(routingTableBuilder.build()); ClusterState cs = csBuilder.build(); @@ -312,6 +300,63 @@ public class TriggeredWatchStoreTests extends ESTestCase { verify(clientProxy, times(1)).clearScroll(anyString()); } + // the elasticsearch migration helper is doing reindex using aliases, so we have to + // make sure that the watch store supports a single alias pointing to the watch index + public void testLoadStoreAsAlias() throws Exception { + ClusterState.Builder csBuilder = new ClusterState.Builder(new ClusterName("_name")); + + RoutingTable.Builder routingTableBuilder = RoutingTable.builder(); + MetaData.Builder metaDataBuilder = MetaData.builder(); + metaDataBuilder.put(IndexMetaData.builder("triggered-watches-alias").settings(indexSettings) + .putAlias(new AliasMetaData.Builder(TriggeredWatchStore.INDEX_NAME).build())); + final Index index = metaDataBuilder.get("triggered-watches-alias").getIndex(); + IndexRoutingTable.Builder indexRoutingTableBuilder = IndexRoutingTable.builder(index); + ShardId shardId = new ShardId(index, 0); + indexRoutingTableBuilder.addIndexShard(new IndexShardRoutingTable.Builder(shardId) + .addShard(TestShardRouting.newShardRouting(shardId, "_node_id", null, true, ShardRoutingState.STARTED)) + .build()); + indexRoutingTableBuilder.addReplica(); + routingTableBuilder.add(indexRoutingTableBuilder.build()); + csBuilder.metaData(metaDataBuilder); + csBuilder.routingTable(routingTableBuilder.build()); + ClusterState cs = csBuilder.build(); + + assertThat(triggeredWatchStore.validate(cs), is(true)); + verifyZeroInteractions(clientProxy); + } + + // the elasticsearch migration helper is doing reindex using aliases, so we have to + // make sure that the watch store supports only a single index in an alias + public void testLoadingFailsWithTwoAliases() throws Exception { + ClusterState.Builder csBuilder = new ClusterState.Builder(new ClusterName("_name")); + + MetaData.Builder metaDataBuilder = MetaData.builder(); + RoutingTable.Builder routingTableBuilder = RoutingTable.builder(); + metaDataBuilder.put(IndexMetaData.builder("triggered-watches-alias").settings(indexSettings) + .putAlias(new AliasMetaData.Builder(TriggeredWatchStore.INDEX_NAME).build())); + metaDataBuilder.put(IndexMetaData.builder("whatever").settings(indexSettings) + .putAlias(new AliasMetaData.Builder(TriggeredWatchStore.INDEX_NAME).build())); + + final Index index = metaDataBuilder.get("triggered-watches-alias").getIndex(); + IndexRoutingTable.Builder indexRoutingTableBuilder = IndexRoutingTable.builder(index); + indexRoutingTableBuilder.addIndexShard(new IndexShardRoutingTable.Builder(new ShardId(index, 0)) + .addShard(TestShardRouting.newShardRouting("triggered-watches-alias", 0, "_node_id", null, true, ShardRoutingState.STARTED)) + .build()); + indexRoutingTableBuilder.addReplica(); + final Index otherIndex = metaDataBuilder.get("whatever").getIndex(); + IndexRoutingTable.Builder otherIndexRoutingTableBuilder = IndexRoutingTable.builder(otherIndex); + otherIndexRoutingTableBuilder.addIndexShard(new IndexShardRoutingTable.Builder(new ShardId(index, 0)) + .addShard(TestShardRouting.newShardRouting("whatever", 0, "_node_id", null, true, ShardRoutingState.STARTED)) + .build()); + + csBuilder.metaData(metaDataBuilder); + csBuilder.routingTable(routingTableBuilder.build()); + ClusterState cs = csBuilder.build(); + + assertThat(triggeredWatchStore.validate(cs), is(false)); + verifyZeroInteractions(clientProxy); + } + private RefreshResponse mockRefreshResponse(int total, int successful) { RefreshResponse refreshResponse = mock(RefreshResponse.class); when(refreshResponse.getTotalShards()).thenReturn(total); diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/watcher/test/AbstractWatcherIntegrationTestCase.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/watcher/test/AbstractWatcherIntegrationTestCase.java index 3c9d54ec43f..3ce44f06a32 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/watcher/test/AbstractWatcherIntegrationTestCase.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/watcher/test/AbstractWatcherIntegrationTestCase.java @@ -6,6 +6,8 @@ package org.elasticsearch.xpack.watcher.test; import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.action.admin.indices.alias.Alias; +import org.elasticsearch.action.admin.indices.create.CreateIndexResponse; import org.elasticsearch.action.admin.indices.template.get.GetIndexTemplatesResponse; import org.elasticsearch.action.search.SearchRequestBuilder; import org.elasticsearch.action.search.SearchResponse; @@ -15,12 +17,15 @@ import org.elasticsearch.client.node.NodeClient; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.routing.IndexRoutingTable; +import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.io.Streams; import org.elasticsearch.common.network.NetworkModule; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.Callback; import org.elasticsearch.common.xcontent.XContentHelper; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.common.xcontent.json.JsonXContent; import org.elasticsearch.common.xcontent.support.XContentMapValues; import org.elasticsearch.env.Environment; import org.elasticsearch.index.query.QueryBuilder; @@ -51,12 +56,14 @@ import org.elasticsearch.xpack.security.authc.support.SecuredString; import org.elasticsearch.xpack.security.crypto.CryptoService; import org.elasticsearch.xpack.support.clock.Clock; import org.elasticsearch.xpack.support.clock.ClockMock; +import org.elasticsearch.xpack.template.TemplateUtils; import org.elasticsearch.xpack.watcher.WatcherLifeCycleService; import org.elasticsearch.xpack.watcher.WatcherService; import org.elasticsearch.xpack.watcher.WatcherState; import org.elasticsearch.xpack.watcher.client.WatcherClient; import org.elasticsearch.xpack.watcher.execution.ExecutionService; import org.elasticsearch.xpack.watcher.execution.ExecutionState; +import org.elasticsearch.xpack.watcher.execution.TriggeredWatchStore; import org.elasticsearch.xpack.watcher.history.HistoryStore; import org.elasticsearch.xpack.watcher.support.WatcherIndexTemplateRegistry; import org.elasticsearch.xpack.watcher.support.xcontent.XContentSource; @@ -64,6 +71,7 @@ import org.elasticsearch.xpack.watcher.trigger.ScheduleTriggerEngineMock; import org.elasticsearch.xpack.watcher.trigger.TriggerService; import org.elasticsearch.xpack.watcher.trigger.schedule.ScheduleModule; import org.elasticsearch.xpack.watcher.watch.Watch; +import org.elasticsearch.xpack.watcher.watch.WatchStore; import org.hamcrest.Matcher; import org.jboss.netty.util.internal.SystemPropertyUtil; import org.junit.After; @@ -90,6 +98,7 @@ import java.util.function.Function; import static org.elasticsearch.index.query.QueryBuilders.boolQuery; import static org.elasticsearch.index.query.QueryBuilders.matchQuery; import static org.elasticsearch.test.ESIntegTestCase.Scope.SUITE; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.elasticsearch.xpack.security.authc.support.UsernamePasswordToken.basicAuthHeaderValue; import static org.elasticsearch.xpack.watcher.support.WatcherIndexTemplateRegistry.HISTORY_TEMPLATE_NAME; import static org.elasticsearch.xpack.watcher.support.WatcherIndexTemplateRegistry.TRIGGERED_TEMPLATE_NAME; @@ -245,6 +254,7 @@ public abstract class AbstractWatcherIntegrationTestCase extends ESIntegTestCase public void _setup() throws Exception { setupTimeWarp(); startWatcherIfNodesExist(); + configureAliasesForWatcherIndices(); } @After @@ -304,6 +314,52 @@ public abstract class AbstractWatcherIntegrationTestCase extends ESIntegTestCase } } + /** + * In order to test, that .watches and .triggered-watches indices can also point to an alias, we will rarely create those + * after starting watcher + * + * The idea behind this is the possible use of the migration helper for upgrades, see + * https://github.com/elastic/elasticsearch-migration/ + * + */ + private void configureAliasesForWatcherIndices() throws Exception { + // alias for .watches, setting the index template to the same as well + if (rarely()) { + String newIndex = ".watches-alias-index"; + BytesReference bytesReference = TemplateUtils.load("/watches.json"); + try (XContentParser parser = JsonXContent.jsonXContent.createParser(bytesReference.toBytesRef().bytes)) { + Map parserMap = parser.map(); + Map allMappings = (Map) parserMap.get("mappings"); + + CreateIndexResponse response = client().admin().indices().prepareCreate(newIndex) + .setCause("Index to test aliases with .watches index") + .addAlias(new Alias(WatchStore.INDEX)) + .setSettings((Map) parserMap.get("settings")) + .addMapping("watch", (Map) allMappings.get("watch")) + .get(); + assertAcked(response); + } + } + + // alias for .triggered-watches, ensuring the index template is set appropriately + if (rarely()) { + String newIndex = ".triggered-watches-alias-index"; + BytesReference bytesReference = TemplateUtils.load("/triggered_watches.json"); + try (XContentParser parser = JsonXContent.jsonXContent.createParser(bytesReference.toBytesRef().bytes)) { + Map parserMap = parser.map(); + Map allMappings = (Map) parserMap.get("mappings"); + + CreateIndexResponse response = client().admin().indices().prepareCreate(newIndex) + .setCause("Index to test aliases with .triggered-watches index") + .addAlias(new Alias(TriggeredWatchStore.INDEX_NAME)) + .setSettings((Map) parserMap.get("settings")) + .addMapping("triggered_watch", (Map) allMappings.get("triggered_watch")) + .get(); + assertAcked(response); + } + } + } + protected TimeWarp timeWarp() { assert timeWarped() : "cannot access TimeWarp when test context is not time warped"; return timeWarp; diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/watcher/test/integration/BootStrapTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/watcher/test/integration/BootStrapTests.java index c308a33c301..6367546bf02 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/watcher/test/integration/BootStrapTests.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/watcher/test/integration/BootStrapTests.java @@ -47,8 +47,8 @@ import static org.hamcrest.Matchers.is; import static org.hamcrest.core.IsEqual.equalTo; import static org.joda.time.DateTimeZone.UTC; -@TestLogging("org.elasticsearch.watcher:TRACE") public class BootStrapTests extends AbstractWatcherIntegrationTestCase { + @Override protected boolean timeWarped() { // timewarping isn't necessary here, because we aren't testing triggering or throttling @@ -188,10 +188,11 @@ public class BootStrapTests extends AbstractWatcherIntegrationTestCase { DateTime now = DateTime.now(UTC); Wid wid = new Wid("_id", 1, now); ScheduleTriggerEvent event = new ScheduleTriggerEvent("_id", now, now); + client().prepareIndex(TriggeredWatchStore.INDEX_NAME, TriggeredWatchStore.DOC_TYPE, wid.value()) .setSource(jsonBuilder().startObject() .startObject(WatchRecord.Field.TRIGGER_EVENT.getPreferredName()) - .field(event.type(), event) + .field(event.type(), event) .endObject() .endObject()) .setWaitForActiveShards(ActiveShardCount.ALL) diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/watcher/watch/WatchStoreTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/watcher/watch/WatchStoreTests.java index 948e0a486f1..8c80ab10ff2 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/watcher/watch/WatchStoreTests.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/watcher/watch/WatchStoreTests.java @@ -5,6 +5,7 @@ */ package org.elasticsearch.xpack.watcher.watch; +import org.elasticsearch.ElasticsearchException; import org.elasticsearch.Version; import org.elasticsearch.action.admin.indices.refresh.RefreshRequest; import org.elasticsearch.action.admin.indices.refresh.RefreshResponse; @@ -13,6 +14,7 @@ import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.AliasMetaData; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.routing.IndexRoutingTable; @@ -80,8 +82,8 @@ public class WatchStoreTests extends ESTestCase { public void testStartNoPreviousWatchesIndex() throws Exception { ClusterState.Builder csBuilder = new ClusterState.Builder(new ClusterName("_name")); - MetaData.Builder metaDateBuilder = MetaData.builder(); - csBuilder.metaData(metaDateBuilder); + MetaData.Builder metaDataBuilder = MetaData.builder(); + csBuilder.metaData(metaDataBuilder); ClusterState cs = csBuilder.build(); assertThat(watchStore.validate(cs), is(true)); @@ -96,14 +98,14 @@ public class WatchStoreTests extends ESTestCase { public void testStartPrimaryShardNotReady() { ClusterState.Builder csBuilder = new ClusterState.Builder(new ClusterName("_name")); - MetaData.Builder metaDateBuilder = MetaData.builder(); + MetaData.Builder metaDataBuilder = MetaData.builder(); RoutingTable.Builder routingTableBuilder = RoutingTable.builder(); Settings settings = settings(Version.CURRENT) .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1) .build(); - metaDateBuilder.put(IndexMetaData.builder(WatchStore.INDEX).settings(settings).numberOfShards(1).numberOfReplicas(1)); - final Index index = metaDateBuilder.get(WatchStore.INDEX).getIndex(); + metaDataBuilder.put(IndexMetaData.builder(WatchStore.INDEX).settings(settings).numberOfShards(1).numberOfReplicas(1)); + final Index index = metaDataBuilder.get(WatchStore.INDEX).getIndex(); IndexRoutingTable.Builder indexRoutingTableBuilder = IndexRoutingTable.builder(index); indexRoutingTableBuilder.addIndexShard(new IndexShardRoutingTable.Builder(new ShardId(index, 0)) .addShard(TestShardRouting.newShardRouting(WatchStore.INDEX, 0, "_node_id", null, true, @@ -111,7 +113,7 @@ public class WatchStoreTests extends ESTestCase { .build()); indexRoutingTableBuilder.addReplica(); routingTableBuilder.add(indexRoutingTableBuilder.build()); - csBuilder.metaData(metaDateBuilder); + csBuilder.metaData(metaDataBuilder); csBuilder.routingTable(routingTableBuilder.build()); ClusterState cs = csBuilder.build(); @@ -378,26 +380,134 @@ public class WatchStoreTests extends ESTestCase { assertThat(watchStore.activeWatches(), hasSize(0)); } - /* - * Creates the standard cluster state metadata for the watches index - * with shards/replicas being marked as started - */ - private void createWatchIndexMetaData(ClusterState.Builder builder) { - MetaData.Builder metaDateBuilder = MetaData.builder(); + // the elasticsearch migration helper is doing reindex using aliases, so we have to + // make sure that the watch store supports a single alias pointing to the watch index + public void testThatStartingWithWatchesIndexAsAliasWorks() throws Exception { + ClusterState.Builder csBuilder = new ClusterState.Builder(new ClusterName("_name")); + + MetaData.Builder metaDataBuilder = MetaData.builder(); RoutingTable.Builder routingTableBuilder = RoutingTable.builder(); Settings settings = settings(Version.CURRENT) .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1) .build(); - metaDateBuilder.put(IndexMetaData.builder(WatchStore.INDEX).settings(settings).numberOfShards(1).numberOfReplicas(1)); - final Index index = metaDateBuilder.get(WatchStore.INDEX).getIndex(); + metaDataBuilder.put(IndexMetaData.builder("watches-alias").settings(settings).numberOfShards(1).numberOfReplicas(1) + .putAlias(new AliasMetaData.Builder(WatchStore.INDEX).build())); + + final Index index = metaDataBuilder.get("watches-alias").getIndex(); + IndexRoutingTable.Builder indexRoutingTableBuilder = IndexRoutingTable.builder(index); + indexRoutingTableBuilder.addIndexShard(new IndexShardRoutingTable.Builder(new ShardId(index, 0)) + .addShard(TestShardRouting.newShardRouting("watches-alias", 0, "_node_id", null, true, ShardRoutingState.STARTED)) + .build()); + indexRoutingTableBuilder.addReplica(); + routingTableBuilder.add(indexRoutingTableBuilder.build()); + csBuilder.metaData(metaDataBuilder); + csBuilder.routingTable(routingTableBuilder.build()); + + RefreshResponse refreshResponse = mockRefreshResponse(1, 1); + when(clientProxy.refresh(any(RefreshRequest.class))).thenReturn(refreshResponse); + + BytesReference source = new BytesArray("{}"); + InternalSearchHit hit1 = new InternalSearchHit(0, "_id1", new Text("type"), Collections.emptyMap()); + hit1.sourceRef(source); + InternalSearchHit hit2 = new InternalSearchHit(1, "_id2", new Text("type"), Collections.emptyMap()); + hit2.sourceRef(source); + SearchResponse searchResponse1 = mockSearchResponse(1, 1, 2, hit1, hit2); + + when(clientProxy.search(any(SearchRequest.class), any(TimeValue.class))).thenReturn(searchResponse1); + + InternalSearchHit hit3 = new InternalSearchHit(2, "_id3", new Text("type"), Collections.emptyMap()); + hit3.sourceRef(source); + InternalSearchHit hit4 = new InternalSearchHit(3, "_id4", new Text("type"), Collections.emptyMap()); + hit4.sourceRef(source); + SearchResponse searchResponse2 = mockSearchResponse(1, 1, 2, hit3, hit4); + SearchResponse searchResponse3 = mockSearchResponse(1, 1, 2); + when(clientProxy.searchScroll(anyString(), any(TimeValue.class))).thenReturn(searchResponse2, searchResponse3); + + Watch watch1 = mock(Watch.class); + WatchStatus status = mock(WatchStatus.class); + when(watch1.status()).thenReturn(status); + Watch watch2 = mock(Watch.class); + when(watch2.status()).thenReturn(status); + Watch watch3 = mock(Watch.class); + when(watch3.status()).thenReturn(status); + Watch watch4 = mock(Watch.class); + when(watch4.status()).thenReturn(status); + when(parser.parse("_id1", true, source, true)).thenReturn(watch1); + when(parser.parse("_id2", true, source, true)).thenReturn(watch2); + when(parser.parse("_id3", true, source, true)).thenReturn(watch3); + when(parser.parse("_id4", true, source, true)).thenReturn(watch4); + + when(clientProxy.clearScroll(anyString())).thenReturn(new ClearScrollResponse(true, 0)); + + ClusterState cs = csBuilder.build(); + assertThat(watchStore.validate(cs), is(true)); + watchStore.start(cs); + assertThat(watchStore.started(), is(true)); + assertThat(watchStore.watches().size(), equalTo(4)); + verify(clientProxy, times(1)).refresh(any(RefreshRequest.class)); + verify(clientProxy, times(1)).search(any(SearchRequest.class), any(TimeValue.class)); + verify(clientProxy, times(1)).clearScroll(anyString()); + } + + // the elasticsearch migration helper is doing reindex using aliases, so we have to + // make sure that the watch store supports only a single index in an alias + public void testThatWatchesIndexWithTwoAliasesFails() throws Exception { + ClusterState.Builder csBuilder = new ClusterState.Builder(new ClusterName("_name")); + + MetaData.Builder metaDataBuilder = MetaData.builder(); + RoutingTable.Builder routingTableBuilder = RoutingTable.builder(); + Settings settings = settings(Version.CURRENT) + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1) + .build(); + metaDataBuilder.put(IndexMetaData.builder("watches-alias").settings(settings).numberOfShards(1).numberOfReplicas(1) + .putAlias(new AliasMetaData.Builder(WatchStore.INDEX).build())); + metaDataBuilder.put(IndexMetaData.builder("whatever").settings(settings).numberOfShards(1).numberOfReplicas(1) + .putAlias(new AliasMetaData.Builder(WatchStore.INDEX).build())); + + final Index index = metaDataBuilder.get("watches-alias").getIndex(); + IndexRoutingTable.Builder indexRoutingTableBuilder = IndexRoutingTable.builder(index); + indexRoutingTableBuilder.addIndexShard(new IndexShardRoutingTable.Builder(new ShardId(index, 0)) + .addShard(TestShardRouting.newShardRouting("watches-alias", 0, "_node_id", null, true, ShardRoutingState.STARTED)) + .build()); + indexRoutingTableBuilder.addReplica(); + final Index otherIndex = metaDataBuilder.get("whatever").getIndex(); + IndexRoutingTable.Builder otherIndexRoutingTableBuilder = IndexRoutingTable.builder(otherIndex); + otherIndexRoutingTableBuilder.addIndexShard(new IndexShardRoutingTable.Builder(new ShardId(index, 0)) + .addShard(TestShardRouting.newShardRouting("whatever", 0, "_node_id", null, true, ShardRoutingState.STARTED)) + .build()); + otherIndexRoutingTableBuilder.addReplica(); + routingTableBuilder.add(otherIndexRoutingTableBuilder.build()); + csBuilder.metaData(metaDataBuilder); + csBuilder.routingTable(routingTableBuilder.build()); + + ClusterState cs = csBuilder.build(); + assertThat(watchStore.validate(cs), is(false)); + IllegalStateException exception = expectThrows(IllegalStateException.class, () -> watchStore.start(cs)); + assertThat(exception.getMessage(), is("Alias [.watches] points to more than one index")); + } + + /* + * Creates the standard cluster state metadata for the watches index + * with shards/replicas being marked as started + */ + private void createWatchIndexMetaData(ClusterState.Builder builder) { + MetaData.Builder metaDataBuilder = MetaData.builder(); + RoutingTable.Builder routingTableBuilder = RoutingTable.builder(); + Settings settings = settings(Version.CURRENT) + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1) + .build(); + metaDataBuilder.put(IndexMetaData.builder(WatchStore.INDEX).settings(settings).numberOfShards(1).numberOfReplicas(1)); + final Index index = metaDataBuilder.get(WatchStore.INDEX).getIndex(); IndexRoutingTable.Builder indexRoutingTableBuilder = IndexRoutingTable.builder(index); indexRoutingTableBuilder.addIndexShard(new IndexShardRoutingTable.Builder(new ShardId(index, 0)) .addShard(TestShardRouting.newShardRouting(WatchStore.INDEX, 0, "_node_id", null, true, ShardRoutingState.STARTED)) .build()); indexRoutingTableBuilder.addReplica(); routingTableBuilder.add(indexRoutingTableBuilder.build()); - builder.metaData(metaDateBuilder); + builder.metaData(metaDataBuilder); builder.routingTable(routingTableBuilder.build()); }