diff --git a/plugin/src/main/java/org/elasticsearch/xpack/upgrade/IndexUpgradeCheck.java b/plugin/src/main/java/org/elasticsearch/xpack/upgrade/IndexUpgradeCheck.java index 1cc39122c9c..7a9f88e842f 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/upgrade/IndexUpgradeCheck.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/upgrade/IndexUpgradeCheck.java @@ -14,6 +14,7 @@ import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.reindex.BulkByScrollResponse; import org.elasticsearch.script.Script; +import org.elasticsearch.tasks.TaskId; import org.elasticsearch.transport.TransportResponse; import java.util.function.BiConsumer; @@ -100,12 +101,13 @@ public class IndexUpgradeCheck extends AbstractComponent { /** * Perform the index upgrade * + * @param task the task that executes the upgrade operation * @param indexMetaData index metadata * @param state current cluster state * @param listener the listener that should be called upon completion of the upgrade */ - public void upgrade(IndexMetaData indexMetaData, ClusterState state, + public void upgrade(TaskId task, IndexMetaData indexMetaData, ClusterState state, ActionListener listener) { - reindexer.upgrade(indexMetaData.getIndex().getName(), state, listener); + reindexer.upgrade(task, indexMetaData.getIndex().getName(), state, listener); } } diff --git a/plugin/src/main/java/org/elasticsearch/xpack/upgrade/IndexUpgradeService.java b/plugin/src/main/java/org/elasticsearch/xpack/upgrade/IndexUpgradeService.java index c56a6466b8f..491fb39214b 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/upgrade/IndexUpgradeService.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/upgrade/IndexUpgradeService.java @@ -16,6 +16,7 @@ import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.index.reindex.BulkByScrollResponse; +import org.elasticsearch.tasks.TaskId; import java.util.HashMap; import java.util.List; @@ -85,7 +86,7 @@ public class IndexUpgradeService extends AbstractComponent { } } - public void upgrade(String index, ClusterState state, ActionListener listener) { + public void upgrade(TaskId task, String index, ClusterState state, ActionListener listener) { IndexMetaData indexMetaData = state.metaData().index(index); if (indexMetaData == null) { throw new IndexNotFoundException(index); @@ -95,7 +96,7 @@ public class IndexUpgradeService extends AbstractComponent { switch (upgradeActionRequired) { case UPGRADE: // this index needs to be upgraded - start the upgrade procedure - check.upgrade(indexMetaData, state, listener); + check.upgrade(task, indexMetaData, state, listener); return; case REINDEX: // this index needs to be re-indexed diff --git a/plugin/src/main/java/org/elasticsearch/xpack/upgrade/InternalIndexReindexer.java b/plugin/src/main/java/org/elasticsearch/xpack/upgrade/InternalIndexReindexer.java index d475a8626e6..243cb4d5fda 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/upgrade/InternalIndexReindexer.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/upgrade/InternalIndexReindexer.java @@ -5,12 +5,12 @@ */ package org.elasticsearch.xpack.upgrade; -import com.carrotsearch.hppc.procedures.ObjectProcedure; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsResponse; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.client.Client; +import org.elasticsearch.client.ParentTaskAssigningClient; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateUpdateTask; import org.elasticsearch.cluster.block.ClusterBlocks; @@ -23,6 +23,7 @@ import org.elasticsearch.index.reindex.BulkByScrollResponse; import org.elasticsearch.index.reindex.ReindexAction; import org.elasticsearch.index.reindex.ReindexRequest; import org.elasticsearch.script.Script; +import org.elasticsearch.tasks.TaskId; import org.elasticsearch.transport.TransportResponse; import java.util.function.BiConsumer; @@ -59,9 +60,10 @@ public class InternalIndexReindexer { this.postUpgrade = postUpgrade; } - public void upgrade(String index, ClusterState clusterState, ActionListener listener) { + public void upgrade(TaskId task, String index, ClusterState clusterState, ActionListener listener) { + ParentTaskAssigningClient parentAwareClient = new ParentTaskAssigningClient(client, task); preUpgrade.accept(ActionListener.wrap( - t -> innerUpgrade(index, clusterState, ActionListener.wrap( + t -> innerUpgrade(parentAwareClient, index, clusterState, ActionListener.wrap( response -> postUpgrade.accept(t, ActionListener.wrap( empty -> listener.onResponse(response), listener::onFailure @@ -71,22 +73,23 @@ public class InternalIndexReindexer { listener::onFailure)); } - private void innerUpgrade(String index, ClusterState clusterState, ActionListener listener) { + private void innerUpgrade(ParentTaskAssigningClient parentAwareClient, String index, ClusterState clusterState, + ActionListener listener) { String newIndex = index + "_v" + version; try { checkMasterAndDataNodeVersion(clusterState); - client.admin().indices().prepareCreate(newIndex).execute(ActionListener.wrap(createIndexResponse -> + parentAwareClient.admin().indices().prepareCreate(newIndex).execute(ActionListener.wrap(createIndexResponse -> setReadOnlyBlock(index, ActionListener.wrap(setReadOnlyResponse -> - reindex(index, newIndex, ActionListener.wrap( + reindex(parentAwareClient, index, newIndex, ActionListener.wrap( bulkByScrollResponse -> // Successful completion of reindexing - delete old index - removeReadOnlyBlock(index, ActionListener.wrap(unsetReadOnlyResponse -> - client.admin().indices().prepareAliases().removeIndex(index) + removeReadOnlyBlock(parentAwareClient, index, ActionListener.wrap(unsetReadOnlyResponse -> + parentAwareClient.admin().indices().prepareAliases().removeIndex(index) .addAlias(newIndex, index).execute(ActionListener.wrap(deleteIndexResponse -> listener.onResponse(bulkByScrollResponse), listener::onFailure )), listener::onFailure )), e -> // Something went wrong during reindexing - remove readonly flag and report the error - removeReadOnlyBlock(index, ActionListener.wrap(unsetReadOnlyResponse -> { + removeReadOnlyBlock(parentAwareClient, index, ActionListener.wrap(unsetReadOnlyResponse -> { listener.onFailure(e); }, e1 -> { listener.onFailure(e); @@ -105,19 +108,21 @@ public class InternalIndexReindexer { } } - private void removeReadOnlyBlock(String index, ActionListener listener) { + private void removeReadOnlyBlock(ParentTaskAssigningClient parentAwareClient, String index, + ActionListener listener) { Settings settings = Settings.builder().put(IndexMetaData.INDEX_READ_ONLY_SETTING.getKey(), false).build(); - client.admin().indices().prepareUpdateSettings(index).setSettings(settings).execute(listener); + parentAwareClient.admin().indices().prepareUpdateSettings(index).setSettings(settings).execute(listener); } - private void reindex(String index, String newIndex, ActionListener listener) { + private void reindex(ParentTaskAssigningClient parentAwareClient, String index, String newIndex, + ActionListener listener) { SearchRequest sourceRequest = new SearchRequest(index); sourceRequest.types(types); IndexRequest destinationRequest = new IndexRequest(newIndex); ReindexRequest reindexRequest = new ReindexRequest(sourceRequest, destinationRequest); reindexRequest.setRefresh(true); reindexRequest.setScript(transformScript); - client.execute(ReindexAction.INSTANCE, reindexRequest, listener); + parentAwareClient.execute(ReindexAction.INSTANCE, reindexRequest, listener); } /** diff --git a/plugin/src/main/java/org/elasticsearch/xpack/upgrade/actions/IndexUpgradeAction.java b/plugin/src/main/java/org/elasticsearch/xpack/upgrade/actions/IndexUpgradeAction.java index 6f2f03fa2b8..d7f410b02c7 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/upgrade/actions/IndexUpgradeAction.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/upgrade/actions/IndexUpgradeAction.java @@ -25,6 +25,9 @@ import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.reindex.BulkByScrollResponse; +import org.elasticsearch.tasks.CancellableTask; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.tasks.TaskId; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.upgrade.IndexUpgradeService; @@ -125,6 +128,16 @@ public class IndexUpgradeAction extends Action { @@ -170,8 +183,16 @@ public class IndexUpgradeAction extends Action listener) { - indexUpgradeService.upgrade(request.index(), state, listener); + protected final void masterOperation(Task task, Request request, ClusterState state, + ActionListener listener) { + TaskId taskId = new TaskId(clusterService.localNode().getId(), task.getId()); + indexUpgradeService.upgrade(taskId, request.index(), state, listener); } + + @Override + protected final void masterOperation(Request request, ClusterState state, ActionListener listener) { + throw new UnsupportedOperationException("the task parameter is required"); + } + } } \ No newline at end of file diff --git a/plugin/src/test/java/org/elasticsearch/xpack/upgrade/IndexUpgradeIT.java b/plugin/src/test/java/org/elasticsearch/xpack/upgrade/IndexUpgradeIT.java index 1cd4eadf2d2..6d6a2d1d425 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/upgrade/IndexUpgradeIT.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/upgrade/IndexUpgradeIT.java @@ -12,6 +12,7 @@ import org.elasticsearch.common.Strings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.index.reindex.BulkByScrollResponse; +import org.elasticsearch.tasks.TaskId; import org.elasticsearch.transport.TransportResponse; import org.elasticsearch.xpack.upgrade.actions.IndexUpgradeAction; import org.elasticsearch.xpack.upgrade.actions.IndexUpgradeInfoAction; @@ -110,7 +111,7 @@ public class IndexUpgradeIT extends IndexUpgradeIntegTestCase { IndexUpgradeService service = new IndexUpgradeService(Settings.EMPTY, Collections.singletonList(check)); PlainActionFuture future = PlainActionFuture.newFuture(); - service.upgrade(testIndex, clusterService().state(), future); + service.upgrade(new TaskId("abc", 123), testIndex, clusterService().state(), future); BulkByScrollResponse response = future.actionGet(); assertThat(response.getCreated(), equalTo(2L)); diff --git a/plugin/src/test/java/org/elasticsearch/xpack/upgrade/IndexUpgradeTasksIT.java b/plugin/src/test/java/org/elasticsearch/xpack/upgrade/IndexUpgradeTasksIT.java new file mode 100644 index 00000000000..03b12a0e2b7 --- /dev/null +++ b/plugin/src/test/java/org/elasticsearch/xpack/upgrade/IndexUpgradeTasksIT.java @@ -0,0 +1,195 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.upgrade; + +import org.elasticsearch.action.ActionFuture; +import org.elasticsearch.action.ActionRequest; +import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse; +import org.elasticsearch.action.admin.cluster.node.tasks.list.TaskGroup; +import org.elasticsearch.action.support.WriteRequest; +import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.logging.Loggers; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.NamedXContentRegistry; +import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.index.reindex.BulkByScrollResponse; +import org.elasticsearch.index.reindex.ReindexAction; +import org.elasticsearch.index.reindex.ReindexPlugin; +import org.elasticsearch.plugins.ActionPlugin; +import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.plugins.PluginsService; +import org.elasticsearch.plugins.ScriptPlugin; +import org.elasticsearch.script.MockScriptEngine; +import org.elasticsearch.script.Script; +import org.elasticsearch.script.ScriptContext; +import org.elasticsearch.script.ScriptEngine; +import org.elasticsearch.script.ScriptService; +import org.elasticsearch.script.ScriptType; +import org.elasticsearch.test.ESIntegTestCase; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.watcher.ResourceWatcherService; +import org.elasticsearch.xpack.upgrade.actions.IndexUpgradeAction; +import org.elasticsearch.xpack.upgrade.actions.IndexUpgradeInfoAction; + +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.function.Function; + +import static org.elasticsearch.test.ESIntegTestCase.Scope.TEST; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; +import static org.hamcrest.CoreMatchers.notNullValue; +import static org.hamcrest.CoreMatchers.nullValue; +import static org.hamcrest.Matchers.contains; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasSize; + +@ESIntegTestCase.ClusterScope(scope = TEST, supportsDedicatedMasters = false, numClientNodes = 0, maxNumDataNodes = 1) +public class IndexUpgradeTasksIT extends ESIntegTestCase { + + @Override + protected boolean ignoreExternalCluster() { + return true; + } + + @Override + protected Collection> nodePlugins() { + return Arrays.asList(MockUpgradePlugin.class, ReindexPlugin.class); + } + + public static class MockUpgradePlugin extends Plugin implements ScriptPlugin, ActionPlugin { + + public static final String NAME = MockScriptEngine.NAME; + + private Settings settings; + private Upgrade upgrade; + + private CountDownLatch upgradeLatch = new CountDownLatch(1); + private CountDownLatch upgradeCalledLatch = new CountDownLatch(1); + + @Override + public ScriptEngine getScriptEngine(Settings settings, Collection> contexts) { + return new MockScriptEngine(pluginScriptLang(), pluginScripts()); + } + + public String pluginScriptLang() { + return NAME; + } + + public MockUpgradePlugin(Settings settings) { + this.settings = settings; + this.upgrade = new Upgrade(settings); + Loggers.getLogger(IndexUpgradeTasksIT.class).info("MockUpgradePlugin is created"); + } + + + protected Map, Object>> pluginScripts() { + Map, Object>> scripts = new HashMap<>(); + scripts.put("block", map -> { + upgradeCalledLatch.countDown(); + try { + assertThat(upgradeLatch.await(10, TimeUnit.SECONDS), equalTo(true)); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + return null; + }); + return scripts; + } + + @Override + public Collection createComponents(Client client, ClusterService clusterService, ThreadPool threadPool, + ResourceWatcherService resourceWatcherService, ScriptService scriptService, + NamedXContentRegistry xContentRegistry) { + return Collections.singletonList(new IndexUpgradeService(settings, Collections.singletonList( + new IndexUpgradeCheck("test", settings, + new Function() { + @Override + public UpgradeActionRequired apply(IndexMetaData indexMetaData) { + if ("test".equals(indexMetaData.getIndex().getName())) { + if (Upgrade.checkInternalIndexFormat(indexMetaData)) { + return UpgradeActionRequired.UP_TO_DATE; + } else { + return UpgradeActionRequired.UPGRADE; + } + } else { + return UpgradeActionRequired.NOT_APPLICABLE; + } + } + }, + client, clusterService, Strings.EMPTY_ARRAY, + new Script(ScriptType.INLINE, NAME, "block", Collections.emptyMap())) + ))); + } + + @Override + public List> getActions() { + return upgrade.getActions(); + } + + @Override + public Collection getRestHeaders() { + return upgrade.getRestHeaders(); + } + } + + @Override + protected Collection> transportClientPlugins() { + return nodePlugins(); + } + + public void testParentTasksDuringUpgrade() throws Exception { + logger.info("before getInstance"); + PluginsService pluginsService = internalCluster().getDataNodeInstance(PluginsService.class); + MockUpgradePlugin mockUpgradePlugin = pluginsService.filterPlugins(MockUpgradePlugin.class).get(0); + assertThat(mockUpgradePlugin, notNullValue()); + logger.info("after getInstance"); + + assertAcked(client().admin().indices().prepareCreate("test").get()); + client().prepareIndex("test", "doc", "1").setSource("{\"foo\": \"bar\"}", XContentType.JSON) + .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get(); + + ensureYellow("test"); + + + IndexUpgradeInfoAction.Response infoResponse = client().prepareExecute(IndexUpgradeInfoAction.INSTANCE).setIndices("test").get(); + assertThat(infoResponse.getActions().keySet(), contains("test")); + assertThat(infoResponse.getActions().get("test"), equalTo(UpgradeActionRequired.UPGRADE)); + + + ActionFuture upgradeResponse = + client().prepareExecute(IndexUpgradeAction.INSTANCE).setIndex("test").execute(); + + + assertThat(mockUpgradePlugin.upgradeCalledLatch.await(10, TimeUnit.SECONDS), equalTo(true)); + ListTasksResponse response = client().admin().cluster().prepareListTasks().get(); + mockUpgradePlugin.upgradeLatch.countDown(); + + // Find the upgrade task group + TaskGroup upgradeGroup = null; + for (TaskGroup group : response.getTaskGroups()) { + if (IndexUpgradeAction.NAME.equals(group.getTaskInfo().getAction())) { + assertThat(upgradeGroup, nullValue()); + upgradeGroup = group; + } + } + assertThat(upgradeGroup, notNullValue()); + assertThat(upgradeGroup.getTaskInfo().isCancellable(), equalTo(true)); // The task should be cancellable + assertThat(upgradeGroup.getChildTasks(), hasSize(1)); // The reindex task should be a child + assertThat(upgradeGroup.getChildTasks().get(0).getTaskInfo().getAction(), equalTo(ReindexAction.NAME)); + + assertThat(upgradeResponse.get().getCreated(), equalTo(1L)); + } +} diff --git a/plugin/src/test/java/org/elasticsearch/xpack/upgrade/InternalIndexReindexerIT.java b/plugin/src/test/java/org/elasticsearch/xpack/upgrade/InternalIndexReindexerIT.java index 6276deff09b..3b9c10b36bf 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/upgrade/InternalIndexReindexerIT.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/upgrade/InternalIndexReindexerIT.java @@ -29,6 +29,7 @@ import org.elasticsearch.script.MockScriptPlugin; import org.elasticsearch.script.Script; import org.elasticsearch.script.ScriptType; import org.elasticsearch.search.SearchHit; +import org.elasticsearch.tasks.TaskId; import org.elasticsearch.transport.TransportResponse; import org.elasticsearch.xpack.XPackPlugin; @@ -76,7 +77,7 @@ public class InternalIndexReindexerIT extends IndexUpgradeIntegTestCase { createTestIndex("test"); InternalIndexReindexer reindexer = createIndexReindexer(123, script("add_bar"), Strings.EMPTY_ARRAY); PlainActionFuture future = PlainActionFuture.newFuture(); - reindexer.upgrade("test", clusterState(), future); + reindexer.upgrade(new TaskId("abc", 123), "test", clusterState(), future); BulkByScrollResponse response = future.actionGet(); assertThat(response.getCreated(), equalTo(2L)); @@ -102,7 +103,7 @@ public class InternalIndexReindexerIT extends IndexUpgradeIntegTestCase { createTestIndex("test_v123"); InternalIndexReindexer reindexer = createIndexReindexer(123, script("add_bar"), Strings.EMPTY_ARRAY); PlainActionFuture future = PlainActionFuture.newFuture(); - reindexer.upgrade("test", clusterState(), future); + reindexer.upgrade(new TaskId("abc", 123), "test", clusterState(), future); assertThrows(future, ResourceAlreadyExistsException.class); // Make sure that the index is not marked as read-only @@ -115,7 +116,7 @@ public class InternalIndexReindexerIT extends IndexUpgradeIntegTestCase { client().admin().indices().prepareAliases().addAlias("test-foo", "test_v123").get(); InternalIndexReindexer reindexer = createIndexReindexer(123, script("add_bar"), Strings.EMPTY_ARRAY); PlainActionFuture future = PlainActionFuture.newFuture(); - reindexer.upgrade("test", clusterState(), future); + reindexer.upgrade(new TaskId("abc", 123), "test", clusterState(), future); assertThrows(future, InvalidIndexNameException.class); // Make sure that the index is not marked as read-only @@ -129,7 +130,7 @@ public class InternalIndexReindexerIT extends IndexUpgradeIntegTestCase { assertAcked(client().admin().indices().prepareUpdateSettings("test").setSettings(settings).get()); InternalIndexReindexer reindexer = createIndexReindexer(123, script("add_bar"), Strings.EMPTY_ARRAY); PlainActionFuture future = PlainActionFuture.newFuture(); - reindexer.upgrade("test", clusterState(), future); + reindexer.upgrade(new TaskId("abc", 123), "test", clusterState(), future); assertThrows(future, IllegalStateException.class); // Make sure that the index is still marked as read-only @@ -148,7 +149,7 @@ public class InternalIndexReindexerIT extends IndexUpgradeIntegTestCase { client().prepareIndex("test", "doc").setSource("foo", "bar").setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get(); InternalIndexReindexer reindexer = createIndexReindexer(123, script("fail"), Strings.EMPTY_ARRAY); PlainActionFuture future = PlainActionFuture.newFuture(); - reindexer.upgrade("test", clusterState(), future); + reindexer.upgrade(new TaskId("abc", 123), "test", clusterState(), future); assertThrows(future, RuntimeException.class); // Make sure that the index is not marked as read-only @@ -160,7 +161,7 @@ public class InternalIndexReindexerIT extends IndexUpgradeIntegTestCase { InternalIndexReindexer reindexer = createIndexReindexer(123, script("add_bar"), Strings.EMPTY_ARRAY); PlainActionFuture future = PlainActionFuture.newFuture(); - reindexer.upgrade("test", withRandomOldNode(), future); + reindexer.upgrade(new TaskId("abc", 123), "test", withRandomOldNode(), future); assertThrows(future, IllegalStateException.class); // Make sure that the index is not marked as read-only