Upgrade API: fix parent task propagation for upgrade (elastic/x-pack-elasticsearch#1986)

Ensures that parent task is propagated to child operations to ensure that reindex operation can be cancelled if needed.

Original commit: elastic/x-pack-elasticsearch@fa40b5a951
This commit is contained in:
Igor Motov 2017-07-13 16:25:38 -04:00 committed by GitHub
parent 20c06578f6
commit dd11fc3d0a
7 changed files with 252 additions and 26 deletions

View File

@ -14,6 +14,7 @@ import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.reindex.BulkByScrollResponse;
import org.elasticsearch.script.Script;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.transport.TransportResponse;
import java.util.function.BiConsumer;
@ -100,12 +101,13 @@ public class IndexUpgradeCheck<T> extends AbstractComponent {
/**
* Perform the index upgrade
*
* @param task the task that executes the upgrade operation
* @param indexMetaData index metadata
* @param state current cluster state
* @param listener the listener that should be called upon completion of the upgrade
*/
public void upgrade(IndexMetaData indexMetaData, ClusterState state,
public void upgrade(TaskId task, IndexMetaData indexMetaData, ClusterState state,
ActionListener<BulkByScrollResponse> listener) {
reindexer.upgrade(indexMetaData.getIndex().getName(), state, listener);
reindexer.upgrade(task, indexMetaData.getIndex().getName(), state, listener);
}
}

View File

@ -16,6 +16,7 @@ import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.reindex.BulkByScrollResponse;
import org.elasticsearch.tasks.TaskId;
import java.util.HashMap;
import java.util.List;
@ -85,7 +86,7 @@ public class IndexUpgradeService extends AbstractComponent {
}
}
public void upgrade(String index, ClusterState state, ActionListener<BulkByScrollResponse> listener) {
public void upgrade(TaskId task, String index, ClusterState state, ActionListener<BulkByScrollResponse> listener) {
IndexMetaData indexMetaData = state.metaData().index(index);
if (indexMetaData == null) {
throw new IndexNotFoundException(index);
@ -95,7 +96,7 @@ public class IndexUpgradeService extends AbstractComponent {
switch (upgradeActionRequired) {
case UPGRADE:
// this index needs to be upgraded - start the upgrade procedure
check.upgrade(indexMetaData, state, listener);
check.upgrade(task, indexMetaData, state, listener);
return;
case REINDEX:
// this index needs to be re-indexed

View File

@ -5,12 +5,12 @@
*/
package org.elasticsearch.xpack.upgrade;
import com.carrotsearch.hppc.procedures.ObjectProcedure;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.ParentTaskAssigningClient;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.block.ClusterBlocks;
@ -23,6 +23,7 @@ import org.elasticsearch.index.reindex.BulkByScrollResponse;
import org.elasticsearch.index.reindex.ReindexAction;
import org.elasticsearch.index.reindex.ReindexRequest;
import org.elasticsearch.script.Script;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.transport.TransportResponse;
import java.util.function.BiConsumer;
@ -59,9 +60,10 @@ public class InternalIndexReindexer<T> {
this.postUpgrade = postUpgrade;
}
public void upgrade(String index, ClusterState clusterState, ActionListener<BulkByScrollResponse> listener) {
public void upgrade(TaskId task, String index, ClusterState clusterState, ActionListener<BulkByScrollResponse> listener) {
ParentTaskAssigningClient parentAwareClient = new ParentTaskAssigningClient(client, task);
preUpgrade.accept(ActionListener.wrap(
t -> innerUpgrade(index, clusterState, ActionListener.wrap(
t -> innerUpgrade(parentAwareClient, index, clusterState, ActionListener.wrap(
response -> postUpgrade.accept(t, ActionListener.wrap(
empty -> listener.onResponse(response),
listener::onFailure
@ -71,22 +73,23 @@ public class InternalIndexReindexer<T> {
listener::onFailure));
}
private void innerUpgrade(String index, ClusterState clusterState, ActionListener<BulkByScrollResponse> listener) {
private void innerUpgrade(ParentTaskAssigningClient parentAwareClient, String index, ClusterState clusterState,
ActionListener<BulkByScrollResponse> listener) {
String newIndex = index + "_v" + version;
try {
checkMasterAndDataNodeVersion(clusterState);
client.admin().indices().prepareCreate(newIndex).execute(ActionListener.wrap(createIndexResponse ->
parentAwareClient.admin().indices().prepareCreate(newIndex).execute(ActionListener.wrap(createIndexResponse ->
setReadOnlyBlock(index, ActionListener.wrap(setReadOnlyResponse ->
reindex(index, newIndex, ActionListener.wrap(
reindex(parentAwareClient, index, newIndex, ActionListener.wrap(
bulkByScrollResponse -> // Successful completion of reindexing - delete old index
removeReadOnlyBlock(index, ActionListener.wrap(unsetReadOnlyResponse ->
client.admin().indices().prepareAliases().removeIndex(index)
removeReadOnlyBlock(parentAwareClient, index, ActionListener.wrap(unsetReadOnlyResponse ->
parentAwareClient.admin().indices().prepareAliases().removeIndex(index)
.addAlias(newIndex, index).execute(ActionListener.wrap(deleteIndexResponse ->
listener.onResponse(bulkByScrollResponse), listener::onFailure
)), listener::onFailure
)),
e -> // Something went wrong during reindexing - remove readonly flag and report the error
removeReadOnlyBlock(index, ActionListener.wrap(unsetReadOnlyResponse -> {
removeReadOnlyBlock(parentAwareClient, index, ActionListener.wrap(unsetReadOnlyResponse -> {
listener.onFailure(e);
}, e1 -> {
listener.onFailure(e);
@ -105,19 +108,21 @@ public class InternalIndexReindexer<T> {
}
}
private void removeReadOnlyBlock(String index, ActionListener<UpdateSettingsResponse> listener) {
private void removeReadOnlyBlock(ParentTaskAssigningClient parentAwareClient, String index,
ActionListener<UpdateSettingsResponse> listener) {
Settings settings = Settings.builder().put(IndexMetaData.INDEX_READ_ONLY_SETTING.getKey(), false).build();
client.admin().indices().prepareUpdateSettings(index).setSettings(settings).execute(listener);
parentAwareClient.admin().indices().prepareUpdateSettings(index).setSettings(settings).execute(listener);
}
private void reindex(String index, String newIndex, ActionListener<BulkByScrollResponse> listener) {
private void reindex(ParentTaskAssigningClient parentAwareClient, String index, String newIndex,
ActionListener<BulkByScrollResponse> listener) {
SearchRequest sourceRequest = new SearchRequest(index);
sourceRequest.types(types);
IndexRequest destinationRequest = new IndexRequest(newIndex);
ReindexRequest reindexRequest = new ReindexRequest(sourceRequest, destinationRequest);
reindexRequest.setRefresh(true);
reindexRequest.setScript(transformScript);
client.execute(ReindexAction.INSTANCE, reindexRequest, listener);
parentAwareClient.execute(ReindexAction.INSTANCE, reindexRequest, listener);
}
/**

View File

@ -25,6 +25,9 @@ import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.reindex.BulkByScrollResponse;
import org.elasticsearch.tasks.CancellableTask;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.upgrade.IndexUpgradeService;
@ -125,6 +128,16 @@ public class IndexUpgradeAction extends Action<IndexUpgradeAction.Request, BulkB
public int hashCode() {
return Objects.hash(index);
}
@Override
public Task createTask(long id, String type, String action, TaskId parentTaskId) {
return new CancellableTask(id, type, action, getDescription(), parentTaskId) {
@Override
public boolean shouldCancelChildrenOnCancellation() {
return true;
}
};
}
}
public static class RequestBuilder extends MasterNodeReadOperationRequestBuilder<Request, BulkByScrollResponse, RequestBuilder> {
@ -170,8 +183,16 @@ public class IndexUpgradeAction extends Action<IndexUpgradeAction.Request, BulkB
}
@Override
protected final void masterOperation(final Request request, ClusterState state, ActionListener<BulkByScrollResponse> listener) {
indexUpgradeService.upgrade(request.index(), state, listener);
protected final void masterOperation(Task task, Request request, ClusterState state,
ActionListener<BulkByScrollResponse> listener) {
TaskId taskId = new TaskId(clusterService.localNode().getId(), task.getId());
indexUpgradeService.upgrade(taskId, request.index(), state, listener);
}
@Override
protected final void masterOperation(Request request, ClusterState state, ActionListener<BulkByScrollResponse> listener) {
throw new UnsupportedOperationException("the task parameter is required");
}
}
}

View File

@ -12,6 +12,7 @@ import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.reindex.BulkByScrollResponse;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.transport.TransportResponse;
import org.elasticsearch.xpack.upgrade.actions.IndexUpgradeAction;
import org.elasticsearch.xpack.upgrade.actions.IndexUpgradeInfoAction;
@ -110,7 +111,7 @@ public class IndexUpgradeIT extends IndexUpgradeIntegTestCase {
IndexUpgradeService service = new IndexUpgradeService(Settings.EMPTY, Collections.singletonList(check));
PlainActionFuture<BulkByScrollResponse> future = PlainActionFuture.newFuture();
service.upgrade(testIndex, clusterService().state(), future);
service.upgrade(new TaskId("abc", 123), testIndex, clusterService().state(), future);
BulkByScrollResponse response = future.actionGet();
assertThat(response.getCreated(), equalTo(2L));

View File

@ -0,0 +1,195 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.upgrade;
import org.elasticsearch.action.ActionFuture;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse;
import org.elasticsearch.action.admin.cluster.node.tasks.list.TaskGroup;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.reindex.BulkByScrollResponse;
import org.elasticsearch.index.reindex.ReindexAction;
import org.elasticsearch.index.reindex.ReindexPlugin;
import org.elasticsearch.plugins.ActionPlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.plugins.PluginsService;
import org.elasticsearch.plugins.ScriptPlugin;
import org.elasticsearch.script.MockScriptEngine;
import org.elasticsearch.script.Script;
import org.elasticsearch.script.ScriptContext;
import org.elasticsearch.script.ScriptEngine;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.script.ScriptType;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.watcher.ResourceWatcherService;
import org.elasticsearch.xpack.upgrade.actions.IndexUpgradeAction;
import org.elasticsearch.xpack.upgrade.actions.IndexUpgradeInfoAction;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import static org.elasticsearch.test.ESIntegTestCase.Scope.TEST;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.CoreMatchers.notNullValue;
import static org.hamcrest.CoreMatchers.nullValue;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasSize;
@ESIntegTestCase.ClusterScope(scope = TEST, supportsDedicatedMasters = false, numClientNodes = 0, maxNumDataNodes = 1)
public class IndexUpgradeTasksIT extends ESIntegTestCase {
@Override
protected boolean ignoreExternalCluster() {
return true;
}
@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return Arrays.asList(MockUpgradePlugin.class, ReindexPlugin.class);
}
public static class MockUpgradePlugin extends Plugin implements ScriptPlugin, ActionPlugin {
public static final String NAME = MockScriptEngine.NAME;
private Settings settings;
private Upgrade upgrade;
private CountDownLatch upgradeLatch = new CountDownLatch(1);
private CountDownLatch upgradeCalledLatch = new CountDownLatch(1);
@Override
public ScriptEngine getScriptEngine(Settings settings, Collection<ScriptContext<?>> contexts) {
return new MockScriptEngine(pluginScriptLang(), pluginScripts());
}
public String pluginScriptLang() {
return NAME;
}
public MockUpgradePlugin(Settings settings) {
this.settings = settings;
this.upgrade = new Upgrade(settings);
Loggers.getLogger(IndexUpgradeTasksIT.class).info("MockUpgradePlugin is created");
}
protected Map<String, Function<Map<String, Object>, Object>> pluginScripts() {
Map<String, Function<Map<String, Object>, Object>> scripts = new HashMap<>();
scripts.put("block", map -> {
upgradeCalledLatch.countDown();
try {
assertThat(upgradeLatch.await(10, TimeUnit.SECONDS), equalTo(true));
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return null;
});
return scripts;
}
@Override
public Collection<Object> createComponents(Client client, ClusterService clusterService, ThreadPool threadPool,
ResourceWatcherService resourceWatcherService, ScriptService scriptService,
NamedXContentRegistry xContentRegistry) {
return Collections.singletonList(new IndexUpgradeService(settings, Collections.singletonList(
new IndexUpgradeCheck("test", settings,
new Function<IndexMetaData, UpgradeActionRequired>() {
@Override
public UpgradeActionRequired apply(IndexMetaData indexMetaData) {
if ("test".equals(indexMetaData.getIndex().getName())) {
if (Upgrade.checkInternalIndexFormat(indexMetaData)) {
return UpgradeActionRequired.UP_TO_DATE;
} else {
return UpgradeActionRequired.UPGRADE;
}
} else {
return UpgradeActionRequired.NOT_APPLICABLE;
}
}
},
client, clusterService, Strings.EMPTY_ARRAY,
new Script(ScriptType.INLINE, NAME, "block", Collections.emptyMap()))
)));
}
@Override
public List<ActionHandler<? extends ActionRequest, ? extends ActionResponse>> getActions() {
return upgrade.getActions();
}
@Override
public Collection<String> getRestHeaders() {
return upgrade.getRestHeaders();
}
}
@Override
protected Collection<Class<? extends Plugin>> transportClientPlugins() {
return nodePlugins();
}
public void testParentTasksDuringUpgrade() throws Exception {
logger.info("before getInstance");
PluginsService pluginsService = internalCluster().getDataNodeInstance(PluginsService.class);
MockUpgradePlugin mockUpgradePlugin = pluginsService.filterPlugins(MockUpgradePlugin.class).get(0);
assertThat(mockUpgradePlugin, notNullValue());
logger.info("after getInstance");
assertAcked(client().admin().indices().prepareCreate("test").get());
client().prepareIndex("test", "doc", "1").setSource("{\"foo\": \"bar\"}", XContentType.JSON)
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get();
ensureYellow("test");
IndexUpgradeInfoAction.Response infoResponse = client().prepareExecute(IndexUpgradeInfoAction.INSTANCE).setIndices("test").get();
assertThat(infoResponse.getActions().keySet(), contains("test"));
assertThat(infoResponse.getActions().get("test"), equalTo(UpgradeActionRequired.UPGRADE));
ActionFuture<BulkByScrollResponse> upgradeResponse =
client().prepareExecute(IndexUpgradeAction.INSTANCE).setIndex("test").execute();
assertThat(mockUpgradePlugin.upgradeCalledLatch.await(10, TimeUnit.SECONDS), equalTo(true));
ListTasksResponse response = client().admin().cluster().prepareListTasks().get();
mockUpgradePlugin.upgradeLatch.countDown();
// Find the upgrade task group
TaskGroup upgradeGroup = null;
for (TaskGroup group : response.getTaskGroups()) {
if (IndexUpgradeAction.NAME.equals(group.getTaskInfo().getAction())) {
assertThat(upgradeGroup, nullValue());
upgradeGroup = group;
}
}
assertThat(upgradeGroup, notNullValue());
assertThat(upgradeGroup.getTaskInfo().isCancellable(), equalTo(true)); // The task should be cancellable
assertThat(upgradeGroup.getChildTasks(), hasSize(1)); // The reindex task should be a child
assertThat(upgradeGroup.getChildTasks().get(0).getTaskInfo().getAction(), equalTo(ReindexAction.NAME));
assertThat(upgradeResponse.get().getCreated(), equalTo(1L));
}
}

View File

@ -29,6 +29,7 @@ import org.elasticsearch.script.MockScriptPlugin;
import org.elasticsearch.script.Script;
import org.elasticsearch.script.ScriptType;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.transport.TransportResponse;
import org.elasticsearch.xpack.XPackPlugin;
@ -76,7 +77,7 @@ public class InternalIndexReindexerIT extends IndexUpgradeIntegTestCase {
createTestIndex("test");
InternalIndexReindexer reindexer = createIndexReindexer(123, script("add_bar"), Strings.EMPTY_ARRAY);
PlainActionFuture<BulkByScrollResponse> future = PlainActionFuture.newFuture();
reindexer.upgrade("test", clusterState(), future);
reindexer.upgrade(new TaskId("abc", 123), "test", clusterState(), future);
BulkByScrollResponse response = future.actionGet();
assertThat(response.getCreated(), equalTo(2L));
@ -102,7 +103,7 @@ public class InternalIndexReindexerIT extends IndexUpgradeIntegTestCase {
createTestIndex("test_v123");
InternalIndexReindexer reindexer = createIndexReindexer(123, script("add_bar"), Strings.EMPTY_ARRAY);
PlainActionFuture<BulkByScrollResponse> future = PlainActionFuture.newFuture();
reindexer.upgrade("test", clusterState(), future);
reindexer.upgrade(new TaskId("abc", 123), "test", clusterState(), future);
assertThrows(future, ResourceAlreadyExistsException.class);
// Make sure that the index is not marked as read-only
@ -115,7 +116,7 @@ public class InternalIndexReindexerIT extends IndexUpgradeIntegTestCase {
client().admin().indices().prepareAliases().addAlias("test-foo", "test_v123").get();
InternalIndexReindexer reindexer = createIndexReindexer(123, script("add_bar"), Strings.EMPTY_ARRAY);
PlainActionFuture<BulkByScrollResponse> future = PlainActionFuture.newFuture();
reindexer.upgrade("test", clusterState(), future);
reindexer.upgrade(new TaskId("abc", 123), "test", clusterState(), future);
assertThrows(future, InvalidIndexNameException.class);
// Make sure that the index is not marked as read-only
@ -129,7 +130,7 @@ public class InternalIndexReindexerIT extends IndexUpgradeIntegTestCase {
assertAcked(client().admin().indices().prepareUpdateSettings("test").setSettings(settings).get());
InternalIndexReindexer reindexer = createIndexReindexer(123, script("add_bar"), Strings.EMPTY_ARRAY);
PlainActionFuture<BulkByScrollResponse> future = PlainActionFuture.newFuture();
reindexer.upgrade("test", clusterState(), future);
reindexer.upgrade(new TaskId("abc", 123), "test", clusterState(), future);
assertThrows(future, IllegalStateException.class);
// Make sure that the index is still marked as read-only
@ -148,7 +149,7 @@ public class InternalIndexReindexerIT extends IndexUpgradeIntegTestCase {
client().prepareIndex("test", "doc").setSource("foo", "bar").setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get();
InternalIndexReindexer reindexer = createIndexReindexer(123, script("fail"), Strings.EMPTY_ARRAY);
PlainActionFuture<BulkByScrollResponse> future = PlainActionFuture.newFuture();
reindexer.upgrade("test", clusterState(), future);
reindexer.upgrade(new TaskId("abc", 123), "test", clusterState(), future);
assertThrows(future, RuntimeException.class);
// Make sure that the index is not marked as read-only
@ -160,7 +161,7 @@ public class InternalIndexReindexerIT extends IndexUpgradeIntegTestCase {
InternalIndexReindexer reindexer = createIndexReindexer(123, script("add_bar"), Strings.EMPTY_ARRAY);
PlainActionFuture<BulkByScrollResponse> future = PlainActionFuture.newFuture();
reindexer.upgrade("test", withRandomOldNode(), future);
reindexer.upgrade(new TaskId("abc", 123), "test", withRandomOldNode(), future);
assertThrows(future, IllegalStateException.class);
// Make sure that the index is not marked as read-only