diff --git a/plugin/src/main/java/org/elasticsearch/xpack/XPackPlugin.java b/plugin/src/main/java/org/elasticsearch/xpack/XPackPlugin.java index 871588a0103..481d98bd32b 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/XPackPlugin.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/XPackPlugin.java @@ -101,7 +101,6 @@ import org.elasticsearch.xpack.security.authc.support.UsernamePasswordToken; import org.elasticsearch.xpack.ssl.SSLConfigurationReloader; import org.elasticsearch.xpack.ssl.SSLService; import org.elasticsearch.xpack.upgrade.Upgrade; -import org.elasticsearch.xpack.upgrade.InternalIndexUpgradeCheck; import org.elasticsearch.xpack.watcher.Watcher; import org.elasticsearch.xpack.watcher.WatcherFeatureSet; @@ -223,7 +222,7 @@ public class XPackPlugin extends Plugin implements ScriptPlugin, ActionPlugin, I this.machineLearning = new MachineLearning(settings, env, licenseState); this.logstash = new Logstash(settings); this.deprecation = new Deprecation(); - this.upgrade = new Upgrade(settings, Collections.singletonList(new InternalIndexUpgradeCheck())); + this.upgrade = new Upgrade(settings); // Check if the node is a transport client. if (transportClientMode == false) { this.extensionsService = new XPackExtensionsService(settings, resolveXPackExtensionsFile(env), getExtensions()); @@ -270,7 +269,7 @@ public class XPackPlugin extends Plugin implements ScriptPlugin, ActionPlugin, I components.add(internalClient); LicenseService licenseService = new LicenseService(settings, clusterService, getClock(), - env, resourceWatcherService, licenseState); + env, resourceWatcherService, licenseState); components.add(licenseService); components.add(licenseState); @@ -576,4 +575,5 @@ public class XPackPlugin extends Plugin implements ScriptPlugin, ActionPlugin, I public List getBootstrapChecks() { return security.getBootstrapChecks(); } + } diff --git a/plugin/src/main/java/org/elasticsearch/xpack/upgrade/GenericIndexUpgradeCheck.java b/plugin/src/main/java/org/elasticsearch/xpack/upgrade/GenericIndexUpgradeCheck.java deleted file mode 100644 index cc5d4012472..00000000000 --- a/plugin/src/main/java/org/elasticsearch/xpack/upgrade/GenericIndexUpgradeCheck.java +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. - */ -package org.elasticsearch.xpack.upgrade; - -import org.elasticsearch.Version; -import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.metadata.IndexMetaData; - -import java.util.Map; - -/** - * Generic upgrade check applicable to all indices to be upgraded from the current version - * to the next major version - */ -public class GenericIndexUpgradeCheck implements IndexUpgradeCheck { - @Override - public String getName() { - return "generic"; - } - - @Override - public UpgradeActionRequired actionRequired(IndexMetaData indexMetaData, Map params, ClusterState state) { - if (indexMetaData.getCreationVersion().before(Version.V_5_0_0_alpha1)) { - return UpgradeActionRequired.REINDEX; - } - return UpgradeActionRequired.UP_TO_DATE; - } -} diff --git a/plugin/src/main/java/org/elasticsearch/xpack/upgrade/IndexUpgradeCheck.java b/plugin/src/main/java/org/elasticsearch/xpack/upgrade/IndexUpgradeCheck.java index 40b28b162da..967ebfb66cf 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/upgrade/IndexUpgradeCheck.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/upgrade/IndexUpgradeCheck.java @@ -5,22 +5,135 @@ */ package org.elasticsearch.xpack.upgrade; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.collect.Tuple; +import org.elasticsearch.common.component.AbstractComponent; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.index.reindex.BulkByScrollResponse; +import org.elasticsearch.script.Script; -import java.util.Collection; -import java.util.Collections; import java.util.Map; +import java.util.function.Predicate; /** - * Interface that for an index check and upgrade process + * Generic upgrade check applicable to all indices to be upgraded from the current version + * to the next major version + *

+ * The upgrade is performed in the following way: + *

