Watcher: Add support for aliases for watches/triggered watches index (elastic/elasticsearch#3770)

As discussed in #elastic/elasticsearch-migration/79 supporting aliases for watcher allows
the migration plugin to work.

This adds the relevent checks in the WatchStore and the TriggeredWatchStore that aliases are
supported, as the current assumption was always to just load an index.

Also, this rarely sets those indices as aliases in all the integration tests, so that this
case gets tested.

Note: The new WatchStoreUtils.getConcreteIndex() method will be put into core, as this is a
useful helper for others.

Original commit: elastic/x-pack-elasticsearch@4a98af691d
This commit is contained in:
Alexander Reelsen 2016-10-19 10:29:27 +02:00 committed by GitHub
parent aa1eedc062
commit 0228a94d80
7 changed files with 347 additions and 99 deletions

View File

@ -25,11 +25,13 @@ import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.sort.SortBuilders; import org.elasticsearch.search.sort.SortBuilders;
import org.elasticsearch.xpack.security.InternalClient; import org.elasticsearch.xpack.security.InternalClient;
import org.elasticsearch.xpack.watcher.support.init.proxy.WatcherClientProxy; import org.elasticsearch.xpack.watcher.support.init.proxy.WatcherClientProxy;
import org.elasticsearch.xpack.watcher.watch.WatchStoreUtils;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
@ -77,15 +79,13 @@ public class TriggeredWatchStore extends AbstractComponent {
} }
public boolean validate(ClusterState state) { public boolean validate(ClusterState state) {
IndexMetaData indexMetaData = state.getMetaData().index(INDEX_NAME); try {
if (indexMetaData != null) { IndexMetaData indexMetaData = WatchStoreUtils.getConcreteIndex(INDEX_NAME, state.metaData());
if (!state.routingTable().index(INDEX_NAME).allPrimaryShardsActive()) { return state.routingTable().index(indexMetaData.getIndex()).allPrimaryShardsActive();
logger.debug("not all primary shards of the [{}] index are started, so we cannot load previous triggered watches", } catch (IndexNotFoundException e) {
INDEX_NAME); } catch (IllegalStateException e) {
return false; logger.trace((Supplier<?>) () -> new ParameterizedMessage("error getting index meta data [{}]: ", INDEX_NAME), e);
} return false;
} else {
logger.debug("triggered watch index doesn't exist, so we can load");
} }
return true; return true;
} }
@ -235,15 +235,16 @@ public class TriggeredWatchStore extends AbstractComponent {
} }
public Collection<TriggeredWatch> loadTriggeredWatches(ClusterState state) { public Collection<TriggeredWatch> loadTriggeredWatches(ClusterState state) {
IndexMetaData indexMetaData = state.getMetaData().index(INDEX_NAME); IndexMetaData indexMetaData;
if (indexMetaData == null) { try {
logger.debug("no .triggered_watches indices found. skipping loading awaiting triggered watches"); indexMetaData = WatchStoreUtils.getConcreteIndex(INDEX_NAME, state.metaData());
} catch (IndexNotFoundException e) {
return Collections.emptySet(); return Collections.emptySet();
} }
int numPrimaryShards; int numPrimaryShards;
if (!state.routingTable().index(INDEX_NAME).allPrimaryShardsActive()) { if (state.routingTable().index(indexMetaData.getIndex()).allPrimaryShardsActive() == false) {
throw illegalState("not all primary shards of the [{}] index are started.", INDEX_NAME); throw illegalState("not all primary shards of the triggered watches index {} are started", indexMetaData.getIndex());
} else { } else {
numPrimaryShards = indexMetaData.getNumberOfShards(); numPrimaryShards = indexMetaData.getNumberOfShards();
} }
@ -267,7 +268,7 @@ public class TriggeredWatchStore extends AbstractComponent {
String id = sh.getId(); String id = sh.getId();
try { try {
TriggeredWatch triggeredWatch = triggeredWatchParser.parse(id, sh.version(), sh.getSourceRef()); TriggeredWatch triggeredWatch = triggeredWatchParser.parse(id, sh.version(), sh.getSourceRef());
logger.debug("loaded triggered watch [{}/{}/{}]", sh.index(), sh.type(), sh.id()); logger.trace("loaded triggered watch [{}/{}/{}]", sh.index(), sh.type(), sh.id());
triggeredWatches.add(triggeredWatch); triggeredWatches.add(triggeredWatch);
} catch (Exception e) { } catch (Exception e) {
logger.error( logger.error(
@ -279,6 +280,7 @@ public class TriggeredWatchStore extends AbstractComponent {
} finally { } finally {
client.clearScroll(response.getScrollId()); client.clearScroll(response.getScrollId());
} }
logger.debug("loaded [{}] triggered watches", triggeredWatches.size());
return triggeredWatches; return triggeredWatches;
} }

View File

@ -30,6 +30,7 @@ import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.json.JsonXContent; import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.engine.DocumentMissingException; import org.elasticsearch.index.engine.DocumentMissingException;
import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.search.builder.SearchSourceBuilder;
@ -86,35 +87,32 @@ public class WatchStore extends AbstractComponent {
return; return;
} }
IndexMetaData watchesIndexMetaData = state.getMetaData().index(INDEX); try {
if (watchesIndexMetaData != null) { IndexMetaData indexMetaData = WatchStoreUtils.getConcreteIndex(INDEX, state.metaData());
try { int count = loadWatches(indexMetaData.getNumberOfShards());
int count = loadWatches(watchesIndexMetaData.getNumberOfShards()); logger.debug("loaded [{}] watches from the watches index [{}]", count, indexMetaData.getIndex().getName());
logger.debug("loaded [{}] watches from the watches index [{}]", count, INDEX); } catch (IndexNotFoundException e) {
started.set(true); } catch (Exception e) {
} catch (Exception e) { logger.debug((Supplier<?>) () -> new ParameterizedMessage("failed to load watches for watch index [{}]", INDEX), e);
logger.debug((Supplier<?>) () -> new ParameterizedMessage("failed to load watches for watch index [{}]", INDEX), e); watches.clear();
watches.clear(); throw e;
throw e;
}
} else {
started.set(true);
} }
started.set(true);
} }
public boolean validate(ClusterState state) { public boolean validate(ClusterState state) {
IndexMetaData watchesIndexMetaData = state.getMetaData().index(INDEX); IndexMetaData watchesIndexMetaData;
if (watchesIndexMetaData == null) { try {
logger.debug("index [{}] doesn't exist, so we can start", INDEX); watchesIndexMetaData = WatchStoreUtils.getConcreteIndex(INDEX, state.metaData());
} catch (IndexNotFoundException e) {
return true; return true;
} } catch (IllegalStateException e) {
if (state.routingTable().index(INDEX).allPrimaryShardsActive()) { logger.trace((Supplier<?>) () -> new ParameterizedMessage("error getting index meta data [{}]: ", INDEX), e);
logger.debug("index [{}] exists and all primary shards are started, so we can start", INDEX);
return true;
} else {
logger.debug("not all primary shards active for index [{}], so we cannot start", INDEX);
return false; return false;
} }
return state.routingTable().index(watchesIndexMetaData.getIndex().getName()).allPrimaryShardsActive();
} }
public boolean started() { public boolean started() {
@ -374,5 +372,4 @@ public class WatchStore extends AbstractComponent {
return response; return response;
} }
} }
} }

