Previously, Watcher only attached its listener to indices that started with the prefix `.watches`, which causes Watcher to silently fail to schedule newly created Watches if the `.watches` alias is redirected to an index that does not start with `.watches`. Watcher now attaches the listener to all indices, so that Watcher can respond to changes in which index has the `.watches` alias. Also adjusts the tests to randomly use non-prefixed concrete indices for .watches and .triggered_watches.
This commit is contained in:
parent
ecb6df137c
commit
eb288a6f85
|
@ -574,11 +574,9 @@ public class Watcher extends Plugin implements ActionPlugin, ScriptPlugin, Reloa
|
|||
}
|
||||
|
||||
assert listener != null;
|
||||
// for now, we only add this index operation listener to indices starting with .watches
|
||||
// this also means, that aliases pointing to this index have to follow this notation
|
||||
if (module.getIndex().getName().startsWith(Watch.INDEX)) {
|
||||
module.addIndexOperationListener(listener);
|
||||
}
|
||||
// Attach a listener to every index so that we can react to alias changes.
|
||||
// This listener will be a no-op except on the index pointed to by .watches
|
||||
module.addIndexOperationListener(listener);
|
||||
}
|
||||
|
||||
static void validAutoCreateIndex(Settings settings, Logger logger) {
|
||||
|
|
|
@ -0,0 +1,54 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.xpack.watcher;
|
||||
|
||||
import org.elasticsearch.action.search.SearchResponse;
|
||||
import org.elasticsearch.protocol.xpack.watcher.PutWatchResponse;
|
||||
import org.elasticsearch.xpack.core.watcher.watch.Watch;
|
||||
import org.elasticsearch.xpack.watcher.condition.InternalAlwaysCondition;
|
||||
import org.elasticsearch.xpack.watcher.test.AbstractWatcherIntegrationTestCase;
|
||||
|
||||
import java.util.Locale;
|
||||
|
||||
import static org.elasticsearch.xpack.watcher.actions.ActionBuilders.indexAction;
|
||||
import static org.elasticsearch.xpack.watcher.client.WatchSourceBuilders.watchBuilder;
|
||||
import static org.elasticsearch.xpack.watcher.input.InputBuilders.noneInput;
|
||||
import static org.elasticsearch.xpack.watcher.trigger.TriggerBuilders.schedule;
|
||||
import static org.elasticsearch.xpack.watcher.trigger.schedule.Schedules.interval;
|
||||
import static org.hamcrest.Matchers.greaterThan;
|
||||
|
||||
public class WatcherConcreteIndexTests extends AbstractWatcherIntegrationTestCase {
|
||||
|
||||
@Override
|
||||
protected boolean timeWarped() {
|
||||
return false;
|
||||
}
|
||||
|
||||
public void testCanUseAnyConcreteIndexName() throws Exception {
|
||||
String newWatcherIndexName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
|
||||
String watchResultsIndex = randomAlphaOfLength(11).toLowerCase(Locale.ROOT);
|
||||
createIndex(watchResultsIndex);
|
||||
|
||||
stopWatcher();
|
||||
replaceWatcherIndexWithRandomlyNamedIndex(Watch.INDEX, newWatcherIndexName, Watch.DOC_TYPE);
|
||||
startWatcher();
|
||||
|
||||
PutWatchResponse putWatchResponse = watcherClient().preparePutWatch("mywatch").setSource(watchBuilder()
|
||||
.trigger(schedule(interval("3s")))
|
||||
.input(noneInput())
|
||||
.condition(InternalAlwaysCondition.INSTANCE)
|
||||
.addAction("indexer", indexAction(watchResultsIndex, "_doc")))
|
||||
.get();
|
||||
|
||||
assertTrue(putWatchResponse.isCreated());
|
||||
|
||||
assertBusy(() -> {
|
||||
SearchResponse searchResult = client().prepareSearch(watchResultsIndex).setTrackTotalHits(true).get();
|
||||
assertThat((int) searchResult.getHits().getTotalHits().value, greaterThan(0));
|
||||
});
|
||||
}
|
||||
}
|
|
@ -6,7 +6,9 @@
|
|||
package org.elasticsearch.xpack.watcher.test;
|
||||
|
||||
import org.elasticsearch.action.admin.indices.alias.Alias;
|
||||
import org.elasticsearch.action.admin.indices.alias.get.GetAliasesResponse;
|
||||
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
|
||||
import org.elasticsearch.action.admin.indices.get.GetIndexResponse;
|
||||
import org.elasticsearch.action.admin.indices.template.get.GetIndexTemplatesResponse;
|
||||
import org.elasticsearch.action.search.SearchRequestBuilder;
|
||||
import org.elasticsearch.action.search.SearchResponse;
|
||||
|
@ -14,6 +16,7 @@ import org.elasticsearch.action.support.IndicesOptions;
|
|||
import org.elasticsearch.analysis.common.CommonAnalysisPlugin;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
|
||||
import org.elasticsearch.cluster.metadata.MappingMetaData;
|
||||
import org.elasticsearch.cluster.routing.IndexRoutingTable;
|
||||
import org.elasticsearch.common.collect.Tuple;
|
||||
import org.elasticsearch.common.network.NetworkModule;
|
||||
|
@ -194,7 +197,7 @@ public abstract class AbstractWatcherIntegrationTestCase extends ESIntegTestCase
|
|||
internalCluster().setDisruptionScheme(ice);
|
||||
ice.startDisrupting();
|
||||
}
|
||||
|
||||
stopWatcher();
|
||||
createWatcherIndicesOrAliases();
|
||||
startWatcher();
|
||||
}
|
||||
|
@ -221,13 +224,19 @@ public abstract class AbstractWatcherIntegrationTestCase extends ESIntegTestCase
|
|||
// alias for .watches, setting the index template to the same as well
|
||||
String watchIndexName;
|
||||
String triggeredWatchIndexName;
|
||||
if (rarely()) {
|
||||
watchIndexName = ".watches-alias-index";
|
||||
CreateIndexResponse response = client().admin().indices().prepareCreate(watchIndexName)
|
||||
if (randomBoolean()) {
|
||||
// Create an index to get the template
|
||||
String tempIndex = ".watches" + randomAlphaOfLength(5).toLowerCase(Locale.ROOT);
|
||||
CreateIndexResponse response = client().admin().indices().prepareCreate(tempIndex)
|
||||
.setCause("Index to test aliases with .watches index")
|
||||
.addAlias(new Alias(Watch.INDEX))
|
||||
.get();
|
||||
assertAcked(response);
|
||||
|
||||
// Now replace it with a randomly named index
|
||||
watchIndexName = randomAlphaOfLengthBetween(5,10).toLowerCase(Locale.ROOT);
|
||||
replaceWatcherIndexWithRandomlyNamedIndex(Watch.INDEX, watchIndexName, Watch.DOC_TYPE);
|
||||
|
||||
logger.info("set alias for .watches index to [{}]", watchIndexName);
|
||||
} else {
|
||||
watchIndexName = Watch.INDEX;
|
||||
|
@ -239,13 +248,19 @@ public abstract class AbstractWatcherIntegrationTestCase extends ESIntegTestCase
|
|||
}
|
||||
|
||||
// alias for .triggered-watches, ensuring the index template is set appropriately
|
||||
if (rarely()) {
|
||||
triggeredWatchIndexName = ".triggered_watches-alias-index";
|
||||
CreateIndexResponse response = client().admin().indices().prepareCreate(triggeredWatchIndexName)
|
||||
if (randomBoolean()) {
|
||||
String tempIndex = ".triggered_watches-alias-index";
|
||||
CreateIndexResponse response = client().admin().indices().prepareCreate(tempIndex)
|
||||
.setCause("Index to test aliases with .triggered-watches index")
|
||||
.addAlias(new Alias(TriggeredWatchStoreField.INDEX_NAME))
|
||||
.get();
|
||||
assertAcked(response);
|
||||
|
||||
// Now replace it with a randomly-named index
|
||||
triggeredWatchIndexName = randomValueOtherThan(watchIndexName,
|
||||
() -> randomAlphaOfLengthBetween(5,10).toLowerCase(Locale.ROOT));
|
||||
replaceWatcherIndexWithRandomlyNamedIndex(TriggeredWatchStoreField.INDEX_NAME, triggeredWatchIndexName,
|
||||
TriggeredWatchStoreField.DOC_TYPE);
|
||||
logger.info("set alias for .triggered-watches index to [{}]", triggeredWatchIndexName);
|
||||
} else {
|
||||
triggeredWatchIndexName = TriggeredWatchStoreField.INDEX_NAME;
|
||||
|
@ -259,6 +274,38 @@ public abstract class AbstractWatcherIntegrationTestCase extends ESIntegTestCase
|
|||
}
|
||||
}
|
||||
|
||||
public void replaceWatcherIndexWithRandomlyNamedIndex(String originalIndexOrAlias, String to, String docType) {
|
||||
GetIndexResponse index = client().admin().indices().prepareGetIndex().setIndices(originalIndexOrAlias).get();
|
||||
MappingMetaData mapping = index.getMappings().get(index.getIndices()[0]).get(docType);
|
||||
|
||||
Settings settings = index.getSettings().get(index.getIndices()[0]);
|
||||
Settings.Builder newSettings = Settings.builder().put(settings);
|
||||
newSettings.remove("index.provided_name");
|
||||
newSettings.remove("index.uuid");
|
||||
newSettings.remove("index.creation_date");
|
||||
newSettings.remove("index.version.created");
|
||||
|
||||
CreateIndexResponse createIndexResponse = client().admin().indices().prepareCreate(to)
|
||||
.addMapping(docType, mapping.sourceAsMap())
|
||||
.setSettings(newSettings)
|
||||
.get();
|
||||
assertTrue(createIndexResponse.isAcknowledged());
|
||||
ensureGreen(to);
|
||||
|
||||
AtomicReference<String> originalIndex = new AtomicReference<>(originalIndexOrAlias);
|
||||
boolean watchesIsAlias = client().admin().indices().prepareAliasesExist(originalIndexOrAlias).get().isExists();
|
||||
if (watchesIsAlias) {
|
||||
GetAliasesResponse aliasesResponse = client().admin().indices().prepareGetAliases(originalIndexOrAlias).get();
|
||||
assertEquals(1, aliasesResponse.getAliases().size());
|
||||
aliasesResponse.getAliases().forEach((aliasRecord) -> {
|
||||
assertEquals(1, aliasRecord.value.size());
|
||||
originalIndex.set(aliasRecord.key);
|
||||
});
|
||||
}
|
||||
client().admin().indices().prepareDelete(originalIndex.get()).get();
|
||||
client().admin().indices().prepareAliases().addAlias(to, originalIndexOrAlias).get();
|
||||
}
|
||||
|
||||
protected TimeWarp timeWarp() {
|
||||
assert timeWarped() : "cannot access TimeWarp when test context is not time warped";
|
||||
return timeWarp;
|
||||
|
|
Loading…
Reference in New Issue