From 3502a9901b57497e211520747421681448d05cb4 Mon Sep 17 00:00:00 2001 From: Igor Motov Date: Tue, 13 Jun 2017 08:43:33 -0400 Subject: [PATCH] Add watcher upgrade procedure (elastic/x-pack-elasticsearch#1603) Relates to elastic/x-pack-elasticsearch#1214 Original commit: elastic/x-pack-elasticsearch@1017d60df4e3ec449f5ccf451b2f76795dd619a3 --- .../xpack/upgrade/IndexUpgradeCheck.java | 108 +++++++--------- .../xpack/upgrade/IndexUpgradeService.java | 58 +++++---- .../xpack/upgrade/InternalIndexReindexer.java | 26 +++- .../elasticsearch/xpack/upgrade/Upgrade.java | 115 +++++++++++++++--- .../xpack/upgrade/IndexUpgradeCheckTests.java | 72 +++++++---- .../xpack/upgrade/IndexUpgradeIT.java | 43 +++++++ .../upgrade/IndexUpgradeServiceTests.java | 56 +++++++-- .../upgrade/InternalIndexReindexerIT.java | 7 +- 8 files changed, 332 insertions(+), 153 deletions(-) diff --git a/plugin/src/main/java/org/elasticsearch/xpack/upgrade/IndexUpgradeCheck.java b/plugin/src/main/java/org/elasticsearch/xpack/upgrade/IndexUpgradeCheck.java index 967ebfb66cf..06bcb9ab4eb 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/upgrade/IndexUpgradeCheck.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/upgrade/IndexUpgradeCheck.java @@ -10,14 +10,16 @@ import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.service.ClusterService; -import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.reindex.BulkByScrollResponse; import org.elasticsearch.script.Script; +import org.elasticsearch.transport.TransportResponse; import java.util.Map; -import java.util.function.Predicate; +import java.util.function.BiConsumer; +import java.util.function.BiFunction; +import java.util.function.Consumer; /** * Generic upgrade check applicable to all indices to be upgraded from the current version @@ -29,74 +31,56 @@ import java.util.function.Predicate; * - reindex is performed * - postUpgrade is called if reindex was successful */ -public class IndexUpgradeCheck extends AbstractComponent { +public class IndexUpgradeCheck extends AbstractComponent { public static final int UPRADE_VERSION = 6; private final String name; - private final Predicate>> isSupported; - private final InternalIndexReindexer reindexer; - private final UpgradeActionRequired matchAction; - private final UpgradeActionRequired noMatchAction; + private final BiFunction, UpgradeActionRequired> actionRequired; + private final InternalIndexReindexer reindexer; /** - * Creates a new upgrade check that doesn't support upgrade + * Creates a new upgrade check * - * @param name - the name of the check - * @param settings - system settings - * @param isSupported - return true if they can work with the index with specified name - * @param matchAction - action if isSupported return true - * @param noMatchAction - action if isSupported return false + * @param name - the name of the check + * @param settings - system settings + * @param actionRequired - return true if they can work with the index with specified name + * @param client - client + * @param clusterService - cluster service + * @param types - a list of types that the reindexing should be limited to + * @param updateScript - the upgrade script that should be used during reindexing */ public IndexUpgradeCheck(String name, Settings settings, - Predicate>> isSupported, - UpgradeActionRequired matchAction, UpgradeActionRequired noMatchAction) { - super(settings); - this.name = name; - this.isSupported = isSupported; - this.reindexer = null; - this.matchAction = matchAction; - this.noMatchAction = noMatchAction; + BiFunction, UpgradeActionRequired> actionRequired, + Client client, ClusterService clusterService, String[] types, Script updateScript) { + this(name, settings, actionRequired, client, clusterService, types, updateScript, + listener -> listener.onResponse(null), (t, listener) -> listener.onResponse(TransportResponse.Empty.INSTANCE)); } /** * Creates a new upgrade check * - * @param name - the name of the check - * @param settings - system settings - * @param client - client - * @param clusterService - cluster service - * @param isSupported - return true if they can work with the index with specified name - * @param types - a list of types that the reindexing should be limited to - * @param updateScript - the upgrade script that should be used during reindexing + * @param name - the name of the check + * @param settings - system settings + * @param actionRequired - return true if they can work with the index with specified name + * @param client - client + * @param clusterService - cluster service + * @param types - a list of types that the reindexing should be limited to + * @param updateScript - the upgrade script that should be used during reindexing + * @param preUpgrade - action that should be performed before upgrade + * @param postUpgrade - action that should be performed after upgrade */ - public IndexUpgradeCheck(String name, Settings settings, Client client, ClusterService clusterService, - Predicate>> isSupported, - String[] types, Script updateScript) { - this(name, settings, client, clusterService, isSupported, types, updateScript, UpgradeActionRequired.UPGRADE, - UpgradeActionRequired.NOT_APPLICABLE); - } - - /** - * Creates a new upgrade check - * - * @param name - the name of the check - * @param settings - system settings - * @param client - client - * @param clusterService - cluster service - * @param isSupported - return true if they can work with the index with specified name - * @param types - a list of types that the reindexing should be limited to - * @param updateScript - the upgrade script that should be used during reindexing - */ - public IndexUpgradeCheck(String name, Settings settings, Client client, ClusterService clusterService, - Predicate>> isSupported, - String[] types, Script updateScript, UpgradeActionRequired matchAction, UpgradeActionRequired noMatchAction) { + public IndexUpgradeCheck(String name, Settings settings, + BiFunction, UpgradeActionRequired> actionRequired, + Client client, ClusterService clusterService, String[] types, Script updateScript, + Consumer> preUpgrade, + BiConsumer> postUpgrade) { super(settings); this.name = name; - this.isSupported = isSupported; - this.reindexer = new InternalIndexReindexer(client, clusterService, UPRADE_VERSION, updateScript, types); - this.matchAction = matchAction; - this.noMatchAction = noMatchAction; + this.actionRequired = actionRequired; + this.reindexer = new InternalIndexReindexer<>(client, clusterService, UPRADE_VERSION, updateScript, types, preUpgrade, + postUpgrade); } + /** * Returns the name of the check */ @@ -109,31 +93,21 @@ public class IndexUpgradeCheck extends AbstractComponent { * * @param indexMetaData index metadata * @param params additional user-specified parameters see {@link IndexUpgradeCheckFactory#supportedParams} - * @param state current cluster state * @return required action or UpgradeActionRequired.NOT_APPLICABLE if this check cannot be performed on the index */ - public UpgradeActionRequired actionRequired(IndexMetaData indexMetaData, Map params, ClusterState state) { - if (isSupported.test(new Tuple<>(indexMetaData, params))) { - return matchAction; - } - return noMatchAction; + public UpgradeActionRequired actionRequired(IndexMetaData indexMetaData, Map params) { + return actionRequired.apply(indexMetaData, params); } /** * Perform the index upgrade * * @param indexMetaData index metadata - * @param params additional user-specified parameters see {@link IndexUpgradeCheckFactory#supportedParams} * @param state current cluster state * @param listener the listener that should be called upon completion of the upgrade */ - public void upgrade(IndexMetaData indexMetaData, Map params, ClusterState state, + public void upgrade(IndexMetaData indexMetaData, ClusterState state, ActionListener listener) { - if (reindexer == null) { - throw new UnsupportedOperationException(getName() + " check doesn't support index upgrade"); - } else { - reindexer.upgrade(indexMetaData.getIndex().getName(), state, listener); - } + reindexer.upgrade(indexMetaData.getIndex().getName(), state, listener); } - } diff --git a/plugin/src/main/java/org/elasticsearch/xpack/upgrade/IndexUpgradeService.java b/plugin/src/main/java/org/elasticsearch/xpack/upgrade/IndexUpgradeService.java index dfb1f9b0e0f..59bda844ae5 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/upgrade/IndexUpgradeService.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/upgrade/IndexUpgradeService.java @@ -5,6 +5,7 @@ */ package org.elasticsearch.xpack.upgrade; +import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.cluster.ClusterState; @@ -49,32 +50,43 @@ public class IndexUpgradeService extends AbstractComponent { MetaData metaData = state.getMetaData(); for (String index : concreteIndexNames) { IndexMetaData indexMetaData = metaData.index(index); - indexCheck: - for (IndexUpgradeCheck check : upgradeChecks) { - UpgradeActionRequired upgradeActionRequired = check.actionRequired(indexMetaData, params, state); - logger.trace("[{}] check [{}] returned [{}]", index, check.getName(), upgradeActionRequired); - switch (upgradeActionRequired) { - case UPGRADE: - case REINDEX: - // this index needs to be upgraded or reindexed - skipping all other checks - results.put(index, upgradeActionRequired); - break indexCheck; - case UP_TO_DATE: - // this index is good - skipping all other checks - break indexCheck; - case NOT_APPLICABLE: - // this action is not applicable to this index - skipping to the next one - break; - default: - throw new IllegalStateException("unknown upgrade action " + upgradeActionRequired + " for the index " - + index); - - } + UpgradeActionRequired upgradeActionRequired = upgradeInfo(indexMetaData, index, params); + if (upgradeActionRequired != null) { + results.put(index, upgradeActionRequired); } } return results; } + private UpgradeActionRequired upgradeInfo(IndexMetaData indexMetaData, String index, Map params) { + for (IndexUpgradeCheck check : upgradeChecks) { + UpgradeActionRequired upgradeActionRequired = check.actionRequired(indexMetaData, params); + logger.trace("[{}] check [{}] returned [{}]", index, check.getName(), upgradeActionRequired); + switch (upgradeActionRequired) { + case UPGRADE: + case REINDEX: + // this index needs to be upgraded or reindexed - skipping all other checks + return upgradeActionRequired; + case UP_TO_DATE: + // this index is good - skipping all other checks + return null; + case NOT_APPLICABLE: + // this action is not applicable to this index - skipping to the next one + break; + default: + throw new IllegalStateException("unknown upgrade action " + upgradeActionRequired + " for the index " + + index); + + } + } + // Catch all check for all indices that didn't match the specific checks + if (indexMetaData.getCreationVersion().before(Version.V_5_0_0)) { + return UpgradeActionRequired.REINDEX; + } else { + return null; + } + } + public void upgrade(String index, Map params, ClusterState state, ActionListener listener) { IndexMetaData indexMetaData = state.metaData().index(index); @@ -82,11 +94,11 @@ public class IndexUpgradeService extends AbstractComponent { throw new IndexNotFoundException(index); } for (IndexUpgradeCheck check : upgradeChecks) { - UpgradeActionRequired upgradeActionRequired = check.actionRequired(indexMetaData, params, state); + UpgradeActionRequired upgradeActionRequired = check.actionRequired(indexMetaData, params); switch (upgradeActionRequired) { case UPGRADE: // this index needs to be upgraded - start the upgrade procedure - check.upgrade(indexMetaData, params, state, listener); + check.upgrade(indexMetaData, state, listener); return; case REINDEX: // this index needs to be re-indexed diff --git a/plugin/src/main/java/org/elasticsearch/xpack/upgrade/InternalIndexReindexer.java b/plugin/src/main/java/org/elasticsearch/xpack/upgrade/InternalIndexReindexer.java index 3965e566528..d475a8626e6 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/upgrade/InternalIndexReindexer.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/upgrade/InternalIndexReindexer.java @@ -16,7 +16,6 @@ import org.elasticsearch.cluster.ClusterStateUpdateTask; import org.elasticsearch.cluster.block.ClusterBlocks; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MetaData; -import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.IndexNotFoundException; @@ -26,6 +25,9 @@ import org.elasticsearch.index.reindex.ReindexRequest; import org.elasticsearch.script.Script; import org.elasticsearch.transport.TransportResponse; +import java.util.function.BiConsumer; +import java.util.function.Consumer; + /** * A component that performs the following upgrade procedure: *

@@ -35,23 +37,41 @@ import org.elasticsearch.transport.TransportResponse; * - Reindex from .{name} to .{name}-v6 with transform * - Delete index .{name} and add alias .{name} to .{name}-v6 */ -public class InternalIndexReindexer { +public class InternalIndexReindexer { private final Client client; private final ClusterService clusterService; private final Script transformScript; private final String[] types; private final int version; + private final Consumer> preUpgrade; + private final BiConsumer> postUpgrade; - public InternalIndexReindexer(Client client, ClusterService clusterService, int version, Script transformScript, String[] types) { + public InternalIndexReindexer(Client client, ClusterService clusterService, int version, Script transformScript, String[] types, + Consumer> preUpgrade, + BiConsumer> postUpgrade) { this.client = client; this.clusterService = clusterService; this.transformScript = transformScript; this.types = types; this.version = version; + this.preUpgrade = preUpgrade; + this.postUpgrade = postUpgrade; } public void upgrade(String index, ClusterState clusterState, ActionListener listener) { + preUpgrade.accept(ActionListener.wrap( + t -> innerUpgrade(index, clusterState, ActionListener.wrap( + response -> postUpgrade.accept(t, ActionListener.wrap( + empty -> listener.onResponse(response), + listener::onFailure + )), + listener::onFailure + )), + listener::onFailure)); + } + + private void innerUpgrade(String index, ClusterState clusterState, ActionListener listener) { String newIndex = index + "_v" + version; try { checkMasterAndDataNodeVersion(clusterState); diff --git a/plugin/src/main/java/org/elasticsearch/xpack/upgrade/Upgrade.java b/plugin/src/main/java/org/elasticsearch/xpack/upgrade/Upgrade.java index 709f9efee48..ffa7c011d6c 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/upgrade/Upgrade.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/upgrade/Upgrade.java @@ -6,8 +6,10 @@ package org.elasticsearch.xpack.upgrade; import org.elasticsearch.Version; +import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionRequest; import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.client.Client; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.service.ClusterService; @@ -19,6 +21,7 @@ import org.elasticsearch.common.settings.IndexScopedSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.SettingsFilter; import org.elasticsearch.common.xcontent.NamedXContentRegistry; +import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.plugins.ActionPlugin; import org.elasticsearch.rest.RestController; import org.elasticsearch.rest.RestHandler; @@ -26,12 +29,16 @@ import org.elasticsearch.script.Script; import org.elasticsearch.script.ScriptService; import org.elasticsearch.script.ScriptType; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportResponse; import org.elasticsearch.watcher.ResourceWatcherService; import org.elasticsearch.xpack.security.InternalClient; import org.elasticsearch.xpack.upgrade.actions.IndexUpgradeAction; import org.elasticsearch.xpack.upgrade.actions.IndexUpgradeInfoAction; import org.elasticsearch.xpack.upgrade.rest.RestIndexUpgradeAction; import org.elasticsearch.xpack.upgrade.rest.RestIndexUpgradeInfoAction; +import org.elasticsearch.xpack.watcher.client.WatcherClient; +import org.elasticsearch.xpack.watcher.transport.actions.service.WatcherServiceRequest; +import org.elasticsearch.xpack.watcher.transport.actions.stats.WatcherStatsRequest; import java.util.ArrayList; import java.util.Arrays; @@ -58,7 +65,7 @@ public class Upgrade implements ActionPlugin { this.upgradeCheckFactories = new ArrayList<>(); for (Tuple, BiFunction> checkFactory : Arrays.asList( getKibanaUpgradeCheckFactory(settings), - getGenericCheckFactory(settings))) { + getWatcherUpgradeCheckFactory(settings))) { extraParameters.addAll(checkFactory.v1()); upgradeCheckFactories.add(checkFactory.v2()); } @@ -93,32 +100,24 @@ public class Upgrade implements ActionPlugin { ); } - static Tuple, BiFunction> getGenericCheckFactory( - Settings settings) { - return new Tuple<>( - Collections.emptyList(), - (internalClient, clusterService) -> - new IndexUpgradeCheck("generic", settings, - indexAndParams -> indexAndParams.v1().getCreationVersion().before(Version.V_5_0_0_alpha1), - UpgradeActionRequired.REINDEX, - UpgradeActionRequired.UP_TO_DATE)); - } - static Tuple, BiFunction> getKibanaUpgradeCheckFactory( Settings settings) { return new Tuple<>( Collections.singletonList("kibana_indices"), (internalClient, clusterService) -> - new IndexUpgradeCheck("kibana", + new IndexUpgradeCheck("kibana", settings, - internalClient, - clusterService, - indexAndParams -> { - String indexName = indexAndParams.v1().getIndex().getName(); - String kibanaIndicesMasks = indexAndParams.v2().getOrDefault("kibana_indices", ".kibana"); + (indexMetaData, params) -> { + String indexName = indexMetaData.getIndex().getName(); + String kibanaIndicesMasks = params.getOrDefault("kibana_indices", ".kibana"); String[] kibanaIndices = Strings.delimitedListToStringArray(kibanaIndicesMasks, ","); - return Regex.simpleMatch(kibanaIndices, indexName); - }, + if (Regex.simpleMatch(kibanaIndices, indexName)) { + return UpgradeActionRequired.UPGRADE; + } else { + return UpgradeActionRequired.NOT_APPLICABLE; + } + }, internalClient, + clusterService, Strings.EMPTY_ARRAY, new Script(ScriptType.INLINE, "painless", "ctx._id = ctx._type + \"-\" + ctx._id;\n" + "ctx._source = [ ctx._type : ctx._source ];\n" + @@ -126,4 +125,80 @@ public class Upgrade implements ActionPlugin { "ctx._type = \"doc\";", new HashMap<>()))); } + + static Tuple, BiFunction> getWatcherUpgradeCheckFactory( + Settings settings) { + return new Tuple<>( + Collections.emptyList(), + (internalClient, clusterService) -> + new IndexUpgradeCheck("watcher", + settings, + (indexMetaData, params) -> { + if (".watches".equals(indexMetaData.getIndex().getName()) || + indexMetaData.getAliases().containsKey(".watches")) { + if (indexMetaData.getMappings().size() == 1 && indexMetaData.getMappings().containsKey("doc") ) { + return UpgradeActionRequired.UP_TO_DATE; + } else { + return UpgradeActionRequired.UPGRADE; + } + } else { + return UpgradeActionRequired.NOT_APPLICABLE; + } + }, internalClient, + clusterService, + new String[]{"watch"}, + new Script(ScriptType.INLINE, "painless", "ctx._type = \"doc\";\n" + + "if (ctx._source.containsKey(\"_status\") && !ctx._source.containsKey(\"status\") ) {}\n" + + " ctx._source.status = ctx._source.remove(\"_status\");\n" + + "}", + new HashMap<>()), + booleanActionListener -> preWatcherUpgrade(internalClient, booleanActionListener), + (shouldStartWatcher, listener) -> postWatcherUpgrade(internalClient, shouldStartWatcher, listener) + )); + } + + private static void preWatcherUpgrade(Client client, ActionListener listener) { + new WatcherClient(client).watcherStats(new WatcherStatsRequest(), ActionListener.wrap( + stats -> { + if (stats.watcherMetaData().manuallyStopped()) { + // don't start the watcher after upgrade + listener.onResponse(false); + } else { + // stop the watcher + new WatcherClient(client).watcherService(new WatcherServiceRequest().stop(), ActionListener.wrap( + stopResponse -> { + if (stopResponse.isAcknowledged()) { + listener.onResponse(true); + } else { + listener.onFailure(new IllegalStateException("unable to stop watcher service")); + } + }, listener::onFailure + )); + } + }, listener::onFailure)); + } + + private static void postWatcherUpgrade(Client client, Boolean shouldStartWatcher, ActionListener listener) { + client.admin().indices().prepareDelete("triggered-watches").execute(ActionListener.wrap(deleteIndexResponse -> { + startWatcherIfNeeded(shouldStartWatcher, client, listener); + }, e -> { + if (e instanceof IndexNotFoundException) { + startWatcherIfNeeded(shouldStartWatcher, client, listener); + } else { + listener.onFailure(e); + } + } + )); + } + + private static void startWatcherIfNeeded(Boolean shouldStartWatcher, Client client, ActionListener listener) { + if (shouldStartWatcher) { + // Start the watcher service + new WatcherClient(client).watcherService(new WatcherServiceRequest().start(), ActionListener.wrap( + stopResponse -> listener.onResponse(TransportResponse.Empty.INSTANCE), listener::onFailure + )); + } else { + listener.onResponse(TransportResponse.Empty.INSTANCE); + } + } } diff --git a/plugin/src/test/java/org/elasticsearch/xpack/upgrade/IndexUpgradeCheckTests.java b/plugin/src/test/java/org/elasticsearch/xpack/upgrade/IndexUpgradeCheckTests.java index a3b882646ab..2492ff36273 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/upgrade/IndexUpgradeCheckTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/upgrade/IndexUpgradeCheckTests.java @@ -6,58 +6,67 @@ package org.elasticsearch.xpack.upgrade; import org.elasticsearch.Version; -import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.AliasMetaData; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.test.ESTestCase; +import java.io.IOException; import java.util.Collections; import static org.hamcrest.core.IsEqual.equalTo; public class IndexUpgradeCheckTests extends ESTestCase { - public void testGenericUpgradeCheck() { - IndexUpgradeCheck check = Upgrade.getGenericCheckFactory(Settings.EMPTY).v2().apply(null, null); - assertThat(check.getName(), equalTo("generic")); - IndexMetaData goodIndex = newTestIndexMeta("good", Settings.EMPTY); - IndexMetaData badIndex = newTestIndexMeta("bad", - Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.fromString("2.0.0")).build()); - - assertThat(check.actionRequired(goodIndex, Collections.emptyMap(), ClusterState.EMPTY_STATE), - equalTo(UpgradeActionRequired.UP_TO_DATE)); - assertThat(check.actionRequired(badIndex, Collections.emptyMap(), ClusterState.EMPTY_STATE), - equalTo(UpgradeActionRequired.REINDEX)); - } - - public void testKibanaUpgradeCheck() { + public void testKibanaUpgradeCheck() throws Exception { IndexUpgradeCheck check = Upgrade.getKibanaUpgradeCheckFactory(Settings.EMPTY).v2().apply(null, null); assertThat(check.getName(), equalTo("kibana")); IndexMetaData goodKibanaIndex = newTestIndexMeta(".kibana", Settings.EMPTY); - assertThat(check.actionRequired(goodKibanaIndex, Collections.emptyMap(), ClusterState.EMPTY_STATE), + assertThat(check.actionRequired(goodKibanaIndex, Collections.emptyMap()), equalTo(UpgradeActionRequired.UPGRADE)); IndexMetaData renamedKibanaIndex = newTestIndexMeta(".kibana2", Settings.EMPTY); - assertThat(check.actionRequired(renamedKibanaIndex, Collections.emptyMap(), ClusterState.EMPTY_STATE), + assertThat(check.actionRequired(renamedKibanaIndex, Collections.emptyMap()), equalTo(UpgradeActionRequired.NOT_APPLICABLE)); - assertThat(check.actionRequired(renamedKibanaIndex, Collections.singletonMap("kibana_indices", ".kibana*"), - ClusterState.EMPTY_STATE), equalTo(UpgradeActionRequired.UPGRADE)); + assertThat(check.actionRequired(renamedKibanaIndex, Collections.singletonMap("kibana_indices", ".kibana*") + ), equalTo(UpgradeActionRequired.UPGRADE)); - assertThat(check.actionRequired(renamedKibanaIndex, Collections.singletonMap("kibana_indices", ".kibana1,.kibana2"), - ClusterState.EMPTY_STATE), equalTo(UpgradeActionRequired.UPGRADE)); + assertThat(check.actionRequired(renamedKibanaIndex, Collections.singletonMap("kibana_indices", ".kibana1,.kibana2") + ), equalTo(UpgradeActionRequired.UPGRADE)); IndexMetaData watcherIndex = newTestIndexMeta(".watches", Settings.EMPTY); - assertThat(check.actionRequired(watcherIndex, Collections.singletonMap("kibana_indices", ".kibana*"), ClusterState.EMPTY_STATE), + assertThat(check.actionRequired(watcherIndex, Collections.singletonMap("kibana_indices", ".kibana*")), equalTo(UpgradeActionRequired.NOT_APPLICABLE)); IndexMetaData securityIndex = newTestIndexMeta(".security", Settings.EMPTY); - assertThat(check.actionRequired(securityIndex, Collections.singletonMap("kibana_indices", ".kibana*"), ClusterState.EMPTY_STATE), + assertThat(check.actionRequired(securityIndex, Collections.singletonMap("kibana_indices", ".kibana*")), equalTo(UpgradeActionRequired.NOT_APPLICABLE)); } - public static IndexMetaData newTestIndexMeta(String name, Settings indexSettings) { + public void testWatcherIndexUpgradeCheck() throws Exception{ + IndexUpgradeCheck check = Upgrade.getWatcherUpgradeCheckFactory(Settings.EMPTY).v2().apply(null, null); + assertThat(check.getName(), equalTo("watcher")); + + IndexMetaData goodKibanaIndex = newTestIndexMeta(".kibana", Settings.EMPTY); + assertThat(check.actionRequired(goodKibanaIndex, Collections.emptyMap()), + equalTo(UpgradeActionRequired.NOT_APPLICABLE)); + + IndexMetaData watcherIndex = newTestIndexMeta(".watches", Settings.EMPTY); + assertThat(check.actionRequired(watcherIndex, Collections.singletonMap("kibana_indices", ".kibana*")), + equalTo(UpgradeActionRequired.UPGRADE)); + + IndexMetaData watcherIndexWithAlias = newTestIndexMeta("my_watches", ".watches", Settings.EMPTY, "watch"); + assertThat(check.actionRequired(watcherIndexWithAlias, Collections.emptyMap()), + equalTo(UpgradeActionRequired.UPGRADE)); + + IndexMetaData watcherIndexWithAliasUpgraded = newTestIndexMeta("my_watches", ".watches", Settings.EMPTY, "doc"); + assertThat(check.actionRequired(watcherIndexWithAliasUpgraded, Collections.emptyMap()), + equalTo(UpgradeActionRequired.UP_TO_DATE)); + } + + public static IndexMetaData newTestIndexMeta(String name, String alias, Settings indexSettings, String type) throws IOException { Settings build = Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT) .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1) .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) @@ -66,7 +75,20 @@ public class IndexUpgradeCheckTests extends ESTestCase { .put(IndexMetaData.SETTING_VERSION_UPGRADED, Version.V_5_0_0_beta1) .put(indexSettings) .build(); - return IndexMetaData.builder(name).settings(build).build(); + IndexMetaData.Builder builder = IndexMetaData.builder(name).settings(build); + if (alias != null) { + // Create alias + builder.putAlias(AliasMetaData.newAliasMetaDataBuilder(alias).build()); + } + if (type != null) { + // Create fake type + builder.putMapping(type, "{}"); + } + return builder.build(); + } + + public static IndexMetaData newTestIndexMeta(String name, Settings indexSettings) throws IOException { + return newTestIndexMeta(name, null, indexSettings, "foo"); } } diff --git a/plugin/src/test/java/org/elasticsearch/xpack/upgrade/IndexUpgradeIT.java b/plugin/src/test/java/org/elasticsearch/xpack/upgrade/IndexUpgradeIT.java index 05170d94cae..d5e7be3baa3 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/upgrade/IndexUpgradeIT.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/upgrade/IndexUpgradeIT.java @@ -7,15 +7,19 @@ package org.elasticsearch.xpack.upgrade; import org.elasticsearch.ElasticsearchSecurityException; import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.common.Strings; +import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.index.reindex.BulkByScrollResponse; +import org.elasticsearch.transport.TransportResponse; import org.elasticsearch.xpack.upgrade.actions.IndexUpgradeAction; import org.elasticsearch.xpack.upgrade.actions.IndexUpgradeInfoAction; import org.elasticsearch.xpack.upgrade.actions.IndexUpgradeInfoAction.Response; import org.junit.Before; import java.util.Collections; +import java.util.concurrent.atomic.AtomicBoolean; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.hamcrest.Matchers.containsString; @@ -70,4 +74,43 @@ public class IndexUpgradeIT extends IndexUpgradeIntegTestCase { assertEquals(2L, searchResponse.getHits().getTotalHits()); } + public void testInternalUpgradePrePostChecks() { + Long val = randomLong(); + AtomicBoolean preUpgradeIsCalled = new AtomicBoolean(); + AtomicBoolean postUpgradeIsCalled = new AtomicBoolean(); + + IndexUpgradeCheck check = new IndexUpgradeCheck( + "test", Settings.EMPTY, + (indexMetaData, stringStringMap) -> { + if (indexMetaData.getIndex().getName().equals("internal_index")) { + return UpgradeActionRequired.UPGRADE; + } else { + return UpgradeActionRequired.NOT_APPLICABLE; + } + }, + client(), internalCluster().clusterService(internalCluster().getMasterName()), Strings.EMPTY_ARRAY, null, + listener -> { + assertFalse(preUpgradeIsCalled.getAndSet(true)); + assertFalse(postUpgradeIsCalled.get()); + listener.onResponse(val); + }, + (aLong, listener) -> { + assertTrue(preUpgradeIsCalled.get()); + assertFalse(postUpgradeIsCalled.getAndSet(true)); + assertEquals(aLong, val); + listener.onResponse(TransportResponse.Empty.INSTANCE); + }); + + assertAcked(client().admin().indices().prepareCreate("internal_index").get()); + + IndexUpgradeService service = new IndexUpgradeService(Settings.EMPTY, Collections.singletonList(check)); + + PlainActionFuture future = PlainActionFuture.newFuture(); + service.upgrade("internal_index", Collections.emptyMap(), clusterService().state(), future); + future.actionGet(); + + assertTrue(preUpgradeIsCalled.get()); + assertTrue(postUpgradeIsCalled.get()); + } + } diff --git a/plugin/src/test/java/org/elasticsearch/xpack/upgrade/IndexUpgradeServiceTests.java b/plugin/src/test/java/org/elasticsearch/xpack/upgrade/IndexUpgradeServiceTests.java index 3557c8682ab..a24882743c6 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/upgrade/IndexUpgradeServiceTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/upgrade/IndexUpgradeServiceTests.java @@ -6,6 +6,7 @@ package org.elasticsearch.xpack.upgrade; +import org.elasticsearch.Version; import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; @@ -17,6 +18,7 @@ import org.elasticsearch.test.ESTestCase; import java.util.Arrays; import java.util.Collections; import java.util.Map; +import java.util.function.BiFunction; import static org.elasticsearch.xpack.upgrade.IndexUpgradeCheckTests.newTestIndexMeta; import static org.hamcrest.core.IsEqual.equalTo; @@ -24,25 +26,33 @@ import static org.hamcrest.core.IsEqual.equalTo; public class IndexUpgradeServiceTests extends ESTestCase { private IndexUpgradeCheck upgradeBarCheck = new IndexUpgradeCheck("upgrade_bar", Settings.EMPTY, - indexAndParams -> "bar".equals(indexAndParams.v1().getSettings().get("test.setting")), - UpgradeActionRequired.UPGRADE, UpgradeActionRequired.NOT_APPLICABLE); + (BiFunction, UpgradeActionRequired>) (indexMetaData, stringStringMap) -> { + if ("bar".equals(indexMetaData.getSettings().get("test.setting"))) { + return UpgradeActionRequired.UPGRADE; + } else { + return UpgradeActionRequired.NOT_APPLICABLE; + } + }, null, null, null, null); private IndexUpgradeCheck reindexFooCheck = new IndexUpgradeCheck("reindex_foo", Settings.EMPTY, - indexAndParams -> "foo".equals(indexAndParams.v1().getSettings().get("test.setting")), - UpgradeActionRequired.REINDEX, UpgradeActionRequired.NOT_APPLICABLE); + (BiFunction, UpgradeActionRequired>) (indexMetaData, stringStringMap) -> { + if ("foo".equals(indexMetaData.getSettings().get("test.setting"))) { + return UpgradeActionRequired.REINDEX; + } else { + return UpgradeActionRequired.NOT_APPLICABLE; + } + }, null, null, null, null); private IndexUpgradeCheck everythingIsFineCheck = new IndexUpgradeCheck("everything_is_fine", Settings.EMPTY, - indexAndParams -> true, - UpgradeActionRequired.UP_TO_DATE, UpgradeActionRequired.NOT_APPLICABLE); + (indexMetaData, stringStringMap) -> UpgradeActionRequired.UP_TO_DATE, null, null, null, null); private IndexUpgradeCheck unreachableCheck = new IndexUpgradeCheck("unreachable", Settings.EMPTY, - indexAndParams -> { + (BiFunction, UpgradeActionRequired>) (indexMetaData, stringStringMap) -> { fail("Unreachable check is called"); - return false; - }, UpgradeActionRequired.UP_TO_DATE, UpgradeActionRequired.NOT_APPLICABLE); + return null; + }, null, null, null, null); - - public void testIndexUpgradeServiceMultipleCheck() { + public void testIndexUpgradeServiceMultipleCheck() throws Exception { IndexUpgradeService service; if (randomBoolean()) { service = new IndexUpgradeService(Settings.EMPTY, Arrays.asList( @@ -80,7 +90,7 @@ public class IndexUpgradeServiceTests extends ESTestCase { } - public void testNoMatchingChecks() { + public void testNoMatchingChecks() throws Exception { IndexUpgradeService service = new IndexUpgradeService(Settings.EMPTY, Arrays.asList( upgradeBarCheck, reindexFooCheck @@ -100,7 +110,7 @@ public class IndexUpgradeServiceTests extends ESTestCase { assertThat(result.get("foo"), equalTo(UpgradeActionRequired.REINDEX)); } - public void testEarlierChecksWin() { + public void testEarlierChecksWin() throws Exception { IndexUpgradeService service = new IndexUpgradeService(Settings.EMPTY, Arrays.asList( everythingIsFineCheck, upgradeBarCheck, @@ -117,9 +127,29 @@ public class IndexUpgradeServiceTests extends ESTestCase { IndicesOptions.lenientExpandOpen(), Collections.emptyMap(), clusterState); assertThat(result.size(), equalTo(0)); // everything as the first checker should indicate that everything is fine + } + + public void testGenericTest() throws Exception { + IndexUpgradeService service = new IndexUpgradeService(Settings.EMPTY, Arrays.asList( + upgradeBarCheck, + reindexFooCheck + )); + + IndexMetaData goodIndex = newTestIndexMeta("good", Settings.EMPTY); + IndexMetaData badIndex = newTestIndexMeta("bad", + Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.fromString("2.0.0")).build()); + + ClusterState clusterState = mockClusterState(goodIndex, badIndex); + + Map result = service.upgradeInfo(new String[]{"good", "bad"}, + IndicesOptions.lenientExpandOpen(), Collections.emptyMap(), clusterState); + + assertThat(result.size(), equalTo(1)); + assertThat(result.get("bad"), equalTo(UpgradeActionRequired.REINDEX)); } + private ClusterState mockClusterState(IndexMetaData... indices) { MetaData.Builder metaDataBuilder = MetaData.builder(); for (IndexMetaData indexMetaData : indices) { diff --git a/plugin/src/test/java/org/elasticsearch/xpack/upgrade/InternalIndexReindexerIT.java b/plugin/src/test/java/org/elasticsearch/xpack/upgrade/InternalIndexReindexerIT.java index 1b46bc30344..6276deff09b 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/upgrade/InternalIndexReindexerIT.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/upgrade/InternalIndexReindexerIT.java @@ -29,6 +29,7 @@ import org.elasticsearch.script.MockScriptPlugin; import org.elasticsearch.script.Script; import org.elasticsearch.script.ScriptType; import org.elasticsearch.search.SearchHit; +import org.elasticsearch.transport.TransportResponse; import org.elasticsearch.xpack.XPackPlugin; import java.util.ArrayList; @@ -180,8 +181,10 @@ public class InternalIndexReindexerIT extends IndexUpgradeIntegTestCase { } private InternalIndexReindexer createIndexReindexer(int version, Script transformScript, String[] types) { - return new InternalIndexReindexer(client(), internalCluster().clusterService(internalCluster().getMasterName()), - version, transformScript, types); + return new InternalIndexReindexer(client(), internalCluster().clusterService(internalCluster().getMasterName()), + version, transformScript, types, voidActionListener -> voidActionListener.onResponse(null), + (aVoid, listener) -> listener.onResponse(TransportResponse.Empty.INSTANCE)); + } private ClusterState clusterState() {