View File

@ -0,0 +1,37 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.watcher.watch;
import org.elasticsearch.cluster.metadata.AliasOrIndex;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.index.IndexNotFoundException;
public class WatchStoreUtils {
/**
* Method to get indexmetadata of a index, that potentially is behind an alias.
*
* @param name Name of the index or the alias
* @param metaData Metadata to search for the name
* @return IndexMetaData of the concrete index
* @throws IllegalStateException If an alias points to two indices
* @throws IndexNotFoundException If no index exists
*/
public static IndexMetaData getConcreteIndex(String name, MetaData metaData) {
AliasOrIndex aliasOrIndex = metaData.getAliasAndIndexLookup().get(name);
if (aliasOrIndex == null) {
throw new IndexNotFoundException(name);
}
if (aliasOrIndex.isAlias() && aliasOrIndex.getIndices().size() > 1) {
throw new IllegalStateException("Alias [" + name + "] points to more than one index");
}
return aliasOrIndex.getIndices().get(0);
}
}

View File

@ -5,6 +5,7 @@
*/ */
package org.elasticsearch.xpack.watcher.execution; package org.elasticsearch.xpack.watcher.execution;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version; import org.elasticsearch.Version;
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest; import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
import org.elasticsearch.action.admin.indices.refresh.RefreshResponse; import org.elasticsearch.action.admin.indices.refresh.RefreshResponse;
@ -13,6 +14,7 @@ import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.AliasMetaData;
import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.routing.IndexRoutingTable; import org.elasticsearch.cluster.routing.IndexRoutingTable;
@ -53,6 +55,12 @@ import static org.mockito.Mockito.verifyZeroInteractions;
import static org.mockito.Mockito.when; import static org.mockito.Mockito.when;
public class TriggeredWatchStoreTests extends ESTestCase { public class TriggeredWatchStoreTests extends ESTestCase {
private Settings indexSettings = settings(Version.CURRENT)
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1)
.build();
private WatcherClientProxy clientProxy; private WatcherClientProxy clientProxy;
private TriggeredWatch.Parser parser; private TriggeredWatch.Parser parser;
private TriggeredWatchStore triggeredWatchStore; private TriggeredWatchStore triggeredWatchStore;
@ -67,8 +75,8 @@ public class TriggeredWatchStoreTests extends ESTestCase {
public void testLoadWatchRecordsNoPriorHistoryIndices() throws Exception { public void testLoadWatchRecordsNoPriorHistoryIndices() throws Exception {
ClusterState.Builder csBuilder = new ClusterState.Builder(new ClusterName("name")); ClusterState.Builder csBuilder = new ClusterState.Builder(new ClusterName("name"));
MetaData.Builder metaDateBuilder = MetaData.builder(); MetaData.Builder metaDataBuilder = MetaData.builder();
csBuilder.metaData(metaDateBuilder); csBuilder.metaData(metaDataBuilder);
ClusterState cs = csBuilder.build(); ClusterState cs = csBuilder.build();
assertThat(triggeredWatchStore.validate(cs), is(true)); assertThat(triggeredWatchStore.validate(cs), is(true));
@ -82,7 +90,7 @@ public class TriggeredWatchStoreTests extends ESTestCase {
ClusterState.Builder csBuilder = new ClusterState.Builder(new ClusterName("name")); ClusterState.Builder csBuilder = new ClusterState.Builder(new ClusterName("name"));
RoutingTable.Builder routingTableBuilder = RoutingTable.builder(); RoutingTable.Builder routingTableBuilder = RoutingTable.builder();
MetaData.Builder metaDateBuilder = MetaData.builder(); MetaData.Builder metaDataBuilder = MetaData.builder();
int numShards = 2 + randomInt(2); int numShards = 2 + randomInt(2);
int numStartedShards = 1; int numStartedShards = 1;
@ -90,9 +98,9 @@ public class TriggeredWatchStoreTests extends ESTestCase {
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, numShards) .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, numShards)
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1) .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1)
.build(); .build();
metaDateBuilder.put(IndexMetaData.builder(TriggeredWatchStore.INDEX_NAME).settings(settings) metaDataBuilder.put(IndexMetaData.builder(TriggeredWatchStore.INDEX_NAME).settings(settings)
.numberOfShards(numShards).numberOfReplicas(1)); .numberOfShards(numShards).numberOfReplicas(1));
final Index index = metaDateBuilder.get(TriggeredWatchStore.INDEX_NAME).getIndex(); final Index index = metaDataBuilder.get(TriggeredWatchStore.INDEX_NAME).getIndex();
IndexRoutingTable.Builder indexRoutingTableBuilder = IndexRoutingTable.builder(index); IndexRoutingTable.Builder indexRoutingTableBuilder = IndexRoutingTable.builder(index);
for (int i = 0; i < numShards; i++) { for (int i = 0; i < numShards; i++) {
ShardRoutingState state; ShardRoutingState state;
@ -110,17 +118,13 @@ public class TriggeredWatchStoreTests extends ESTestCase {
} }
routingTableBuilder.add(indexRoutingTableBuilder.build()); routingTableBuilder.add(indexRoutingTableBuilder.build());
csBuilder.metaData(metaDateBuilder); csBuilder.metaData(metaDataBuilder);
csBuilder.routingTable(routingTableBuilder.build()); csBuilder.routingTable(routingTableBuilder.build());
ClusterState cs = csBuilder.build(); ClusterState cs = csBuilder.build();
assertThat(triggeredWatchStore.validate(cs), is(false)); assertThat(triggeredWatchStore.validate(cs), is(false));
try { IllegalStateException e = expectThrows(IllegalStateException.class, () -> triggeredWatchStore.loadTriggeredWatches(cs));
triggeredWatchStore.loadTriggeredWatches(cs); assertThat(e.getMessage(), is("not all primary shards of the triggered watches index [.triggered_watches] are started"));
fail("exception expected, because not all primary shards are started");
} catch (Exception e) {
assertThat(e.getMessage(), equalTo("not all primary shards of the [.triggered_watches] index are started."));
}
verifyZeroInteractions(clientProxy); verifyZeroInteractions(clientProxy);
} }
@ -129,13 +133,9 @@ public class TriggeredWatchStoreTests extends ESTestCase {
ClusterState.Builder csBuilder = new ClusterState.Builder(new ClusterName("_name")); ClusterState.Builder csBuilder = new ClusterState.Builder(new ClusterName("_name"));
RoutingTable.Builder routingTableBuilder = RoutingTable.builder(); RoutingTable.Builder routingTableBuilder = RoutingTable.builder();
MetaData.Builder metaDateBuilder = MetaData.builder(); MetaData.Builder metaDataBuilder = MetaData.builder();
Settings settings = settings(Version.CURRENT) metaDataBuilder.put(IndexMetaData.builder(TriggeredWatchStore.INDEX_NAME).settings(indexSettings));
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) final Index index = metaDataBuilder.get(TriggeredWatchStore.INDEX_NAME).getIndex();
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1)
.build();
metaDateBuilder.put(IndexMetaData.builder(TriggeredWatchStore.INDEX_NAME).settings(settings).numberOfShards(1).numberOfReplicas(1));
final Index index = metaDateBuilder.get(TriggeredWatchStore.INDEX_NAME).getIndex();
IndexRoutingTable.Builder indexRoutingTableBuilder = IndexRoutingTable.builder(index); IndexRoutingTable.Builder indexRoutingTableBuilder = IndexRoutingTable.builder(index);
ShardId shardId = new ShardId(index, 0); ShardId shardId = new ShardId(index, 0);
indexRoutingTableBuilder.addIndexShard(new IndexShardRoutingTable.Builder(shardId) indexRoutingTableBuilder.addIndexShard(new IndexShardRoutingTable.Builder(shardId)
@ -143,7 +143,7 @@ public class TriggeredWatchStoreTests extends ESTestCase {
.build()); .build());
indexRoutingTableBuilder.addReplica(); indexRoutingTableBuilder.addReplica();
routingTableBuilder.add(indexRoutingTableBuilder.build()); routingTableBuilder.add(indexRoutingTableBuilder.build());
csBuilder.metaData(metaDateBuilder); csBuilder.metaData(metaDataBuilder);
csBuilder.routingTable(routingTableBuilder.build()); csBuilder.routingTable(routingTableBuilder.build());
ClusterState cs = csBuilder.build(); ClusterState cs = csBuilder.build();
@ -164,13 +164,9 @@ public class TriggeredWatchStoreTests extends ESTestCase {
ClusterState.Builder csBuilder = new ClusterState.Builder(new ClusterName("_name")); ClusterState.Builder csBuilder = new ClusterState.Builder(new ClusterName("_name"));
RoutingTable.Builder routingTableBuilder = RoutingTable.builder(); RoutingTable.Builder routingTableBuilder = RoutingTable.builder();
MetaData.Builder metaDateBuilder = MetaData.builder(); MetaData.Builder metaDataBuilder = MetaData.builder();
Settings settings = settings(Version.CURRENT) metaDataBuilder.put(IndexMetaData.builder(TriggeredWatchStore.INDEX_NAME).settings(indexSettings));
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) final Index index = metaDataBuilder.get(TriggeredWatchStore.INDEX_NAME).getIndex();
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1)
.build();
metaDateBuilder.put(IndexMetaData.builder(TriggeredWatchStore.INDEX_NAME).settings(settings).numberOfShards(1).numberOfReplicas(1));
final Index index = metaDateBuilder.get(TriggeredWatchStore.INDEX_NAME).getIndex();
IndexRoutingTable.Builder indexRoutingTableBuilder = IndexRoutingTable.builder(index); IndexRoutingTable.Builder indexRoutingTableBuilder = IndexRoutingTable.builder(index);
ShardId shardId = new ShardId(index, 0); ShardId shardId = new ShardId(index, 0);
indexRoutingTableBuilder.addIndexShard(new IndexShardRoutingTable.Builder(shardId) indexRoutingTableBuilder.addIndexShard(new IndexShardRoutingTable.Builder(shardId)
@ -178,7 +174,7 @@ public class TriggeredWatchStoreTests extends ESTestCase {
.build()); .build());
indexRoutingTableBuilder.addReplica(); indexRoutingTableBuilder.addReplica();
routingTableBuilder.add(indexRoutingTableBuilder.build()); routingTableBuilder.add(indexRoutingTableBuilder.build());
csBuilder.metaData(metaDateBuilder); csBuilder.metaData(metaDataBuilder);
csBuilder.routingTable(routingTableBuilder.build()); csBuilder.routingTable(routingTableBuilder.build());
ClusterState cs = csBuilder.build(); ClusterState cs = csBuilder.build();
@ -208,13 +204,9 @@ public class TriggeredWatchStoreTests extends ESTestCase {
ClusterState.Builder csBuilder = new ClusterState.Builder(new ClusterName("_name")); ClusterState.Builder csBuilder = new ClusterState.Builder(new ClusterName("_name"));
RoutingTable.Builder routingTableBuilder = RoutingTable.builder(); RoutingTable.Builder routingTableBuilder = RoutingTable.builder();
MetaData.Builder metaDateBuilder = MetaData.builder(); MetaData.Builder metaDataBuilder = MetaData.builder();
Settings settings = settings(Version.CURRENT) metaDataBuilder.put(IndexMetaData.builder(TriggeredWatchStore.INDEX_NAME).settings(indexSettings));
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) final Index index = metaDataBuilder.get(TriggeredWatchStore.INDEX_NAME).getIndex();
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1)
.build();
metaDateBuilder.put(IndexMetaData.builder(TriggeredWatchStore.INDEX_NAME).settings(settings).numberOfShards(1).numberOfReplicas(1));
final Index index = metaDateBuilder.get(TriggeredWatchStore.INDEX_NAME).getIndex();
IndexRoutingTable.Builder indexRoutingTableBuilder = IndexRoutingTable.builder(index); IndexRoutingTable.Builder indexRoutingTableBuilder = IndexRoutingTable.builder(index);
ShardId shardId = new ShardId(index, 0); ShardId shardId = new ShardId(index, 0);
indexRoutingTableBuilder.addIndexShard(new IndexShardRoutingTable.Builder(shardId) indexRoutingTableBuilder.addIndexShard(new IndexShardRoutingTable.Builder(shardId)
@ -222,7 +214,7 @@ public class TriggeredWatchStoreTests extends ESTestCase {
.build()); .build());
indexRoutingTableBuilder.addReplica(); indexRoutingTableBuilder.addReplica();
routingTableBuilder.add(indexRoutingTableBuilder.build()); routingTableBuilder.add(indexRoutingTableBuilder.build());
csBuilder.metaData(metaDateBuilder); csBuilder.metaData(metaDataBuilder);
csBuilder.routingTable(routingTableBuilder.build()); csBuilder.routingTable(routingTableBuilder.build());
ClusterState cs = csBuilder.build(); ClusterState cs = csBuilder.build();
@ -251,13 +243,9 @@ public class TriggeredWatchStoreTests extends ESTestCase {
ClusterState.Builder csBuilder = new ClusterState.Builder(new ClusterName("_name")); ClusterState.Builder csBuilder = new ClusterState.Builder(new ClusterName("_name"));
RoutingTable.Builder routingTableBuilder = RoutingTable.builder(); RoutingTable.Builder routingTableBuilder = RoutingTable.builder();
MetaData.Builder metaDateBuilder = MetaData.builder(); MetaData.Builder metaDataBuilder = MetaData.builder();
Settings settings = settings(Version.CURRENT) metaDataBuilder.put(IndexMetaData.builder(TriggeredWatchStore.INDEX_NAME).settings(indexSettings));
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) final Index index = metaDataBuilder.get(TriggeredWatchStore.INDEX_NAME).getIndex();
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1)
.build();
metaDateBuilder.put(IndexMetaData.builder(TriggeredWatchStore.INDEX_NAME).settings(settings).numberOfShards(1).numberOfReplicas(1));
final Index index = metaDateBuilder.get(TriggeredWatchStore.INDEX_NAME).getIndex();
IndexRoutingTable.Builder indexRoutingTableBuilder = IndexRoutingTable.builder(index); IndexRoutingTable.Builder indexRoutingTableBuilder = IndexRoutingTable.builder(index);
ShardId shardId = new ShardId(index, 0); ShardId shardId = new ShardId(index, 0);
indexRoutingTableBuilder.addIndexShard(new IndexShardRoutingTable.Builder(shardId) indexRoutingTableBuilder.addIndexShard(new IndexShardRoutingTable.Builder(shardId)
@ -265,7 +253,7 @@ public class TriggeredWatchStoreTests extends ESTestCase {
.build()); .build());
indexRoutingTableBuilder.addReplica(); indexRoutingTableBuilder.addReplica();
routingTableBuilder.add(indexRoutingTableBuilder.build()); routingTableBuilder.add(indexRoutingTableBuilder.build());
csBuilder.metaData(metaDateBuilder); csBuilder.metaData(metaDataBuilder);
csBuilder.routingTable(routingTableBuilder.build()); csBuilder.routingTable(routingTableBuilder.build());
ClusterState cs = csBuilder.build(); ClusterState cs = csBuilder.build();
@ -312,6 +300,63 @@ public class TriggeredWatchStoreTests extends ESTestCase {
verify(clientProxy, times(1)).clearScroll(anyString()); verify(clientProxy, times(1)).clearScroll(anyString());
} }
// the elasticsearch migration helper is doing reindex using aliases, so we have to
// make sure that the watch store supports a single alias pointing to the watch index
public void testLoadStoreAsAlias() throws Exception {
ClusterState.Builder csBuilder = new ClusterState.Builder(new ClusterName("_name"));
RoutingTable.Builder routingTableBuilder = RoutingTable.builder();
MetaData.Builder metaDataBuilder = MetaData.builder();
metaDataBuilder.put(IndexMetaData.builder("triggered-watches-alias").settings(indexSettings)
.putAlias(new AliasMetaData.Builder(TriggeredWatchStore.INDEX_NAME).build()));
final Index index = metaDataBuilder.get("triggered-watches-alias").getIndex();
IndexRoutingTable.Builder indexRoutingTableBuilder = IndexRoutingTable.builder(index);
ShardId shardId = new ShardId(index, 0);
indexRoutingTableBuilder.addIndexShard(new IndexShardRoutingTable.Builder(shardId)
.addShard(TestShardRouting.newShardRouting(shardId, "_node_id", null, true, ShardRoutingState.STARTED))
.build());
indexRoutingTableBuilder.addReplica();
routingTableBuilder.add(indexRoutingTableBuilder.build());
csBuilder.metaData(metaDataBuilder);
csBuilder.routingTable(routingTableBuilder.build());
ClusterState cs = csBuilder.build();
assertThat(triggeredWatchStore.validate(cs), is(true));
verifyZeroInteractions(clientProxy);
}
// the elasticsearch migration helper is doing reindex using aliases, so we have to
// make sure that the watch store supports only a single index in an alias
public void testLoadingFailsWithTwoAliases() throws Exception {
ClusterState.Builder csBuilder = new ClusterState.Builder(new ClusterName("_name"));
MetaData.Builder metaDataBuilder = MetaData.builder();
RoutingTable.Builder routingTableBuilder = RoutingTable.builder();
metaDataBuilder.put(IndexMetaData.builder("triggered-watches-alias").settings(indexSettings)
.putAlias(new AliasMetaData.Builder(TriggeredWatchStore.INDEX_NAME).build()));
metaDataBuilder.put(IndexMetaData.builder("whatever").settings(indexSettings)
.putAlias(new AliasMetaData.Builder(TriggeredWatchStore.INDEX_NAME).build()));
final Index index = metaDataBuilder.get("triggered-watches-alias").getIndex();
IndexRoutingTable.Builder indexRoutingTableBuilder = IndexRoutingTable.builder(index);
indexRoutingTableBuilder.addIndexShard(new IndexShardRoutingTable.Builder(new ShardId(index, 0))
.addShard(TestShardRouting.newShardRouting("triggered-watches-alias", 0, "_node_id", null, true, ShardRoutingState.STARTED))
.build());
indexRoutingTableBuilder.addReplica();
final Index otherIndex = metaDataBuilder.get("whatever").getIndex();
IndexRoutingTable.Builder otherIndexRoutingTableBuilder = IndexRoutingTable.builder(otherIndex);
otherIndexRoutingTableBuilder.addIndexShard(new IndexShardRoutingTable.Builder(new ShardId(index, 0))
.addShard(TestShardRouting.newShardRouting("whatever", 0, "_node_id", null, true, ShardRoutingState.STARTED))
.build());
csBuilder.metaData(metaDataBuilder);
csBuilder.routingTable(routingTableBuilder.build());
ClusterState cs = csBuilder.build();
assertThat(triggeredWatchStore.validate(cs), is(false));
verifyZeroInteractions(clientProxy);
}
private RefreshResponse mockRefreshResponse(int total, int successful) { private RefreshResponse mockRefreshResponse(int total, int successful) {
RefreshResponse refreshResponse = mock(RefreshResponse.class); RefreshResponse refreshResponse = mock(RefreshResponse.class);
when(refreshResponse.getTotalShards()).thenReturn(total); when(refreshResponse.getTotalShards()).thenReturn(total);

View File

@ -6,6 +6,8 @@
package org.elasticsearch.xpack.watcher.test; package org.elasticsearch.xpack.watcher.test;
import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.admin.indices.alias.Alias;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.action.admin.indices.template.get.GetIndexTemplatesResponse; import org.elasticsearch.action.admin.indices.template.get.GetIndexTemplatesResponse;
import org.elasticsearch.action.search.SearchRequestBuilder; import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.search.SearchResponse;
@ -15,12 +17,15 @@ import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.routing.IndexRoutingTable; import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.Streams; import org.elasticsearch.common.io.Streams;
import org.elasticsearch.common.network.NetworkModule; import org.elasticsearch.common.network.NetworkModule;
import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.Callback; import org.elasticsearch.common.util.Callback;
import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.common.xcontent.support.XContentMapValues; import org.elasticsearch.common.xcontent.support.XContentMapValues;
import org.elasticsearch.env.Environment; import org.elasticsearch.env.Environment;
import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.index.query.QueryBuilder;
@ -51,12 +56,14 @@ import org.elasticsearch.xpack.security.authc.support.SecuredString;
import org.elasticsearch.xpack.security.crypto.CryptoService; import org.elasticsearch.xpack.security.crypto.CryptoService;
import org.elasticsearch.xpack.support.clock.Clock; import org.elasticsearch.xpack.support.clock.Clock;
import org.elasticsearch.xpack.support.clock.ClockMock; import org.elasticsearch.xpack.support.clock.ClockMock;
import org.elasticsearch.xpack.template.TemplateUtils;
import org.elasticsearch.xpack.watcher.WatcherLifeCycleService; import org.elasticsearch.xpack.watcher.WatcherLifeCycleService;
import org.elasticsearch.xpack.watcher.WatcherService; import org.elasticsearch.xpack.watcher.WatcherService;
import org.elasticsearch.xpack.watcher.WatcherState; import org.elasticsearch.xpack.watcher.WatcherState;
import org.elasticsearch.xpack.watcher.client.WatcherClient; import org.elasticsearch.xpack.watcher.client.WatcherClient;
import org.elasticsearch.xpack.watcher.execution.ExecutionService; import org.elasticsearch.xpack.watcher.execution.ExecutionService;
import org.elasticsearch.xpack.watcher.execution.ExecutionState; import org.elasticsearch.xpack.watcher.execution.ExecutionState;
import org.elasticsearch.xpack.watcher.execution.TriggeredWatchStore;
import org.elasticsearch.xpack.watcher.history.HistoryStore; import org.elasticsearch.xpack.watcher.history.HistoryStore;
import org.elasticsearch.xpack.watcher.support.WatcherIndexTemplateRegistry; import org.elasticsearch.xpack.watcher.support.WatcherIndexTemplateRegistry;
import org.elasticsearch.xpack.watcher.support.xcontent.XContentSource; import org.elasticsearch.xpack.watcher.support.xcontent.XContentSource;
@ -64,6 +71,7 @@ import org.elasticsearch.xpack.watcher.trigger.ScheduleTriggerEngineMock;
import org.elasticsearch.xpack.watcher.trigger.TriggerService; import org.elasticsearch.xpack.watcher.trigger.TriggerService;
import org.elasticsearch.xpack.watcher.trigger.schedule.ScheduleModule; import org.elasticsearch.xpack.watcher.trigger.schedule.ScheduleModule;
import org.elasticsearch.xpack.watcher.watch.Watch; import org.elasticsearch.xpack.watcher.watch.Watch;
import org.elasticsearch.xpack.watcher.watch.WatchStore;
import org.hamcrest.Matcher; import org.hamcrest.Matcher;
import org.jboss.netty.util.internal.SystemPropertyUtil; import org.jboss.netty.util.internal.SystemPropertyUtil;
import org.junit.After; import org.junit.After;
@ -90,6 +98,7 @@ import java.util.function.Function;
import static org.elasticsearch.index.query.QueryBuilders.boolQuery; import static org.elasticsearch.index.query.QueryBuilders.boolQuery;
import static org.elasticsearch.index.query.QueryBuilders.matchQuery; import static org.elasticsearch.index.query.QueryBuilders.matchQuery;
import static org.elasticsearch.test.ESIntegTestCase.Scope.SUITE; import static org.elasticsearch.test.ESIntegTestCase.Scope.SUITE;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.xpack.security.authc.support.UsernamePasswordToken.basicAuthHeaderValue; import static org.elasticsearch.xpack.security.authc.support.UsernamePasswordToken.basicAuthHeaderValue;
import static org.elasticsearch.xpack.watcher.support.WatcherIndexTemplateRegistry.HISTORY_TEMPLATE_NAME; import static org.elasticsearch.xpack.watcher.support.WatcherIndexTemplateRegistry.HISTORY_TEMPLATE_NAME;
import static org.elasticsearch.xpack.watcher.support.WatcherIndexTemplateRegistry.TRIGGERED_TEMPLATE_NAME; import static org.elasticsearch.xpack.watcher.support.WatcherIndexTemplateRegistry.TRIGGERED_TEMPLATE_NAME;
@ -245,6 +254,7 @@ public abstract class AbstractWatcherIntegrationTestCase extends ESIntegTestCase
public void _setup() throws Exception { public void _setup() throws Exception {
setupTimeWarp(); setupTimeWarp();
startWatcherIfNodesExist(); startWatcherIfNodesExist();
configureAliasesForWatcherIndices();
} }
@After @After
@ -304,6 +314,52 @@ public abstract class AbstractWatcherIntegrationTestCase extends ESIntegTestCase
} }
} }
/**
* In order to test, that .watches and .triggered-watches indices can also point to an alias, we will rarely create those
* after starting watcher
*
* The idea behind this is the possible use of the migration helper for upgrades, see
* https://github.com/elastic/elasticsearch-migration/
*
*/
private void configureAliasesForWatcherIndices() throws Exception {
// alias for .watches, setting the index template to the same as well
if (rarely()) {
String newIndex = ".watches-alias-index";
BytesReference bytesReference = TemplateUtils.load("/watches.json");
try (XContentParser parser = JsonXContent.jsonXContent.createParser(bytesReference.toBytesRef().bytes)) {
Map<String, Object> parserMap = parser.map();
Map<String, Object> allMappings = (Map<String, Object>) parserMap.get("mappings");
CreateIndexResponse response = client().admin().indices().prepareCreate(newIndex)
.setCause("Index to test aliases with .watches index")
.addAlias(new Alias(WatchStore.INDEX))
.setSettings((Map<String, Object>) parserMap.get("settings"))
.addMapping("watch", (Map<String, Object>) allMappings.get("watch"))
.get();
assertAcked(response);
}
}
// alias for .triggered-watches, ensuring the index template is set appropriately
if (rarely()) {
String newIndex = ".triggered-watches-alias-index";
BytesReference bytesReference = TemplateUtils.load("/triggered_watches.json");
try (XContentParser parser = JsonXContent.jsonXContent.createParser(bytesReference.toBytesRef().bytes)) {
Map<String, Object> parserMap = parser.map();
Map<String, Object> allMappings = (Map<String, Object>) parserMap.get("mappings");
CreateIndexResponse response = client().admin().indices().prepareCreate(newIndex)
.setCause("Index to test aliases with .triggered-watches index")
.addAlias(new Alias(TriggeredWatchStore.INDEX_NAME))
.setSettings((Map<String, Object>) parserMap.get("settings"))
.addMapping("triggered_watch", (Map<String, Object>) allMappings.get("triggered_watch"))
.get();
assertAcked(response);
}
}
}
protected TimeWarp timeWarp() { protected TimeWarp timeWarp() {
assert timeWarped() : "cannot access TimeWarp when test context is not time warped"; assert timeWarped() : "cannot access TimeWarp when test context is not time warped";
return timeWarp; return timeWarp;

View File

@ -47,8 +47,8 @@ import static org.hamcrest.Matchers.is;
import static org.hamcrest.core.IsEqual.equalTo; import static org.hamcrest.core.IsEqual.equalTo;
import static org.joda.time.DateTimeZone.UTC; import static org.joda.time.DateTimeZone.UTC;
@TestLogging("org.elasticsearch.watcher:TRACE")
public class BootStrapTests extends AbstractWatcherIntegrationTestCase { public class BootStrapTests extends AbstractWatcherIntegrationTestCase {
@Override @Override
protected boolean timeWarped() { protected boolean timeWarped() {
// timewarping isn't necessary here, because we aren't testing triggering or throttling // timewarping isn't necessary here, because we aren't testing triggering or throttling
@ -188,10 +188,11 @@ public class BootStrapTests extends AbstractWatcherIntegrationTestCase {
DateTime now = DateTime.now(UTC); DateTime now = DateTime.now(UTC);
Wid wid = new Wid("_id", 1, now); Wid wid = new Wid("_id", 1, now);
ScheduleTriggerEvent event = new ScheduleTriggerEvent("_id", now, now); ScheduleTriggerEvent event = new ScheduleTriggerEvent("_id", now, now);
client().prepareIndex(TriggeredWatchStore.INDEX_NAME, TriggeredWatchStore.DOC_TYPE, wid.value()) client().prepareIndex(TriggeredWatchStore.INDEX_NAME, TriggeredWatchStore.DOC_TYPE, wid.value())
.setSource(jsonBuilder().startObject() .setSource(jsonBuilder().startObject()
.startObject(WatchRecord.Field.TRIGGER_EVENT.getPreferredName()) .startObject(WatchRecord.Field.TRIGGER_EVENT.getPreferredName())
.field(event.type(), event) .field(event.type(), event)
.endObject() .endObject()
.endObject()) .endObject())
.setWaitForActiveShards(ActiveShardCount.ALL) .setWaitForActiveShards(ActiveShardCount.ALL)

View File

@ -5,6 +5,7 @@
*/ */
package org.elasticsearch.xpack.watcher.watch; package org.elasticsearch.xpack.watcher.watch;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version; import org.elasticsearch.Version;
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest; import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
import org.elasticsearch.action.admin.indices.refresh.RefreshResponse; import org.elasticsearch.action.admin.indices.refresh.RefreshResponse;
@ -13,6 +14,7 @@ import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.AliasMetaData;
import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.routing.IndexRoutingTable; import org.elasticsearch.cluster.routing.IndexRoutingTable;
@ -80,8 +82,8 @@ public class WatchStoreTests extends ESTestCase {
public void testStartNoPreviousWatchesIndex() throws Exception { public void testStartNoPreviousWatchesIndex() throws Exception {
ClusterState.Builder csBuilder = new ClusterState.Builder(new ClusterName("_name")); ClusterState.Builder csBuilder = new ClusterState.Builder(new ClusterName("_name"));
MetaData.Builder metaDateBuilder = MetaData.builder(); MetaData.Builder metaDataBuilder = MetaData.builder();
csBuilder.metaData(metaDateBuilder); csBuilder.metaData(metaDataBuilder);
ClusterState cs = csBuilder.build(); ClusterState cs = csBuilder.build();
assertThat(watchStore.validate(cs), is(true)); assertThat(watchStore.validate(cs), is(true));
@ -96,14 +98,14 @@ public class WatchStoreTests extends ESTestCase {
public void testStartPrimaryShardNotReady() { public void testStartPrimaryShardNotReady() {
ClusterState.Builder csBuilder = new ClusterState.Builder(new ClusterName("_name")); ClusterState.Builder csBuilder = new ClusterState.Builder(new ClusterName("_name"));
MetaData.Builder metaDateBuilder = MetaData.builder(); MetaData.Builder metaDataBuilder = MetaData.builder();
RoutingTable.Builder routingTableBuilder = RoutingTable.builder(); RoutingTable.Builder routingTableBuilder = RoutingTable.builder();
Settings settings = settings(Version.CURRENT) Settings settings = settings(Version.CURRENT)
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1) .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1)
.build(); .build();
metaDateBuilder.put(IndexMetaData.builder(WatchStore.INDEX).settings(settings).numberOfShards(1).numberOfReplicas(1)); metaDataBuilder.put(IndexMetaData.builder(WatchStore.INDEX).settings(settings).numberOfShards(1).numberOfReplicas(1));
final Index index = metaDateBuilder.get(WatchStore.INDEX).getIndex(); final Index index = metaDataBuilder.get(WatchStore.INDEX).getIndex();
IndexRoutingTable.Builder indexRoutingTableBuilder = IndexRoutingTable.builder(index); IndexRoutingTable.Builder indexRoutingTableBuilder = IndexRoutingTable.builder(index);
indexRoutingTableBuilder.addIndexShard(new IndexShardRoutingTable.Builder(new ShardId(index, 0)) indexRoutingTableBuilder.addIndexShard(new IndexShardRoutingTable.Builder(new ShardId(index, 0))
.addShard(TestShardRouting.newShardRouting(WatchStore.INDEX, 0, "_node_id", null, true, .addShard(TestShardRouting.newShardRouting(WatchStore.INDEX, 0, "_node_id", null, true,
@ -111,7 +113,7 @@ public class WatchStoreTests extends ESTestCase {
.build()); .build());
indexRoutingTableBuilder.addReplica(); indexRoutingTableBuilder.addReplica();
routingTableBuilder.add(indexRoutingTableBuilder.build()); routingTableBuilder.add(indexRoutingTableBuilder.build());
csBuilder.metaData(metaDateBuilder); csBuilder.metaData(metaDataBuilder);
csBuilder.routingTable(routingTableBuilder.build()); csBuilder.routingTable(routingTableBuilder.build());
ClusterState cs = csBuilder.build(); ClusterState cs = csBuilder.build();
@ -378,26 +380,134 @@ public class WatchStoreTests extends ESTestCase {
assertThat(watchStore.activeWatches(), hasSize(0)); assertThat(watchStore.activeWatches(), hasSize(0));
} }
/* // the elasticsearch migration helper is doing reindex using aliases, so we have to
* Creates the standard cluster state metadata for the watches index // make sure that the watch store supports a single alias pointing to the watch index
* with shards/replicas being marked as started public void testThatStartingWithWatchesIndexAsAliasWorks() throws Exception {
*/ ClusterState.Builder csBuilder = new ClusterState.Builder(new ClusterName("_name"));
private void createWatchIndexMetaData(ClusterState.Builder builder) {
MetaData.Builder metaDateBuilder = MetaData.builder(); MetaData.Builder metaDataBuilder = MetaData.builder();
RoutingTable.Builder routingTableBuilder = RoutingTable.builder(); RoutingTable.Builder routingTableBuilder = RoutingTable.builder();
Settings settings = settings(Version.CURRENT) Settings settings = settings(Version.CURRENT)
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1) .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1)
.build(); .build();
metaDateBuilder.put(IndexMetaData.builder(WatchStore.INDEX).settings(settings).numberOfShards(1).numberOfReplicas(1)); metaDataBuilder.put(IndexMetaData.builder("watches-alias").settings(settings).numberOfShards(1).numberOfReplicas(1)
final Index index = metaDateBuilder.get(WatchStore.INDEX).getIndex(); .putAlias(new AliasMetaData.Builder(WatchStore.INDEX).build()));
final Index index = metaDataBuilder.get("watches-alias").getIndex();
IndexRoutingTable.Builder indexRoutingTableBuilder = IndexRoutingTable.builder(index);
indexRoutingTableBuilder.addIndexShard(new IndexShardRoutingTable.Builder(new ShardId(index, 0))
.addShard(TestShardRouting.newShardRouting("watches-alias", 0, "_node_id", null, true, ShardRoutingState.STARTED))
.build());
indexRoutingTableBuilder.addReplica();
routingTableBuilder.add(indexRoutingTableBuilder.build());
csBuilder.metaData(metaDataBuilder);
csBuilder.routingTable(routingTableBuilder.build());
RefreshResponse refreshResponse = mockRefreshResponse(1, 1);
when(clientProxy.refresh(any(RefreshRequest.class))).thenReturn(refreshResponse);
BytesReference source = new BytesArray("{}");
InternalSearchHit hit1 = new InternalSearchHit(0, "_id1", new Text("type"), Collections.<String, SearchHitField>emptyMap());
hit1.sourceRef(source);
InternalSearchHit hit2 = new InternalSearchHit(1, "_id2", new Text("type"), Collections.<String, SearchHitField>emptyMap());
hit2.sourceRef(source);
SearchResponse searchResponse1 = mockSearchResponse(1, 1, 2, hit1, hit2);
when(clientProxy.search(any(SearchRequest.class), any(TimeValue.class))).thenReturn(searchResponse1);
InternalSearchHit hit3 = new InternalSearchHit(2, "_id3", new Text("type"), Collections.<String, SearchHitField>emptyMap());
hit3.sourceRef(source);
InternalSearchHit hit4 = new InternalSearchHit(3, "_id4", new Text("type"), Collections.<String, SearchHitField>emptyMap());
hit4.sourceRef(source);
SearchResponse searchResponse2 = mockSearchResponse(1, 1, 2, hit3, hit4);
SearchResponse searchResponse3 = mockSearchResponse(1, 1, 2);
when(clientProxy.searchScroll(anyString(), any(TimeValue.class))).thenReturn(searchResponse2, searchResponse3);
Watch watch1 = mock(Watch.class);
WatchStatus status = mock(WatchStatus.class);
when(watch1.status()).thenReturn(status);
Watch watch2 = mock(Watch.class);
when(watch2.status()).thenReturn(status);
Watch watch3 = mock(Watch.class);
when(watch3.status()).thenReturn(status);
Watch watch4 = mock(Watch.class);
when(watch4.status()).thenReturn(status);
when(parser.parse("_id1", true, source, true)).thenReturn(watch1);
when(parser.parse("_id2", true, source, true)).thenReturn(watch2);
when(parser.parse("_id3", true, source, true)).thenReturn(watch3);
when(parser.parse("_id4", true, source, true)).thenReturn(watch4);
when(clientProxy.clearScroll(anyString())).thenReturn(new ClearScrollResponse(true, 0));
ClusterState cs = csBuilder.build();
assertThat(watchStore.validate(cs), is(true));
watchStore.start(cs);
assertThat(watchStore.started(), is(true));
assertThat(watchStore.watches().size(), equalTo(4));
verify(clientProxy, times(1)).refresh(any(RefreshRequest.class));
verify(clientProxy, times(1)).search(any(SearchRequest.class), any(TimeValue.class));
verify(clientProxy, times(1)).clearScroll(anyString());
}
// the elasticsearch migration helper is doing reindex using aliases, so we have to
// make sure that the watch store supports only a single index in an alias
public void testThatWatchesIndexWithTwoAliasesFails() throws Exception {
ClusterState.Builder csBuilder = new ClusterState.Builder(new ClusterName("_name"));
MetaData.Builder metaDataBuilder = MetaData.builder();
RoutingTable.Builder routingTableBuilder = RoutingTable.builder();
Settings settings = settings(Version.CURRENT)
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1)
.build();
metaDataBuilder.put(IndexMetaData.builder("watches-alias").settings(settings).numberOfShards(1).numberOfReplicas(1)
.putAlias(new AliasMetaData.Builder(WatchStore.INDEX).build()));
metaDataBuilder.put(IndexMetaData.builder("whatever").settings(settings).numberOfShards(1).numberOfReplicas(1)
.putAlias(new AliasMetaData.Builder(WatchStore.INDEX).build()));
final Index index = metaDataBuilder.get("watches-alias").getIndex();
IndexRoutingTable.Builder indexRoutingTableBuilder = IndexRoutingTable.builder(index);
indexRoutingTableBuilder.addIndexShard(new IndexShardRoutingTable.Builder(new ShardId(index, 0))
.addShard(TestShardRouting.newShardRouting("watches-alias", 0, "_node_id", null, true, ShardRoutingState.STARTED))
.build());
indexRoutingTableBuilder.addReplica();
final Index otherIndex = metaDataBuilder.get("whatever").getIndex();
IndexRoutingTable.Builder otherIndexRoutingTableBuilder = IndexRoutingTable.builder(otherIndex);
otherIndexRoutingTableBuilder.addIndexShard(new IndexShardRoutingTable.Builder(new ShardId(index, 0))
.addShard(TestShardRouting.newShardRouting("whatever", 0, "_node_id", null, true, ShardRoutingState.STARTED))
.build());
otherIndexRoutingTableBuilder.addReplica();
routingTableBuilder.add(otherIndexRoutingTableBuilder.build());
csBuilder.metaData(metaDataBuilder);
csBuilder.routingTable(routingTableBuilder.build());
ClusterState cs = csBuilder.build();
assertThat(watchStore.validate(cs), is(false));
IllegalStateException exception = expectThrows(IllegalStateException.class, () -> watchStore.start(cs));
assertThat(exception.getMessage(), is("Alias [.watches] points to more than one index"));
}
/*
* Creates the standard cluster state metadata for the watches index
* with shards/replicas being marked as started
*/
private void createWatchIndexMetaData(ClusterState.Builder builder) {
MetaData.Builder metaDataBuilder = MetaData.builder();
RoutingTable.Builder routingTableBuilder = RoutingTable.builder();
Settings settings = settings(Version.CURRENT)
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1)
.build();
metaDataBuilder.put(IndexMetaData.builder(WatchStore.INDEX).settings(settings).numberOfShards(1).numberOfReplicas(1));
final Index index = metaDataBuilder.get(WatchStore.INDEX).getIndex();
IndexRoutingTable.Builder indexRoutingTableBuilder = IndexRoutingTable.builder(index); IndexRoutingTable.Builder indexRoutingTableBuilder = IndexRoutingTable.builder(index);
indexRoutingTableBuilder.addIndexShard(new IndexShardRoutingTable.Builder(new ShardId(index, 0)) indexRoutingTableBuilder.addIndexShard(new IndexShardRoutingTable.Builder(new ShardId(index, 0))
.addShard(TestShardRouting.newShardRouting(WatchStore.INDEX, 0, "_node_id", null, true, ShardRoutingState.STARTED)) .addShard(TestShardRouting.newShardRouting(WatchStore.INDEX, 0, "_node_id", null, true, ShardRoutingState.STARTED))
.build()); .build());
indexRoutingTableBuilder.addReplica(); indexRoutingTableBuilder.addReplica();
routingTableBuilder.add(indexRoutingTableBuilder.build()); routingTableBuilder.add(indexRoutingTableBuilder.build());
builder.metaData(metaDateBuilder); builder.metaData(metaDataBuilder);
builder.routingTable(routingTableBuilder.build()); builder.routingTable(routingTableBuilder.build());
} }