+ * - preUpgrade method is called + * - reindex is performed + * - postUpgrade is called if reindex was successful */ -public interface IndexUpgradeCheck { - String getName(); +public class IndexUpgradeCheck extends AbstractComponent { + public static final int UPRADE_VERSION = 6; - UpgradeActionRequired actionRequired(IndexMetaData indexMetaData, Map params, ClusterState state); + private final String name; + private final Predicate>> isSupported; + private final InternalIndexReindexer reindexer; + private final UpgradeActionRequired matchAction; + private final UpgradeActionRequired noMatchAction; - default Collection supportedParams() { - return Collections.emptyList(); + /** + * Creates a new upgrade check that doesn't support upgrade + * + * @param name - the name of the check + * @param settings - system settings + * @param isSupported - return true if they can work with the index with specified name + * @param matchAction - action if isSupported return true + * @param noMatchAction - action if isSupported return false + */ + public IndexUpgradeCheck(String name, Settings settings, + Predicate>> isSupported, + UpgradeActionRequired matchAction, UpgradeActionRequired noMatchAction) { + super(settings); + this.name = name; + this.isSupported = isSupported; + this.reindexer = null; + this.matchAction = matchAction; + this.noMatchAction = noMatchAction; } + + /** + * Creates a new upgrade check + * + * @param name - the name of the check + * @param settings - system settings + * @param client - client + * @param clusterService - cluster service + * @param isSupported - return true if they can work with the index with specified name + * @param types - a list of types that the reindexing should be limited to + * @param updateScript - the upgrade script that should be used during reindexing + */ + public IndexUpgradeCheck(String name, Settings settings, Client client, ClusterService clusterService, + Predicate>> isSupported, + String[] types, Script updateScript) { + this(name, settings, client, clusterService, isSupported, types, updateScript, UpgradeActionRequired.UPGRADE, + UpgradeActionRequired.NOT_APPLICABLE); + } + + /** + * Creates a new upgrade check + * + * @param name - the name of the check + * @param settings - system settings + * @param client - client + * @param clusterService - cluster service + * @param isSupported - return true if they can work with the index with specified name + * @param types - a list of types that the reindexing should be limited to + * @param updateScript - the upgrade script that should be used during reindexing + */ + public IndexUpgradeCheck(String name, Settings settings, Client client, ClusterService clusterService, + Predicate>> isSupported, + String[] types, Script updateScript, UpgradeActionRequired matchAction, UpgradeActionRequired noMatchAction) { + super(settings); + this.name = name; + this.isSupported = isSupported; + this.reindexer = new InternalIndexReindexer(client, clusterService, UPRADE_VERSION, updateScript, types); + this.matchAction = matchAction; + this.noMatchAction = noMatchAction; + } + /** + * Returns the name of the check + */ + public String getName() { + return name; + } + + /** + * This method is called by Upgrade API to verify if upgrade or reindex for this index is required + * + * @param indexMetaData index metadata + * @param params additional user-specified parameters see {@link IndexUpgradeCheckFactory#supportedParams} + * @param state current cluster state + * @return required action or UpgradeActionRequired.NOT_APPLICABLE if this check cannot be performed on the index + */ + public UpgradeActionRequired actionRequired(IndexMetaData indexMetaData, Map params, ClusterState state) { + if (isSupported.test(new Tuple<>(indexMetaData, params))) { + return matchAction; + } + return noMatchAction; + } + + /** + * Perform the index upgrade + * + * @param indexMetaData index metadata + * @param params additional user-specified parameters see {@link IndexUpgradeCheckFactory#supportedParams} + * @param state current cluster state + * @param listener the listener that should be called upon completion of the upgrade + */ + public void upgrade(IndexMetaData indexMetaData, Map params, ClusterState state, + ActionListener listener) { + if (reindexer == null) { + throw new UnsupportedOperationException(getName() + " check doesn't support index upgrade"); + } else { + reindexer.upgrade(indexMetaData.getIndex().getName(), state, listener); + } + } + } diff --git a/plugin/src/main/java/org/elasticsearch/xpack/upgrade/IndexUpgradeCheckFactory.java b/plugin/src/main/java/org/elasticsearch/xpack/upgrade/IndexUpgradeCheckFactory.java new file mode 100644 index 00000000000..775e205f255 --- /dev/null +++ b/plugin/src/main/java/org/elasticsearch/xpack/upgrade/IndexUpgradeCheckFactory.java @@ -0,0 +1,35 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.upgrade; + +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.xpack.security.InternalClient; + +import java.util.Collection; +import java.util.Collections; + +/** + * Factory for index checks + */ +public interface IndexUpgradeCheckFactory { + + /** + * Using this method the check can expose additional user parameter that can be specified by the user on upgrade api + * + * @return the list of supported parameters + */ + default Collection supportedParams() { + return Collections.emptyList(); + } + + /** + * Creates an upgrade check + *

+ * This method is called from {@link org.elasticsearch.plugins.Plugin#createComponents} method. + */ + IndexUpgradeCheck createCheck(InternalClient internalClient, ClusterService clusterService); + +} diff --git a/plugin/src/main/java/org/elasticsearch/xpack/upgrade/IndexUpgradeService.java b/plugin/src/main/java/org/elasticsearch/xpack/upgrade/IndexUpgradeService.java index 323a7357c90..dfb1f9b0e0f 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/upgrade/IndexUpgradeService.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/upgrade/IndexUpgradeService.java @@ -5,6 +5,7 @@ */ package org.elasticsearch.xpack.upgrade; +import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexMetaData; @@ -12,12 +13,15 @@ import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.index.IndexNotFoundException; +import org.elasticsearch.index.reindex.BulkByScrollResponse; import java.util.HashMap; import java.util.List; import java.util.Map; public class IndexUpgradeService extends AbstractComponent { + public static final IndicesOptions UPGRADE_INDEX_OPTIONS = IndicesOptions.strictSingleIndexNoExpandForbidClosed(); private final List upgradeChecks; @@ -71,4 +75,33 @@ public class IndexUpgradeService extends AbstractComponent { return results; } + public void upgrade(String index, Map params, ClusterState state, + ActionListener listener) { + IndexMetaData indexMetaData = state.metaData().index(index); + if (indexMetaData == null) { + throw new IndexNotFoundException(index); + } + for (IndexUpgradeCheck check : upgradeChecks) { + UpgradeActionRequired upgradeActionRequired = check.actionRequired(indexMetaData, params, state); + switch (upgradeActionRequired) { + case UPGRADE: + // this index needs to be upgraded - start the upgrade procedure + check.upgrade(indexMetaData, params, state, listener); + return; + case REINDEX: + // this index needs to be re-indexed + throw new IllegalStateException("Index [" + index + "] cannot be upgraded, it should be reindex instead"); + case UP_TO_DATE: + throw new IllegalStateException("Index [" + index + "] cannot be upgraded, it is up to date"); + case NOT_APPLICABLE: + // this action is not applicable to this index - skipping to the next one + break; + default: + throw new IllegalStateException("unknown upgrade action [" + upgradeActionRequired + "] for the index [" + index + "]"); + + } + } + throw new IllegalStateException("Index [" + index + "] cannot be upgraded"); + } + } diff --git a/plugin/src/main/java/org/elasticsearch/xpack/upgrade/InternalIndexReindexer.java b/plugin/src/main/java/org/elasticsearch/xpack/upgrade/InternalIndexReindexer.java new file mode 100644 index 00000000000..3965e566528 --- /dev/null +++ b/plugin/src/main/java/org/elasticsearch/xpack/upgrade/InternalIndexReindexer.java @@ -0,0 +1,147 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.upgrade; + +import com.carrotsearch.hppc.procedures.ObjectProcedure; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsResponse; +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ClusterStateUpdateTask; +import org.elasticsearch.cluster.block.ClusterBlocks; +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.index.IndexNotFoundException; +import org.elasticsearch.index.reindex.BulkByScrollResponse; +import org.elasticsearch.index.reindex.ReindexAction; +import org.elasticsearch.index.reindex.ReindexRequest; +import org.elasticsearch.script.Script; +import org.elasticsearch.transport.TransportResponse; + +/** + * A component that performs the following upgrade procedure: + *

+ * - Check that all data and master nodes are running running the same version + * - Create a new index .{name}-v6 + * - Make index .{name} read only + * - Reindex from .{name} to .{name}-v6 with transform + * - Delete index .{name} and add alias .{name} to .{name}-v6 + */ +public class InternalIndexReindexer { + + private final Client client; + private final ClusterService clusterService; + private final Script transformScript; + private final String[] types; + private final int version; + + public InternalIndexReindexer(Client client, ClusterService clusterService, int version, Script transformScript, String[] types) { + this.client = client; + this.clusterService = clusterService; + this.transformScript = transformScript; + this.types = types; + this.version = version; + } + + public void upgrade(String index, ClusterState clusterState, ActionListener listener) { + String newIndex = index + "_v" + version; + try { + checkMasterAndDataNodeVersion(clusterState); + client.admin().indices().prepareCreate(newIndex).execute(ActionListener.wrap(createIndexResponse -> + setReadOnlyBlock(index, ActionListener.wrap(setReadOnlyResponse -> + reindex(index, newIndex, ActionListener.wrap( + bulkByScrollResponse -> // Successful completion of reindexing - delete old index + removeReadOnlyBlock(index, ActionListener.wrap(unsetReadOnlyResponse -> + client.admin().indices().prepareAliases().removeIndex(index) + .addAlias(newIndex, index).execute(ActionListener.wrap(deleteIndexResponse -> + listener.onResponse(bulkByScrollResponse), listener::onFailure + )), listener::onFailure + )), + e -> // Something went wrong during reindexing - remove readonly flag and report the error + removeReadOnlyBlock(index, ActionListener.wrap(unsetReadOnlyResponse -> { + listener.onFailure(e); + }, e1 -> { + listener.onFailure(e); + })) + )), listener::onFailure + )), listener::onFailure + )); + } catch (Exception ex) { + listener.onFailure(ex); + } + } + + private void checkMasterAndDataNodeVersion(ClusterState clusterState) { + if (clusterState.nodes().getMinNodeVersion().before(Upgrade.UPGRADE_INTRODUCED)) { + throw new IllegalStateException("All nodes should have at least version [" + Upgrade.UPGRADE_INTRODUCED + "] to upgrade"); + } + } + + private void removeReadOnlyBlock(String index, ActionListener listener) { + Settings settings = Settings.builder().put(IndexMetaData.INDEX_READ_ONLY_SETTING.getKey(), false).build(); + client.admin().indices().prepareUpdateSettings(index).setSettings(settings).execute(listener); + } + + private void reindex(String index, String newIndex, ActionListener listener) { + SearchRequest sourceRequest = new SearchRequest(index); + sourceRequest.types(types); + IndexRequest destinationRequest = new IndexRequest(newIndex); + ReindexRequest reindexRequest = new ReindexRequest(sourceRequest, destinationRequest); + reindexRequest.setRefresh(true); + reindexRequest.setScript(transformScript); + client.execute(ReindexAction.INSTANCE, reindexRequest, listener); + } + + /** + * Makes the index readonly if it's not set as a readonly yet + */ + private void setReadOnlyBlock(String index, ActionListener listener) { + clusterService.submitStateUpdateTask("lock-index-for-upgrade", new ClusterStateUpdateTask() { + + @Override + public ClusterState execute(ClusterState currentState) throws Exception { + final IndexMetaData indexMetaData = currentState.metaData().index(index); + if (indexMetaData == null) { + throw new IndexNotFoundException(index); + } + + if (indexMetaData.getState() != IndexMetaData.State.OPEN) { + throw new IllegalStateException("unable to upgrade a closed index[" + index + "]"); + } + if (currentState.blocks().hasIndexBlock(index, IndexMetaData.INDEX_READ_ONLY_BLOCK)) { + throw new IllegalStateException("unable to upgrade a read-only index[" + index + "]"); + } + + Settings.Builder indexSettings = Settings.builder().put(indexMetaData.getSettings()) + .put(IndexMetaData.INDEX_READ_ONLY_SETTING.getKey(), true); + + MetaData.Builder metaDataBuilder = MetaData.builder(currentState.metaData()) + .put(IndexMetaData.builder(indexMetaData).settings(indexSettings)); + + ClusterBlocks.Builder blocks = ClusterBlocks.builder().blocks(currentState.blocks()) + .addIndexBlock(index, IndexMetaData.INDEX_READ_ONLY_BLOCK); + + return ClusterState.builder(currentState).metaData(metaDataBuilder).blocks(blocks).build(); + } + + @Override + public void onFailure(String source, Exception e) { + listener.onFailure(e); + } + + @Override + public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { + listener.onResponse(TransportResponse.Empty.INSTANCE); + } + }); + } + +} diff --git a/plugin/src/main/java/org/elasticsearch/xpack/upgrade/InternalIndexUpgradeCheck.java b/plugin/src/main/java/org/elasticsearch/xpack/upgrade/InternalIndexUpgradeCheck.java deleted file mode 100644 index 7913b08a867..00000000000 --- a/plugin/src/main/java/org/elasticsearch/xpack/upgrade/InternalIndexUpgradeCheck.java +++ /dev/null @@ -1,61 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. - */ -package org.elasticsearch.xpack.upgrade; - -import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.metadata.IndexMetaData; -import org.elasticsearch.common.Strings; -import org.elasticsearch.common.regex.Regex; -import org.elasticsearch.tasks.TaskResultsService; -import org.elasticsearch.xpack.security.SecurityLifecycleService; -import org.elasticsearch.xpack.watcher.watch.Watch; - -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.HashSet; -import java.util.Map; -import java.util.Set; - -/** - * Generic upgrade check applicable to all indices to be upgraded from the current version - * to the next major version - */ -public class InternalIndexUpgradeCheck implements IndexUpgradeCheck { - private final Set KNOWN_INTERNAL_INDICES = Collections.unmodifiableSet(new HashSet<>(Arrays.asList( - Watch.INDEX, - SecurityLifecycleService.SECURITY_INDEX_NAME, - TaskResultsService.TASK_INDEX - ))); - - - @Override - public String getName() { - return "inner"; - } - - @Override - public UpgradeActionRequired actionRequired(IndexMetaData indexMetaData, Map params, ClusterState state) { - String indexName = indexMetaData.getIndex().getName(); - if (KNOWN_INTERNAL_INDICES.contains(indexName)) { - return UpgradeActionRequired.UPGRADE; - } - if (isKibanaIndex(params.getOrDefault("kibana_indices", ".kibana"), indexName)) { - return UpgradeActionRequired.UPGRADE; - } - return UpgradeActionRequired.NOT_APPLICABLE; - } - - private boolean isKibanaIndex(String kibanaIndicesMasks, String indexName) { - String[] kibanaIndices = Strings.delimitedListToStringArray(kibanaIndicesMasks, ","); - return Regex.simpleMatch(kibanaIndices, indexName); - } - - @Override - public Collection supportedParams() { - return Collections.singletonList("kibana_indices"); - } -} diff --git a/plugin/src/main/java/org/elasticsearch/xpack/upgrade/Upgrade.java b/plugin/src/main/java/org/elasticsearch/xpack/upgrade/Upgrade.java index 8ab967c1493..db6dfbe0fce 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/upgrade/Upgrade.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/upgrade/Upgrade.java @@ -5,67 +5,81 @@ */ package org.elasticsearch.xpack.upgrade; +import org.elasticsearch.Version; import org.elasticsearch.action.ActionRequest; import org.elasticsearch.action.ActionResponse; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; -import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.collect.Tuple; +import org.elasticsearch.common.regex.Regex; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.IndexScopedSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.SettingsFilter; import org.elasticsearch.common.xcontent.NamedXContentRegistry; -import org.elasticsearch.license.XPackLicenseState; import org.elasticsearch.plugins.ActionPlugin; import org.elasticsearch.rest.RestController; import org.elasticsearch.rest.RestHandler; +import org.elasticsearch.script.Script; import org.elasticsearch.script.ScriptService; +import org.elasticsearch.script.ScriptType; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.watcher.ResourceWatcherService; import org.elasticsearch.xpack.security.InternalClient; +import org.elasticsearch.xpack.upgrade.actions.IndexUpgradeAction; import org.elasticsearch.xpack.upgrade.actions.IndexUpgradeInfoAction; +import org.elasticsearch.xpack.upgrade.rest.RestIndexUpgradeAction; import org.elasticsearch.xpack.upgrade.rest.RestIndexUpgradeInfoAction; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Set; +import java.util.function.BiFunction; import java.util.function.Supplier; public class Upgrade implements ActionPlugin { - private Settings settings; - private List customUpgradeChecks; - private Set extraParameters; + public static final Version UPGRADE_INTRODUCED = Version.V_5_5_0_UNRELEASED; // TODO: Probably will need to change this to 5.6.0 - public Upgrade(Settings settings, List customUpgradeChecks) { + private final Settings settings; + private final List> upgradeCheckFactories; + private final Set extraParameters; + + public Upgrade(Settings settings) { this.settings = settings; - this.customUpgradeChecks = customUpgradeChecks; this.extraParameters = new HashSet<>(); - for (IndexUpgradeCheck check : customUpgradeChecks) { - extraParameters.addAll(check.supportedParams()); + this.upgradeCheckFactories = new ArrayList<>(); + for (Tuple, BiFunction> checkFactory : Arrays.asList( + getKibanaUpgradeCheckFactory(settings), + getGenericCheckFactory(settings))) { + extraParameters.addAll(checkFactory.v1()); + upgradeCheckFactories.add(checkFactory.v2()); } } public Collection createComponents(InternalClient internalClient, ClusterService clusterService, ThreadPool threadPool, ResourceWatcherService resourceWatcherService, ScriptService scriptService, NamedXContentRegistry xContentRegistry) { - - List upgradeChecks = new ArrayList<>(customUpgradeChecks); - - // The generic test goes to the end of the list - upgradeChecks.add(new GenericIndexUpgradeCheck()); - - return Collections.singletonList(new IndexUpgradeService(settings, upgradeChecks)); + List upgradeChecks = new ArrayList<>(upgradeCheckFactories.size()); + for (BiFunction checkFactory : upgradeCheckFactories) { + upgradeChecks.add(checkFactory.apply(internalClient, clusterService)); + } + return Collections.singletonList(new IndexUpgradeService(settings, Collections.unmodifiableList(upgradeChecks))); } @Override public List> getActions() { - return Collections.singletonList( - new ActionHandler<>(IndexUpgradeInfoAction.INSTANCE, IndexUpgradeInfoAction.TransportAction.class)); + return Arrays.asList( + new ActionHandler<>(IndexUpgradeInfoAction.INSTANCE, IndexUpgradeInfoAction.TransportAction.class), + new ActionHandler<>(IndexUpgradeAction.INSTANCE, IndexUpgradeAction.TransportAction.class) + ); } @Override @@ -73,8 +87,43 @@ public class Upgrade implements ActionPlugin { IndexScopedSettings indexScopedSettings, SettingsFilter settingsFilter, IndexNameExpressionResolver indexNameExpressionResolver, Supplier nodesInCluster) { - return Collections.singletonList( - new RestIndexUpgradeInfoAction(settings, restController, extraParameters) + return Arrays.asList( + new RestIndexUpgradeInfoAction(settings, restController, extraParameters), + new RestIndexUpgradeAction(settings, restController, extraParameters) ); } + + static Tuple, BiFunction> getGenericCheckFactory( + Settings settings) { + return new Tuple<>( + Collections.emptyList(), + (internalClient, clusterService) -> + new IndexUpgradeCheck("generic", settings, + indexAndParams -> indexAndParams.v1().getCreationVersion().before(Version.V_5_0_0_alpha1), + UpgradeActionRequired.REINDEX, + UpgradeActionRequired.UP_TO_DATE)); + } + + static Tuple, BiFunction> getKibanaUpgradeCheckFactory( + Settings settings) { + return new Tuple<>( + Collections.singletonList("kibana_indices"), + (internalClient, clusterService) -> + new IndexUpgradeCheck("kibana", + settings, + internalClient, + clusterService, + indexAndParams -> { + String indexName = indexAndParams.v1().getIndex().getName(); + String kibanaIndicesMasks = indexAndParams.v2().getOrDefault("kibana_indices", ".kibana"); + String[] kibanaIndices = Strings.delimitedListToStringArray(kibanaIndicesMasks, ","); + return Regex.simpleMatch(kibanaIndices, indexName); + }, + Strings.EMPTY_ARRAY, + new Script(ScriptType.INLINE, "painless", "ctx._id = ctx._type + \"-\" + ctx._id;\n" + + "ctx._source = [ ctx._type : ctx._source ];\n" + + "ctx._source.type = ctx._type;\n" + + "ctx._type = \"doc\";", + new HashMap<>()))); + } } diff --git a/plugin/src/main/java/org/elasticsearch/xpack/upgrade/actions/IndexUpgradeAction.java b/plugin/src/main/java/org/elasticsearch/xpack/upgrade/actions/IndexUpgradeAction.java new file mode 100644 index 00000000000..fc2efda6c3a --- /dev/null +++ b/plugin/src/main/java/org/elasticsearch/xpack/upgrade/actions/IndexUpgradeAction.java @@ -0,0 +1,202 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.upgrade.actions; + +import org.elasticsearch.action.Action; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.action.IndicesRequest; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.IndicesOptions; +import org.elasticsearch.action.support.master.MasterNodeReadOperationRequestBuilder; +import org.elasticsearch.action.support.master.MasterNodeReadRequest; +import org.elasticsearch.action.support.master.TransportMasterNodeAction; +import org.elasticsearch.client.ElasticsearchClient; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.block.ClusterBlockException; +import org.elasticsearch.cluster.block.ClusterBlockLevel; +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.index.reindex.BulkByScrollResponse; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportService; +import org.elasticsearch.xpack.upgrade.IndexUpgradeService; + +import java.io.IOException; +import java.util.Collections; +import java.util.Map; +import java.util.Objects; + +import static org.elasticsearch.action.ValidateActions.addValidationError; +import static org.elasticsearch.xpack.upgrade.IndexUpgradeService.UPGRADE_INDEX_OPTIONS; + +public class IndexUpgradeAction extends Action { + + public static final IndexUpgradeAction INSTANCE = new IndexUpgradeAction(); + public static final String NAME = "cluster:admin/xpack/upgrade"; + + private IndexUpgradeAction() { + super(NAME); + } + + @Override + public RequestBuilder newRequestBuilder(ElasticsearchClient client) { + return new RequestBuilder(client, this); + } + + @Override + public BulkByScrollResponse newResponse() { + return new BulkByScrollResponse(); + } + + public static class Request extends MasterNodeReadRequest implements IndicesRequest { + + private String index = null; + private Map extraParams = Collections.emptyMap(); + + // for serialization + public Request() { + + } + + public Request(String index) { + this.index = index; + } + + public String index() { + return index; + } + + /** + * Sets the index. + */ + @SuppressWarnings("unchecked") + public final Request index(String index) { + this.index = index; + return this; + } + + @Override + public String[] indices() { + return new String[]{index}; + } + + @Override + public IndicesOptions indicesOptions() { + return UPGRADE_INDEX_OPTIONS; + } + + + public Map extraParams() { + return extraParams; + } + + public Request extraParams(Map extraParams) { + this.extraParams = extraParams; + return this; + } + + @Override + public ActionRequestValidationException validate() { + ActionRequestValidationException validationException = null; + if (index == null) { + validationException = addValidationError("index is missing", validationException); + } + if (extraParams == null) { + validationException = addValidationError("params are missing", validationException); + } + return validationException; + } + + @Override + public void readFrom(StreamInput in) throws IOException { + super.readFrom(in); + index = in.readString(); + extraParams = in.readMap(StreamInput::readString, StreamInput::readString); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeString(index); + out.writeMap(extraParams, StreamOutput::writeString, StreamOutput::writeString); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + Request request = (Request) o; + return Objects.equals(index, request.index) && + Objects.equals(extraParams, request.extraParams); + } + + @Override + public int hashCode() { + return Objects.hash(index, extraParams); + } + } + + public static class RequestBuilder extends MasterNodeReadOperationRequestBuilder { + + protected RequestBuilder(ElasticsearchClient client, IndexUpgradeAction action) { + super(client, action, new Request()); + } + + public RequestBuilder setIndex(String index) { + request.index(index); + return this; + } + + public RequestBuilder setExtraParams(Map params) { + request.extraParams(params); + return this; + } + + + } + + public static class TransportAction extends TransportMasterNodeAction { + + private final IndexUpgradeService indexUpgradeService; + + @Inject + public TransportAction(Settings settings, TransportService transportService, ClusterService clusterService, + ThreadPool threadPool, ActionFilters actionFilters, + IndexUpgradeService indexUpgradeService, + IndexNameExpressionResolver indexNameExpressionResolver) { + super(settings, IndexUpgradeAction.NAME, transportService, clusterService, threadPool, actionFilters, + indexNameExpressionResolver, Request::new); + this.indexUpgradeService = indexUpgradeService; + } + + @Override + protected String executor() { + return ThreadPool.Names.GENERIC; + } + + @Override + protected BulkByScrollResponse newResponse() { + return new BulkByScrollResponse(); + } + + @Override + protected ClusterBlockException checkBlock(Request request, ClusterState state) { + // Cluster is not affected but we look up repositories in metadata + return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_READ); + } + + @Override + protected final void masterOperation(final Request request, ClusterState state, ActionListener listener) { + indexUpgradeService.upgrade(request.index(), request.extraParams(), state, listener); + } + } +} \ No newline at end of file diff --git a/plugin/src/main/java/org/elasticsearch/xpack/upgrade/rest/RestIndexUpgradeAction.java b/plugin/src/main/java/org/elasticsearch/xpack/upgrade/rest/RestIndexUpgradeAction.java new file mode 100644 index 00000000000..0706d3919b5 --- /dev/null +++ b/plugin/src/main/java/org/elasticsearch/xpack/upgrade/rest/RestIndexUpgradeAction.java @@ -0,0 +1,101 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.upgrade.rest; + +import org.elasticsearch.ExceptionsHelper; +import org.elasticsearch.action.bulk.BulkItemResponse; +import org.elasticsearch.client.node.NodeClient; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.ToXContent; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.index.reindex.BulkByScrollResponse; +import org.elasticsearch.index.reindex.BulkByScrollTask; +import org.elasticsearch.index.reindex.ScrollableHitSource; +import org.elasticsearch.rest.BaseRestHandler; +import org.elasticsearch.rest.BytesRestResponse; +import org.elasticsearch.rest.RestController; +import org.elasticsearch.rest.RestRequest; +import org.elasticsearch.rest.RestResponse; +import org.elasticsearch.rest.RestStatus; +import org.elasticsearch.rest.action.RestBuilderListener; +import org.elasticsearch.xpack.upgrade.actions.IndexUpgradeAction; +import org.elasticsearch.xpack.upgrade.actions.IndexUpgradeAction.Request; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; + +public class RestIndexUpgradeAction extends BaseRestHandler { + private final Set extraParameters; + + public RestIndexUpgradeAction(Settings settings, RestController controller, Set extraParameters) { + super(settings); + controller.registerHandler(RestRequest.Method.POST, "{index}/_xpack/_upgrade", this); + this.extraParameters = extraParameters; + } + + @Override + public RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException { + if (request.method().equals(RestRequest.Method.POST)) { + return handlePost(request, client); + } else { + throw new IllegalArgumentException("illegal method [" + request.method() + "] for request [" + request.path() + "]"); + } + } + + private RestChannelConsumer handlePost(final RestRequest request, NodeClient client) { + Request upgradeRequest = new Request(request.param("index")); + Map extraParamsMap = new HashMap<>(); + for (String param : extraParameters) { + String value = request.param(param); + if (value != null) { + extraParamsMap.put(param, value); + } + } + upgradeRequest.extraParams(extraParamsMap); + Map params = new HashMap<>(); + params.put(BulkByScrollTask.Status.INCLUDE_CREATED, Boolean.toString(true)); + params.put(BulkByScrollTask.Status.INCLUDE_UPDATED, Boolean.toString(true)); + + return channel -> client.execute(IndexUpgradeAction.INSTANCE, upgradeRequest, + new RestBuilderListener(channel) { + + @Override + public RestResponse buildResponse(BulkByScrollResponse response, XContentBuilder builder) throws Exception { + builder.startObject(); + response.toXContent(builder, new ToXContent.DelegatingMapParams(params, channel.request())); + builder.endObject(); + return new BytesRestResponse(getStatus(response), builder); + } + + private RestStatus getStatus(BulkByScrollResponse response) { + /* + * Return the highest numbered rest status under the assumption that higher numbered statuses are "more error" + * and thus more interesting to the user. + */ + RestStatus status = RestStatus.OK; + if (response.isTimedOut()) { + status = RestStatus.REQUEST_TIMEOUT; + } + for (BulkItemResponse.Failure failure : response.getBulkFailures()) { + if (failure.getStatus().getStatus() > status.getStatus()) { + status = failure.getStatus(); + } + } + for (ScrollableHitSource.SearchFailure failure : response.getSearchFailures()) { + RestStatus failureStatus = ExceptionsHelper.status(failure.getReason()); + if (failureStatus.getStatus() > status.getStatus()) { + status = failureStatus; + } + } + return status; + } + + }); + } +} + diff --git a/plugin/src/test/java/org/elasticsearch/xpack/upgrade/IndexUpgradeCheckTests.java b/plugin/src/test/java/org/elasticsearch/xpack/upgrade/IndexUpgradeCheckTests.java index b92d8722d4e..a3b882646ab 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/upgrade/IndexUpgradeCheckTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/upgrade/IndexUpgradeCheckTests.java @@ -19,7 +19,7 @@ import static org.hamcrest.core.IsEqual.equalTo; public class IndexUpgradeCheckTests extends ESTestCase { public void testGenericUpgradeCheck() { - IndexUpgradeCheck check = new GenericIndexUpgradeCheck(); + IndexUpgradeCheck check = Upgrade.getGenericCheckFactory(Settings.EMPTY).v2().apply(null, null); assertThat(check.getName(), equalTo("generic")); IndexMetaData goodIndex = newTestIndexMeta("good", Settings.EMPTY); IndexMetaData badIndex = newTestIndexMeta("bad", @@ -31,9 +31,9 @@ public class IndexUpgradeCheckTests extends ESTestCase { equalTo(UpgradeActionRequired.REINDEX)); } - public void testInternalUpgradeCheck() { - IndexUpgradeCheck check = new InternalIndexUpgradeCheck(); - assertThat(check.getName(), equalTo("inner")); + public void testKibanaUpgradeCheck() { + IndexUpgradeCheck check = Upgrade.getKibanaUpgradeCheckFactory(Settings.EMPTY).v2().apply(null, null); + assertThat(check.getName(), equalTo("kibana")); IndexMetaData goodKibanaIndex = newTestIndexMeta(".kibana", Settings.EMPTY); assertThat(check.actionRequired(goodKibanaIndex, Collections.emptyMap(), ClusterState.EMPTY_STATE), equalTo(UpgradeActionRequired.UPGRADE)); @@ -50,11 +50,11 @@ public class IndexUpgradeCheckTests extends ESTestCase { IndexMetaData watcherIndex = newTestIndexMeta(".watches", Settings.EMPTY); assertThat(check.actionRequired(watcherIndex, Collections.singletonMap("kibana_indices", ".kibana*"), ClusterState.EMPTY_STATE), - equalTo(UpgradeActionRequired.UPGRADE)); + equalTo(UpgradeActionRequired.NOT_APPLICABLE)); IndexMetaData securityIndex = newTestIndexMeta(".security", Settings.EMPTY); assertThat(check.actionRequired(securityIndex, Collections.singletonMap("kibana_indices", ".kibana*"), ClusterState.EMPTY_STATE), - equalTo(UpgradeActionRequired.UPGRADE)); + equalTo(UpgradeActionRequired.NOT_APPLICABLE)); } public static IndexMetaData newTestIndexMeta(String name, Settings indexSettings) { diff --git a/plugin/src/test/java/org/elasticsearch/xpack/upgrade/IndexUpgradeIT.java b/plugin/src/test/java/org/elasticsearch/xpack/upgrade/IndexUpgradeIT.java index 986d94fe0ae..05170d94cae 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/upgrade/IndexUpgradeIT.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/upgrade/IndexUpgradeIT.java @@ -6,21 +6,15 @@ package org.elasticsearch.xpack.upgrade; import org.elasticsearch.ElasticsearchSecurityException; +import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.common.Strings; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.license.AbstractLicensesIntegrationTestCase; -import org.elasticsearch.license.License; -import org.elasticsearch.license.TestUtils; -import org.elasticsearch.plugins.Plugin; -import org.elasticsearch.xpack.XPackPlugin; -import org.elasticsearch.xpack.XPackSettings; -import org.elasticsearch.xpack.ml.MachineLearning; +import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.index.reindex.BulkByScrollResponse; +import org.elasticsearch.xpack.upgrade.actions.IndexUpgradeAction; import org.elasticsearch.xpack.upgrade.actions.IndexUpgradeInfoAction; import org.elasticsearch.xpack.upgrade.actions.IndexUpgradeInfoAction.Response; import org.junit.Before; -import java.util.Collection; import java.util.Collections; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; @@ -28,53 +22,13 @@ import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.empty; import static org.hamcrest.core.IsEqual.equalTo; -public class IndexUpgradeIT extends AbstractLicensesIntegrationTestCase { +public class IndexUpgradeIT extends IndexUpgradeIntegTestCase { @Before public void resetLicensing() throws Exception { enableLicensing(); } - @Override - protected boolean ignoreExternalCluster() { - return true; - } - - @Override - protected Settings nodeSettings(int nodeOrdinal) { - Settings.Builder settings = Settings.builder().put(super.nodeSettings(nodeOrdinal)); - settings.put(MachineLearning.AUTODETECT_PROCESS.getKey(), false); - settings.put(XPackSettings.MACHINE_LEARNING_ENABLED.getKey(), false); - settings.put(XPackSettings.SECURITY_ENABLED.getKey(), false); - settings.put(XPackSettings.WATCHER_ENABLED.getKey(), false); - settings.put(XPackSettings.MONITORING_ENABLED.getKey(), false); - settings.put(XPackSettings.GRAPH_ENABLED.getKey(), false); - return settings.build(); - } - - @Override - protected Settings transportClientSettings() { - Settings.Builder settings = Settings.builder().put(super.transportClientSettings()); - settings.put(MachineLearning.AUTODETECT_PROCESS.getKey(), false); - settings.put(XPackSettings.MACHINE_LEARNING_ENABLED.getKey(), false); - settings.put(XPackSettings.SECURITY_ENABLED.getKey(), false); - settings.put(XPackSettings.WATCHER_ENABLED.getKey(), false); - settings.put(XPackSettings.MONITORING_ENABLED.getKey(), false); - settings.put(XPackSettings.GRAPH_ENABLED.getKey(), false); - return settings.build(); - } - - @Override - protected Collection> nodePlugins() { - return Collections.singleton(XPackPlugin.class); - } - - @Override - protected Collection> transportClientPlugins() { - return nodePlugins(); - } - - public void testIndexUpgradeInfo() { assertAcked(client().admin().indices().prepareCreate("test").get()); assertAcked(client().admin().indices().prepareCreate("kibana_test").get()); @@ -99,29 +53,21 @@ public class IndexUpgradeIT extends AbstractLicensesIntegrationTestCase { assertThat(response.getActions().entrySet(), empty()); } - private static String randomValidLicenseType() { - return randomFrom("platinum", "gold", "standard", "basic"); + public void testUpgradeInternalIndex() throws Exception { + String testIndex = ".kibana"; + String testType = "doc"; + assertAcked(client().admin().indices().prepareCreate(testIndex).get()); + indexRandom(true, + client().prepareIndex(testIndex, testType, "1").setSource("{\"foo\":\"bar\"}", XContentType.JSON), + client().prepareIndex(testIndex, testType, "2").setSource("{\"foo\":\"baz\"}", XContentType.JSON) + ); + ensureYellow(testIndex); + + BulkByScrollResponse response = client().prepareExecute(IndexUpgradeAction.INSTANCE).setIndex(testIndex).get(); + assertThat(response.getCreated(), equalTo(2L)); + + SearchResponse searchResponse = client().prepareSearch(testIndex).get(); + assertEquals(2L, searchResponse.getHits().getTotalHits()); } - private static String randomInvalidLicenseType() { - return randomFrom("missing", "trial"); - } - - public void disableLicensing() throws Exception { - updateLicensing(randomInvalidLicenseType()); - } - - public void enableLicensing() throws Exception { - updateLicensing(randomValidLicenseType()); - } - - public void updateLicensing(String licenseType) throws Exception { - wipeAllLicenses(); - if (licenseType.equals("missing")) { - putLicenseTombstone(); - } else { - License license = TestUtils.generateSignedLicense(licenseType, TimeValue.timeValueMinutes(1)); - putLicense(license); - } - } } diff --git a/plugin/src/test/java/org/elasticsearch/xpack/upgrade/IndexUpgradeIntegTestCase.java b/plugin/src/test/java/org/elasticsearch/xpack/upgrade/IndexUpgradeIntegTestCase.java new file mode 100644 index 00000000000..fa9c2739cd2 --- /dev/null +++ b/plugin/src/test/java/org/elasticsearch/xpack/upgrade/IndexUpgradeIntegTestCase.java @@ -0,0 +1,87 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.upgrade; + +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.index.reindex.ReindexPlugin; +import org.elasticsearch.license.AbstractLicensesIntegrationTestCase; +import org.elasticsearch.license.License; +import org.elasticsearch.license.TestUtils; +import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.xpack.XPackPlugin; +import org.elasticsearch.xpack.XPackSettings; +import org.elasticsearch.xpack.ml.MachineLearning; +import org.elasticsearch.xpack.monitoring.test.MockPainlessScriptEngine; + +import java.util.Arrays; +import java.util.Collection; + +public abstract class IndexUpgradeIntegTestCase extends AbstractLicensesIntegrationTestCase { + @Override + protected boolean ignoreExternalCluster() { + return true; + } + + @Override + protected Settings nodeSettings(int nodeOrdinal) { + Settings.Builder settings = Settings.builder().put(super.nodeSettings(nodeOrdinal)); + settings.put(MachineLearning.AUTODETECT_PROCESS.getKey(), false); + settings.put(XPackSettings.MACHINE_LEARNING_ENABLED.getKey(), false); + settings.put(XPackSettings.SECURITY_ENABLED.getKey(), false); + settings.put(XPackSettings.WATCHER_ENABLED.getKey(), false); + settings.put(XPackSettings.MONITORING_ENABLED.getKey(), false); + settings.put(XPackSettings.GRAPH_ENABLED.getKey(), false); + return settings.build(); + } + + @Override + protected Settings transportClientSettings() { + Settings.Builder settings = Settings.builder().put(super.transportClientSettings()); + settings.put(MachineLearning.AUTODETECT_PROCESS.getKey(), false); + settings.put(XPackSettings.MACHINE_LEARNING_ENABLED.getKey(), false); + settings.put(XPackSettings.SECURITY_ENABLED.getKey(), false); + settings.put(XPackSettings.WATCHER_ENABLED.getKey(), false); + settings.put(XPackSettings.MONITORING_ENABLED.getKey(), false); + settings.put(XPackSettings.GRAPH_ENABLED.getKey(), false); + return settings.build(); + } + + @Override + protected Collection> nodePlugins() { + return Arrays.asList(XPackPlugin.class, ReindexPlugin.class, MockPainlessScriptEngine.TestPlugin.class); + } + + @Override + protected Collection> transportClientPlugins() { + return nodePlugins(); + } + private static String randomValidLicenseType() { + return randomFrom("platinum", "gold", "standard", "basic"); + } + + private static String randomInvalidLicenseType() { + return randomFrom("missing", "trial"); + } + + public void disableLicensing() throws Exception { + updateLicensing(randomInvalidLicenseType()); + } + + public void enableLicensing() throws Exception { + updateLicensing(randomValidLicenseType()); + } + + public void updateLicensing(String licenseType) throws Exception { + wipeAllLicenses(); + if (licenseType.equals("missing")) { + putLicenseTombstone(); + } else { + License license = TestUtils.generateSignedLicense(licenseType, TimeValue.timeValueMinutes(1)); + putLicense(license); + } + } +} diff --git a/plugin/src/test/java/org/elasticsearch/xpack/upgrade/IndexUpgradeServiceTests.java b/plugin/src/test/java/org/elasticsearch/xpack/upgrade/IndexUpgradeServiceTests.java index 44a19c1a0cd..3557c8682ab 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/upgrade/IndexUpgradeServiceTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/upgrade/IndexUpgradeServiceTests.java @@ -23,61 +23,24 @@ import static org.hamcrest.core.IsEqual.equalTo; public class IndexUpgradeServiceTests extends ESTestCase { - private IndexUpgradeCheck upgradeBarCheck = new IndexUpgradeCheck() { - @Override - public String getName() { - return "upgrade_bar"; - } + private IndexUpgradeCheck upgradeBarCheck = new IndexUpgradeCheck("upgrade_bar", Settings.EMPTY, + indexAndParams -> "bar".equals(indexAndParams.v1().getSettings().get("test.setting")), + UpgradeActionRequired.UPGRADE, UpgradeActionRequired.NOT_APPLICABLE); - @Override - public UpgradeActionRequired actionRequired(IndexMetaData indexMetaData, Map params, ClusterState state) { - if ("bar".equals(indexMetaData.getSettings().get("test.setting"))) { - return UpgradeActionRequired.UPGRADE; - } - return UpgradeActionRequired.NOT_APPLICABLE; - } - }; + private IndexUpgradeCheck reindexFooCheck = new IndexUpgradeCheck("reindex_foo", Settings.EMPTY, + indexAndParams -> "foo".equals(indexAndParams.v1().getSettings().get("test.setting")), + UpgradeActionRequired.REINDEX, UpgradeActionRequired.NOT_APPLICABLE); - private IndexUpgradeCheck reindexFooCheck = new IndexUpgradeCheck() { - @Override - public String getName() { - return "reindex_foo"; - } + private IndexUpgradeCheck everythingIsFineCheck = new IndexUpgradeCheck("everything_is_fine", Settings.EMPTY, + indexAndParams -> true, + UpgradeActionRequired.UP_TO_DATE, UpgradeActionRequired.NOT_APPLICABLE); - @Override - public UpgradeActionRequired actionRequired(IndexMetaData indexMetaData, Map params, ClusterState state) { - if ("foo".equals(indexMetaData.getSettings().get("test.setting"))) { - return UpgradeActionRequired.REINDEX; - } - return UpgradeActionRequired.NOT_APPLICABLE; - } - }; + private IndexUpgradeCheck unreachableCheck = new IndexUpgradeCheck("unreachable", Settings.EMPTY, + indexAndParams -> { + fail("Unreachable check is called"); + return false; + }, UpgradeActionRequired.UP_TO_DATE, UpgradeActionRequired.NOT_APPLICABLE); - private IndexUpgradeCheck everythingIsFineCheck = new IndexUpgradeCheck() { - @Override - public String getName() { - return "everything_is_fine"; - } - - @Override - public UpgradeActionRequired actionRequired(IndexMetaData indexMetaData, Map params, ClusterState state) { - return UpgradeActionRequired.UP_TO_DATE; - } - }; - - private IndexUpgradeCheck unreachableCheck = new IndexUpgradeCheck() { - - @Override - public String getName() { - return "unreachable"; - } - - @Override - public UpgradeActionRequired actionRequired(IndexMetaData indexMetaData, Map params, ClusterState state) { - fail("Unreachable check is called"); - return null; - } - }; public void testIndexUpgradeServiceMultipleCheck() { IndexUpgradeService service; @@ -138,7 +101,6 @@ public class IndexUpgradeServiceTests extends ESTestCase { } public void testEarlierChecksWin() { - IndexUpgradeService service = new IndexUpgradeService(Settings.EMPTY, Arrays.asList( everythingIsFineCheck, upgradeBarCheck, diff --git a/plugin/src/test/java/org/elasticsearch/xpack/upgrade/InternalIndexReindexerIT.java b/plugin/src/test/java/org/elasticsearch/xpack/upgrade/InternalIndexReindexerIT.java new file mode 100644 index 00000000000..1b46bc30344 --- /dev/null +++ b/plugin/src/test/java/org/elasticsearch/xpack/upgrade/InternalIndexReindexerIT.java @@ -0,0 +1,208 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.upgrade; + +import com.carrotsearch.hppc.cursors.ObjectCursor; +import org.elasticsearch.ResourceAlreadyExistsException; +import org.elasticsearch.Version; +import org.elasticsearch.action.admin.indices.alias.get.GetAliasesResponse; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.action.support.PlainActionFuture; +import org.elasticsearch.action.support.WriteRequest; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.block.ClusterBlockException; +import org.elasticsearch.cluster.metadata.AliasMetaData; +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.index.reindex.BulkByScrollResponse; +import org.elasticsearch.index.reindex.ReindexPlugin; +import org.elasticsearch.indices.InvalidIndexNameException; +import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.script.MockScriptPlugin; +import org.elasticsearch.script.Script; +import org.elasticsearch.script.ScriptType; +import org.elasticsearch.search.SearchHit; +import org.elasticsearch.xpack.XPackPlugin; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.function.Function; + +import static org.elasticsearch.test.VersionUtils.randomVersionBetween; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertThrows; +import static org.hamcrest.Matchers.notNullValue; +import static org.hamcrest.Matchers.startsWith; +import static org.hamcrest.core.IsEqual.equalTo; + +public class InternalIndexReindexerIT extends IndexUpgradeIntegTestCase { + + @Override + protected Collection> nodePlugins() { + return Arrays.asList(XPackPlugin.class, ReindexPlugin.class, CustomScriptPlugin.class); + } + + public static class CustomScriptPlugin extends MockScriptPlugin { + @Override + protected Map, Object>> pluginScripts() { + Map, Object>> scripts = new HashMap<>(); + scripts.put("add_bar", map -> { + @SuppressWarnings("unchecked") Map ctx = (Map) map.get("ctx"); + ctx.put("_id", "bar" + "-" + ctx.get("_id")); + @SuppressWarnings("unchecked") Map source = (Map) ctx.get("_source"); + source.put("bar", true); + return null; + }); + scripts.put("fail", map -> { + throw new RuntimeException("Stop reindexing"); + }); + return scripts; + } + } + + public void testUpgradeIndex() throws Exception { + createTestIndex("test"); + InternalIndexReindexer reindexer = createIndexReindexer(123, script("add_bar"), Strings.EMPTY_ARRAY); + PlainActionFuture future = PlainActionFuture.newFuture(); + reindexer.upgrade("test", clusterState(), future); + BulkByScrollResponse response = future.actionGet(); + assertThat(response.getCreated(), equalTo(2L)); + + SearchResponse searchResponse = client().prepareSearch("test_v123").get(); + assertThat(searchResponse.getHits().getTotalHits(), equalTo(2L)); + assertThat(searchResponse.getHits().getHits().length, equalTo(2)); + for (SearchHit hit : searchResponse.getHits().getHits()) { + assertThat(hit.getId(), startsWith("bar-")); + assertThat(hit.getSourceAsMap(), notNullValue()); + assertThat(hit.getSourceAsMap().get("bar"), equalTo(true)); + } + + GetAliasesResponse aliasesResponse = client().admin().indices().prepareGetAliases("test").get(); + assertThat(aliasesResponse.getAliases().size(), equalTo(1)); + List testAlias = aliasesResponse.getAliases().get("test_v123"); + assertNotNull(testAlias); + assertThat(testAlias.size(), equalTo(1)); + assertThat(testAlias.get(0).alias(), equalTo("test")); + } + + public void testTargetIndexExists() throws Exception { + createTestIndex("test"); + createTestIndex("test_v123"); + InternalIndexReindexer reindexer = createIndexReindexer(123, script("add_bar"), Strings.EMPTY_ARRAY); + PlainActionFuture future = PlainActionFuture.newFuture(); + reindexer.upgrade("test", clusterState(), future); + assertThrows(future, ResourceAlreadyExistsException.class); + + // Make sure that the index is not marked as read-only + client().prepareIndex("test", "doc").setSource("foo", "bar").get(); + } + + public void testTargetIndexExistsAsAlias() throws Exception { + createTestIndex("test"); + createTestIndex("test-foo"); + client().admin().indices().prepareAliases().addAlias("test-foo", "test_v123").get(); + InternalIndexReindexer reindexer = createIndexReindexer(123, script("add_bar"), Strings.EMPTY_ARRAY); + PlainActionFuture future = PlainActionFuture.newFuture(); + reindexer.upgrade("test", clusterState(), future); + assertThrows(future, InvalidIndexNameException.class); + + // Make sure that the index is not marked as read-only + client().prepareIndex("test_v123", "doc").setSource("foo", "bar").get(); + } + + public void testSourceIndexIsReadonly() throws Exception { + createTestIndex("test"); + try { + Settings settings = Settings.builder().put(IndexMetaData.INDEX_READ_ONLY_SETTING.getKey(), true).build(); + assertAcked(client().admin().indices().prepareUpdateSettings("test").setSettings(settings).get()); + InternalIndexReindexer reindexer = createIndexReindexer(123, script("add_bar"), Strings.EMPTY_ARRAY); + PlainActionFuture future = PlainActionFuture.newFuture(); + reindexer.upgrade("test", clusterState(), future); + assertThrows(future, IllegalStateException.class); + + // Make sure that the index is still marked as read-only + assertThrows(client().prepareIndex("test", "doc").setSource("foo", "bar"), ClusterBlockException.class); + } finally { + // Clean up the readonly index + Settings settings = Settings.builder().put(IndexMetaData.INDEX_READ_ONLY_SETTING.getKey(), false).build(); + assertAcked(client().admin().indices().prepareUpdateSettings("test").setSettings(settings).get()); + } + } + + + public void testReindexingFailure() throws Exception { + createTestIndex("test"); + // Make sure that the index is not marked as read-only + client().prepareIndex("test", "doc").setSource("foo", "bar").setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get(); + InternalIndexReindexer reindexer = createIndexReindexer(123, script("fail"), Strings.EMPTY_ARRAY); + PlainActionFuture future = PlainActionFuture.newFuture(); + reindexer.upgrade("test", clusterState(), future); + assertThrows(future, RuntimeException.class); + + // Make sure that the index is not marked as read-only + client().prepareIndex("test", "doc").setSource("foo", "bar").get(); + } + + public void testMixedNodeVersion() throws Exception { + createTestIndex("test"); + + InternalIndexReindexer reindexer = createIndexReindexer(123, script("add_bar"), Strings.EMPTY_ARRAY); + PlainActionFuture future = PlainActionFuture.newFuture(); + reindexer.upgrade("test", withRandomOldNode(), future); + assertThrows(future, IllegalStateException.class); + + // Make sure that the index is not marked as read-only + client().prepareIndex("test_v123", "doc").setSource("foo", "bar").get(); + } + + private void createTestIndex(String indexName) throws Exception { + assertAcked(client().admin().indices().prepareCreate(indexName).get()); + indexRandom(true, + client().prepareIndex(indexName, "doc", "1").setSource("{\"foo\":\"bar1-1\"}", XContentType.JSON), + client().prepareIndex(indexName, "doc", "2").setSource("{\"foo\":\"baz1-1\"}", XContentType.JSON) + ); + ensureYellow(indexName); + } + + private Script script(String name) { + return new Script(ScriptType.INLINE, CustomScriptPlugin.NAME, name, new HashMap<>()); + } + + private InternalIndexReindexer createIndexReindexer(int version, Script transformScript, String[] types) { + return new InternalIndexReindexer(client(), internalCluster().clusterService(internalCluster().getMasterName()), + version, transformScript, types); + } + + private ClusterState clusterState() { + return clusterService().state(); + } + + private ClusterState withRandomOldNode() { + ClusterState clusterState = clusterState(); + DiscoveryNodes discoveryNodes = clusterState.nodes(); + List nodes = new ArrayList<>(); + for (ObjectCursor key : discoveryNodes.getMasterAndDataNodes().keys()) { + nodes.add(key.value); + } + // Fake one of the node versions + String nodeId = randomFrom(nodes); + DiscoveryNode node = discoveryNodes.get(nodeId); + DiscoveryNode newNode = new DiscoveryNode(node.getName(), node.getId(), node.getEphemeralId(), node.getHostName(), + node.getHostAddress(), node.getAddress(), node.getAttributes(), node.getRoles(), + randomVersionBetween(random(), Version.V_5_0_0, Version.V_5_4_0)); + + return ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder(discoveryNodes).remove(node).add(newNode)).build(); + + } +} \ No newline at end of file diff --git a/plugin/src/test/java/org/elasticsearch/xpack/upgrade/actions/IndexUpgradeActionRequestTests.java b/plugin/src/test/java/org/elasticsearch/xpack/upgrade/actions/IndexUpgradeActionRequestTests.java new file mode 100644 index 00000000000..92160ac42f0 --- /dev/null +++ b/plugin/src/test/java/org/elasticsearch/xpack/upgrade/actions/IndexUpgradeActionRequestTests.java @@ -0,0 +1,27 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.upgrade.actions; + +import org.elasticsearch.test.AbstractStreamableTestCase; +import org.elasticsearch.xpack.upgrade.actions.IndexUpgradeAction.Request; + +import java.util.Collections; + +public class IndexUpgradeActionRequestTests extends AbstractStreamableTestCase { + @Override + protected Request createTestInstance() { + Request request = new Request(randomAlphaOfLength(10)); + if (randomBoolean()) { + request.extraParams(Collections.singletonMap(randomAlphaOfLength(10), randomAlphaOfLength(20))); + } + return request; + } + + @Override + protected Request createBlankInstance() { + return new Request(); + } +} diff --git a/plugin/src/test/resources/org/elasticsearch/transport/actions b/plugin/src/test/resources/org/elasticsearch/transport/actions index b346719b58d..a249e61e1c4 100644 --- a/plugin/src/test/resources/org/elasticsearch/transport/actions +++ b/plugin/src/test/resources/org/elasticsearch/transport/actions @@ -151,9 +151,10 @@ cluster:admin/persistent/completion cluster:admin/persistent/update_status cluster:admin/persistent/remove cluster:internal/xpack/ml/job/finalize_job_execution +cluster:admin/xpack/upgrade/info +cluster:admin/xpack/upgrade cluster:admin/reindex/rethrottle indices:data/write/update/byquery indices:data/write/delete/byquery indices:data/write/reindex cluster:admin/xpack/deprecation/info -cluster:admin/xpack/upgrade/info diff --git a/plugin/src/test/resources/org/elasticsearch/transport/handlers b/plugin/src/test/resources/org/elasticsearch/transport/handlers index 287c1d07972..916a48af102 100644 --- a/plugin/src/test/resources/org/elasticsearch/transport/handlers +++ b/plugin/src/test/resources/org/elasticsearch/transport/handlers @@ -130,6 +130,7 @@ internal:indices/flush/synced/sync internal:admin/repository/verify internal:transport/handshake cluster:admin/reindex/rethrottle[n] +cluster:admin/xpack/upgrade indices:data/write/update/byquery indices:data/write/delete/byquery indices:data/write/reindex diff --git a/plugin/src/test/resources/rest-api-spec/api/xpack.upgrade.info.json b/plugin/src/test/resources/rest-api-spec/api/xpack.upgrade.info.json index a21df0d5352..214f35c6d35 100644 --- a/plugin/src/test/resources/rest-api-spec/api/xpack.upgrade.info.json +++ b/plugin/src/test/resources/rest-api-spec/api/xpack.upgrade.info.json @@ -28,10 +28,6 @@ "type" : "boolean", "description" : "Whether specified concrete indices should be ignored when unavailable (missing or closed)" }, - "wait_for_completion": { - "type" : "boolean", - "description" : "Specify whether the request should block until the all segments are upgraded (default: false)" - }, "kibana_indices": { "type": "list", "description": "A comma separated list of indices that should be treated as kibana indices" diff --git a/plugin/src/test/resources/rest-api-spec/api/xpack.upgrade.json b/plugin/src/test/resources/rest-api-spec/api/xpack.upgrade.json new file mode 100644 index 00000000000..f703fa7774e --- /dev/null +++ b/plugin/src/test/resources/rest-api-spec/api/xpack.upgrade.json @@ -0,0 +1,24 @@ +{ + "xpack.upgrade": { + "methods": [ "POST" ], + "url": { + "path": "/{index}/_xpack/_upgrade", + "paths": [ + "/{index}/_xpack/_upgrade" + ], + "parts": { + "index": { + "type" : "string", + "required" : true, + "description" : "The name of the index" + } + }, + "params": { + "kibana_indices": { + "type": "list", + "description": "A comma separated list of indices that should be treated as kibana indices" + } + } + } + } +} diff --git a/plugin/src/test/resources/rest-api-spec/test/upgrade/10_basic.yaml b/plugin/src/test/resources/rest-api-spec/test/upgrade/10_basic.yml similarity index 67% rename from plugin/src/test/resources/rest-api-spec/test/upgrade/10_basic.yaml rename to plugin/src/test/resources/rest-api-spec/test/upgrade/10_basic.yml index 9b3bfbf227b..7a57fd12fc9 100644 --- a/plugin/src/test/resources/rest-api-spec/test/upgrade/10_basic.yaml +++ b/plugin/src/test/resources/rest-api-spec/test/upgrade/10_basic.yml @@ -25,17 +25,53 @@ setup: indices.create: index: test2 + - do: + index: + index: test2 + type: my_type + id: 1 + body: { "field1": "foo", "field2": "bar" } + - do: + indices.refresh: {} + --- -"Index - all": +"Upgrade info - all": - do: xpack.upgrade.info: { index: _all } - length: { indices: 0 } --- -"Index - treat test2 as kibana": +"Upgrade info - all, but treat test2 as kibana": - do: xpack.upgrade.info: { index: _all, kibana_indices: test2 } - length: { indices: 1 } - match: { indices.test2.action_required: "upgrade" } + +--- +"Upgrade test2 as kibana index": + + - do: + xpack.upgrade: { index: test2, kibana_indices: test2 } + + - match: { total: 1 } + - match: { created: 1 } + - length: { failures: 0 } + + - do: + search: + index: test2_v6 + body: { "query" : { "match_all" : {} } } + - match: { hits.total: 1} + - match: { hits.hits.0._type: "doc"} + - match: { hits.hits.0._id: "my_type-1"} + - match: { hits.hits.0._source.my_type.field1: "foo"} + - match: { hits.hits.0._source.my_type.field2: "bar"} + + - do: + indices.get_alias: + index: test2_v6 + + - match: {test2_v6.aliases.test2: {}} +