From db04288d148c73e3a4793f335adc234a803d0b03 Mon Sep 17 00:00:00 2001 From: Yogesh Gaikwad <902768+bizybot@users.noreply.github.com> Date: Wed, 13 Mar 2019 09:23:32 +1100 Subject: [PATCH] Add pre-upgrade check to test cluster routing allocation is enabled (#39340) (#39815) When following the steps mentioned in upgrade guide https://www.elastic.co/guide/en/elastic-stack/6.6/upgrading-elastic-stack.html if we disable the cluster shard allocation but fail to enable it after upgrading the nodes and plugins, the next step of upgrading internal indices fails. As we did not check the bulk request response for reindexing, we delete the old index assuming it has been created. This is fatal as we cannot recover from this state. This commit adds a pre-upgrade check to test the cluster shard allocation setting and fail upgrade if it is disabled. In case there are search or bulk failures then we remove the read-only block and fail the upgrade index request. Closes #39339 --- .../upgrade/IndexUpgradeCheckVersion.java | 2 +- .../xpack/upgrade/IndexUpgradeCheck.java | 25 +++++- .../xpack/upgrade/InternalIndexReindexer.java | 79 +++++++++++++------ .../xpack/upgrade/IndexUpgradeIT.java | 2 +- .../upgrade/InternalIndexReindexerIT.java | 60 ++++++++++---- 5 files changed, 122 insertions(+), 46 deletions(-) diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/upgrade/IndexUpgradeCheckVersion.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/upgrade/IndexUpgradeCheckVersion.java index e09f73a688e..298c8ac95c2 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/upgrade/IndexUpgradeCheckVersion.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/upgrade/IndexUpgradeCheckVersion.java @@ -6,7 +6,7 @@ package org.elasticsearch.xpack.core.upgrade; public final class IndexUpgradeCheckVersion { - public static final int UPRADE_VERSION = 6; + public static final int UPGRADE_VERSION = 6; private IndexUpgradeCheckVersion() {} diff --git a/x-pack/plugin/upgrade/src/main/java/org/elasticsearch/xpack/upgrade/IndexUpgradeCheck.java b/x-pack/plugin/upgrade/src/main/java/org/elasticsearch/xpack/upgrade/IndexUpgradeCheck.java index 62a2829b925..102827f87f7 100644 --- a/x-pack/plugin/upgrade/src/main/java/org/elasticsearch/xpack/upgrade/IndexUpgradeCheck.java +++ b/x-pack/plugin/upgrade/src/main/java/org/elasticsearch/xpack/upgrade/IndexUpgradeCheck.java @@ -5,10 +5,13 @@ */ package org.elasticsearch.xpack.upgrade; +import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider; +import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider.Allocation; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.index.reindex.BulkByScrollResponse; import org.elasticsearch.protocol.xpack.migration.UpgradeActionRequired; @@ -18,7 +21,6 @@ import org.elasticsearch.transport.TransportResponse; import org.elasticsearch.xpack.core.upgrade.IndexUpgradeCheckVersion; import java.util.function.BiConsumer; -import java.util.function.Consumer; import java.util.function.Function; /** @@ -51,7 +53,17 @@ public class IndexUpgradeCheck { Function actionRequired, Client client, ClusterService clusterService, String[] types, Script updateScript) { this(name, actionRequired, client, clusterService, types, updateScript, - listener -> listener.onResponse(null), (t, listener) -> listener.onResponse(TransportResponse.Empty.INSTANCE)); + (cs, listener) -> { + Allocation clusterRoutingAllocation = EnableAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ENABLE_SETTING + .get(cs.getMetaData().settings()); + if (Allocation.NONE == clusterRoutingAllocation) { + listener.onFailure(new ElasticsearchException( + "pre-upgrade check failed, please enable cluster routing allocation using setting [{}]", + EnableAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ENABLE_SETTING.getKey())); + } else { + listener.onResponse(null); + } + }, (t, listener) -> listener.onResponse(TransportResponse.Empty.INSTANCE)); } /** @@ -69,11 +81,11 @@ public class IndexUpgradeCheck { public IndexUpgradeCheck(String name, Function actionRequired, Client client, ClusterService clusterService, String[] types, Script updateScript, - Consumer> preUpgrade, + BiConsumer> preUpgrade, BiConsumer> postUpgrade) { this.name = name; this.actionRequired = actionRequired; - this.reindexer = new InternalIndexReindexer<>(client, clusterService, IndexUpgradeCheckVersion.UPRADE_VERSION, updateScript, + this.reindexer = new InternalIndexReindexer<>(client, clusterService, IndexUpgradeCheckVersion.UPGRADE_VERSION, updateScript, types, preUpgrade, postUpgrade); } @@ -106,4 +118,9 @@ public class IndexUpgradeCheck { ActionListener listener) { reindexer.upgrade(task, indexMetaData.getIndex().getName(), state, listener); } + + // pkg scope for testing + InternalIndexReindexer getInternalIndexReindexer() { + return reindexer; + } } diff --git a/x-pack/plugin/upgrade/src/main/java/org/elasticsearch/xpack/upgrade/InternalIndexReindexer.java b/x-pack/plugin/upgrade/src/main/java/org/elasticsearch/xpack/upgrade/InternalIndexReindexer.java index 6ab920555bb..763fc7d92de 100644 --- a/x-pack/plugin/upgrade/src/main/java/org/elasticsearch/xpack/upgrade/InternalIndexReindexer.java +++ b/x-pack/plugin/upgrade/src/main/java/org/elasticsearch/xpack/upgrade/InternalIndexReindexer.java @@ -5,6 +5,9 @@ */ package org.elasticsearch.xpack.upgrade; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.client.Client; @@ -15,6 +18,7 @@ import org.elasticsearch.cluster.block.ClusterBlocks; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.Strings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.index.reindex.BulkByScrollResponse; @@ -25,7 +29,6 @@ import org.elasticsearch.tasks.TaskId; import org.elasticsearch.transport.TransportResponse; import java.util.function.BiConsumer; -import java.util.function.Consumer; import static org.elasticsearch.index.IndexSettings.same; @@ -39,17 +42,18 @@ import static org.elasticsearch.index.IndexSettings.same; * - Delete index .{name} and add alias .{name} to .{name}-6 */ public class InternalIndexReindexer { + private static final Logger logger = LogManager.getLogger(InternalIndexReindexer.class); private final Client client; private final ClusterService clusterService; private final Script transformScript; private final String[] types; private final int version; - private final Consumer> preUpgrade; + private final BiConsumer> preUpgrade; private final BiConsumer> postUpgrade; public InternalIndexReindexer(Client client, ClusterService clusterService, int version, Script transformScript, String[] types, - Consumer> preUpgrade, + BiConsumer> preUpgrade, BiConsumer> postUpgrade) { this.client = client; this.clusterService = clusterService; @@ -62,7 +66,7 @@ public class InternalIndexReindexer { public void upgrade(TaskId task, String index, ClusterState clusterState, ActionListener listener) { ParentTaskAssigningClient parentAwareClient = new ParentTaskAssigningClient(client, task); - preUpgrade.accept(ActionListener.wrap( + preUpgrade.accept(clusterState, ActionListener.wrap( t -> innerUpgrade(parentAwareClient, index, clusterState, ActionListener.wrap( response -> postUpgrade.accept(t, ActionListener.wrap( empty -> listener.onResponse(response), @@ -76,32 +80,61 @@ public class InternalIndexReindexer { private void innerUpgrade(ParentTaskAssigningClient parentAwareClient, String index, ClusterState clusterState, ActionListener listener) { String newIndex = index + "-" + version; + logger.trace("upgrading index {} to new index {}", index, newIndex); try { checkMasterAndDataNodeVersion(clusterState); - parentAwareClient.admin().indices().prepareCreate(newIndex).execute(ActionListener.wrap(createIndexResponse -> - setReadOnlyBlock(index, ActionListener.wrap(setReadOnlyResponse -> - reindex(parentAwareClient, index, newIndex, ActionListener.wrap( - bulkByScrollResponse -> // Successful completion of reindexing - delete old index - removeReadOnlyBlock(parentAwareClient, index, ActionListener.wrap(unsetReadOnlyResponse -> - parentAwareClient.admin().indices().prepareAliases().removeIndex(index) - .addAlias(newIndex, index).execute(ActionListener.wrap(deleteIndexResponse -> - listener.onResponse(bulkByScrollResponse), listener::onFailure - )), listener::onFailure - )), - e -> // Something went wrong during reindexing - remove readonly flag and report the error - removeReadOnlyBlock(parentAwareClient, index, ActionListener.wrap(unsetReadOnlyResponse -> { - listener.onFailure(e); - }, e1 -> { - listener.onFailure(e); - })) - )), listener::onFailure - )), listener::onFailure - )); + parentAwareClient.admin().indices().prepareCreate(newIndex).execute(ActionListener.wrap(createIndexResponse -> { + setReadOnlyBlock(index, ActionListener.wrap( + setReadOnlyResponse -> reindex(parentAwareClient, index, newIndex, ActionListener.wrap(bulkByScrollResponse -> { + if ((bulkByScrollResponse.getBulkFailures() != null + && bulkByScrollResponse.getBulkFailures().isEmpty() == false) + || (bulkByScrollResponse.getSearchFailures() != null + && bulkByScrollResponse.getSearchFailures().isEmpty() == false)) { + ElasticsearchException ex = logAndThrowExceptionForFailures(bulkByScrollResponse); + removeReadOnlyBlockOnReindexFailure(parentAwareClient, index, listener, ex); + } else { + // Successful completion of reindexing - remove read only and delete old index + removeReadOnlyBlock(parentAwareClient, index, + ActionListener.wrap(unsetReadOnlyResponse -> parentAwareClient.admin().indices().prepareAliases() + .removeIndex(index).addAlias(newIndex, index) + .execute(ActionListener.wrap( + deleteIndexResponse -> listener.onResponse(bulkByScrollResponse), + listener::onFailure)), + listener::onFailure)); + } + }, e -> { + logger.error("error occurred while reindexing", e); + removeReadOnlyBlockOnReindexFailure(parentAwareClient, index, listener, e); + })), listener::onFailure)); + }, listener::onFailure)); } catch (Exception ex) { + logger.error("error occurred while upgrading index", ex); + removeReadOnlyBlockOnReindexFailure(parentAwareClient, index, listener, ex); listener.onFailure(ex); } } + private void removeReadOnlyBlockOnReindexFailure(ParentTaskAssigningClient parentAwareClient, String index, + ActionListener listener, Exception ex) { + removeReadOnlyBlock(parentAwareClient, index, ActionListener.wrap(unsetReadOnlyResponse -> { + listener.onFailure(ex); + }, e1 -> { + listener.onFailure(ex); + })); + } + + private ElasticsearchException logAndThrowExceptionForFailures(BulkByScrollResponse bulkByScrollResponse) { + String bulkFailures = (bulkByScrollResponse.getBulkFailures() != null) + ? Strings.collectionToCommaDelimitedString(bulkByScrollResponse.getBulkFailures()) + : ""; + String searchFailures = (bulkByScrollResponse.getSearchFailures() != null) + ? Strings.collectionToCommaDelimitedString(bulkByScrollResponse.getSearchFailures()) + : ""; + logger.error("error occurred while reindexing, bulk failures [{}], search failures [{}]", bulkFailures, searchFailures); + return new ElasticsearchException("error occurred while reindexing, bulk failures [{}], search failures [{}]", bulkFailures, + searchFailures); + } + private void checkMasterAndDataNodeVersion(ClusterState clusterState) { if (clusterState.nodes().getMinNodeVersion().before(Upgrade.UPGRADE_INTRODUCED)) { throw new IllegalStateException("All nodes should have at least version [" + Upgrade.UPGRADE_INTRODUCED + "] to upgrade"); diff --git a/x-pack/plugin/upgrade/src/test/java/org/elasticsearch/xpack/upgrade/IndexUpgradeIT.java b/x-pack/plugin/upgrade/src/test/java/org/elasticsearch/xpack/upgrade/IndexUpgradeIT.java index 3663d586159..c764966d113 100644 --- a/x-pack/plugin/upgrade/src/test/java/org/elasticsearch/xpack/upgrade/IndexUpgradeIT.java +++ b/x-pack/plugin/upgrade/src/test/java/org/elasticsearch/xpack/upgrade/IndexUpgradeIT.java @@ -96,7 +96,7 @@ public class IndexUpgradeIT extends IndexUpgradeIntegTestCase { } }, client(), internalCluster().clusterService(internalCluster().getMasterName()), Strings.EMPTY_ARRAY, null, - listener -> { + (cs, listener) -> { assertFalse(preUpgradeIsCalled.getAndSet(true)); assertFalse(postUpgradeIsCalled.get()); listener.onResponse(val); diff --git a/x-pack/plugin/upgrade/src/test/java/org/elasticsearch/xpack/upgrade/InternalIndexReindexerIT.java b/x-pack/plugin/upgrade/src/test/java/org/elasticsearch/xpack/upgrade/InternalIndexReindexerIT.java index 013680ee2d1..9f9c7353ad6 100644 --- a/x-pack/plugin/upgrade/src/test/java/org/elasticsearch/xpack/upgrade/InternalIndexReindexerIT.java +++ b/x-pack/plugin/upgrade/src/test/java/org/elasticsearch/xpack/upgrade/InternalIndexReindexerIT.java @@ -6,8 +6,11 @@ package org.elasticsearch.xpack.upgrade; import com.carrotsearch.hppc.cursors.ObjectCursor; + +import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ResourceAlreadyExistsException; import org.elasticsearch.Version; +import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsResponse; import org.elasticsearch.action.admin.indices.alias.get.GetAliasesResponse; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.support.PlainActionFuture; @@ -19,6 +22,7 @@ import org.elasticsearch.cluster.metadata.AliasMetaData; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider; import org.elasticsearch.common.Strings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.XContentType; @@ -26,13 +30,16 @@ import org.elasticsearch.index.reindex.BulkByScrollResponse; import org.elasticsearch.index.reindex.ReindexPlugin; import org.elasticsearch.indices.InvalidIndexNameException; import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.protocol.xpack.migration.UpgradeActionRequired; import org.elasticsearch.script.MockScriptPlugin; import org.elasticsearch.script.Script; import org.elasticsearch.script.ScriptType; import org.elasticsearch.search.SearchHit; import org.elasticsearch.tasks.TaskId; -import org.elasticsearch.transport.TransportResponse; +import org.elasticsearch.test.ESIntegTestCase.ClusterScope; +import org.elasticsearch.test.ESIntegTestCase.Scope; import org.elasticsearch.xpack.core.LocalStateCompositeXPackPlugin; +import org.elasticsearch.xpack.core.upgrade.IndexUpgradeCheckVersion; import java.util.ArrayList; import java.util.Arrays; @@ -45,10 +52,13 @@ import java.util.function.Function; import static org.elasticsearch.test.VersionUtils.randomVersionBetween; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertThrows; +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.Matchers.startsWith; import static org.hamcrest.core.IsEqual.equalTo; +@ClusterScope(scope=Scope.TEST) public class InternalIndexReindexerIT extends IndexUpgradeIntegTestCase { @Override @@ -77,13 +87,13 @@ public class InternalIndexReindexerIT extends IndexUpgradeIntegTestCase { public void testUpgradeIndex() throws Exception { createTestIndex("test"); - InternalIndexReindexer reindexer = createIndexReindexer(123, script("add_bar"), Strings.EMPTY_ARRAY); + InternalIndexReindexer reindexer = createIndexReindexer(script("add_bar"), Strings.EMPTY_ARRAY); PlainActionFuture future = PlainActionFuture.newFuture(); reindexer.upgrade(new TaskId("abc", 123), "test", clusterState(), future); BulkByScrollResponse response = future.actionGet(); assertThat(response.getCreated(), equalTo(2L)); - SearchResponse searchResponse = client().prepareSearch("test-123").get(); + SearchResponse searchResponse = client().prepareSearch("test-" + IndexUpgradeCheckVersion.UPGRADE_VERSION).get(); assertThat(searchResponse.getHits().getTotalHits().value, equalTo(2L)); assertThat(searchResponse.getHits().getHits().length, equalTo(2)); for (SearchHit hit : searchResponse.getHits().getHits()) { @@ -94,7 +104,7 @@ public class InternalIndexReindexerIT extends IndexUpgradeIntegTestCase { GetAliasesResponse aliasesResponse = client().admin().indices().prepareGetAliases("test").get(); assertThat(aliasesResponse.getAliases().size(), equalTo(1)); - List testAlias = aliasesResponse.getAliases().get("test-123"); + List testAlias = aliasesResponse.getAliases().get("test-" + IndexUpgradeCheckVersion.UPGRADE_VERSION); assertNotNull(testAlias); assertThat(testAlias.size(), equalTo(1)); assertThat(testAlias.get(0).alias(), equalTo("test")); @@ -102,8 +112,8 @@ public class InternalIndexReindexerIT extends IndexUpgradeIntegTestCase { public void testTargetIndexExists() throws Exception { createTestIndex("test"); - createTestIndex("test-123"); - InternalIndexReindexer reindexer = createIndexReindexer(123, script("add_bar"), Strings.EMPTY_ARRAY); + createTestIndex("test-" + IndexUpgradeCheckVersion.UPGRADE_VERSION); + InternalIndexReindexer reindexer = createIndexReindexer(script("add_bar"), Strings.EMPTY_ARRAY); PlainActionFuture future = PlainActionFuture.newFuture(); reindexer.upgrade(new TaskId("abc", 123), "test", clusterState(), future); assertThrows(future, ResourceAlreadyExistsException.class); @@ -115,14 +125,14 @@ public class InternalIndexReindexerIT extends IndexUpgradeIntegTestCase { public void testTargetIndexExistsAsAlias() throws Exception { createTestIndex("test"); createTestIndex("test-foo"); - client().admin().indices().prepareAliases().addAlias("test-foo", "test-123").get(); - InternalIndexReindexer reindexer = createIndexReindexer(123, script("add_bar"), Strings.EMPTY_ARRAY); + client().admin().indices().prepareAliases().addAlias("test-foo", "test-" + IndexUpgradeCheckVersion.UPGRADE_VERSION).get(); + InternalIndexReindexer reindexer = createIndexReindexer(script("add_bar"), Strings.EMPTY_ARRAY); PlainActionFuture future = PlainActionFuture.newFuture(); reindexer.upgrade(new TaskId("abc", 123), "test", clusterState(), future); assertThrows(future, InvalidIndexNameException.class); // Make sure that the index is not marked as read-only - client().prepareIndex("test-123", "doc").setSource("foo", "bar").get(); + client().prepareIndex("test-" + IndexUpgradeCheckVersion.UPGRADE_VERSION, "doc").setSource("foo", "bar").get(); } public void testSourceIndexIsReadonly() throws Exception { @@ -130,7 +140,7 @@ public class InternalIndexReindexerIT extends IndexUpgradeIntegTestCase { try { Settings settings = Settings.builder().put(IndexMetaData.INDEX_READ_ONLY_SETTING.getKey(), true).build(); assertAcked(client().admin().indices().prepareUpdateSettings("test").setSettings(settings).get()); - InternalIndexReindexer reindexer = createIndexReindexer(123, script("add_bar"), Strings.EMPTY_ARRAY); + InternalIndexReindexer reindexer = createIndexReindexer(script("add_bar"), Strings.EMPTY_ARRAY); PlainActionFuture future = PlainActionFuture.newFuture(); reindexer.upgrade(new TaskId("abc", 123), "test", clusterState(), future); assertThrows(future, IllegalStateException.class); @@ -144,12 +154,30 @@ public class InternalIndexReindexerIT extends IndexUpgradeIntegTestCase { } } + public void testReindexingFailureWithClusterRoutingAllocationDisabled() throws Exception { + createTestIndex("test"); + + Settings settings = Settings.builder().put(EnableAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ENABLE_SETTING.getKey(), "none") + .build(); + ClusterUpdateSettingsResponse clusterUpdateResponse = client().admin().cluster().prepareUpdateSettings() + .setTransientSettings(settings).get(); + assertThat(clusterUpdateResponse.isAcknowledged(), is(true)); + assertThat(clusterUpdateResponse.getTransientSettings() + .get(EnableAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ENABLE_SETTING.getKey()), is("none")); + + InternalIndexReindexer reindexer = createIndexReindexer(script("add_bar"), Strings.EMPTY_ARRAY); + PlainActionFuture future = PlainActionFuture.newFuture(); + reindexer.upgrade(new TaskId("abc", 123), "test", clusterState(), future); + ElasticsearchException e = expectThrows(ElasticsearchException.class, () -> future.actionGet()); + assertThat(e.getMessage(), containsString( + "pre-upgrade check failed, please enable cluster routing allocation using setting [cluster.routing.allocation.enable]")); + } public void testReindexingFailure() throws Exception { createTestIndex("test"); // Make sure that the index is not marked as read-only client().prepareIndex("test", "doc").setSource("foo", "bar").setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get(); - InternalIndexReindexer reindexer = createIndexReindexer(123, script("fail"), Strings.EMPTY_ARRAY); + InternalIndexReindexer reindexer = createIndexReindexer(script("fail"), Strings.EMPTY_ARRAY); PlainActionFuture future = PlainActionFuture.newFuture(); reindexer.upgrade(new TaskId("abc", 123), "test", clusterState(), future); assertThrows(future, RuntimeException.class); @@ -161,7 +189,7 @@ public class InternalIndexReindexerIT extends IndexUpgradeIntegTestCase { public void testMixedNodeVersion() throws Exception { createTestIndex("test"); - InternalIndexReindexer reindexer = createIndexReindexer(123, script("add_bar"), Strings.EMPTY_ARRAY); + InternalIndexReindexer reindexer = createIndexReindexer(script("add_bar"), Strings.EMPTY_ARRAY); PlainActionFuture future = PlainActionFuture.newFuture(); reindexer.upgrade(new TaskId("abc", 123), "test", withRandomOldNode(), future); assertThrows(future, IllegalStateException.class); @@ -183,11 +211,9 @@ public class InternalIndexReindexerIT extends IndexUpgradeIntegTestCase { return new Script(ScriptType.INLINE, CustomScriptPlugin.NAME, name, new HashMap<>()); } - private InternalIndexReindexer createIndexReindexer(int version, Script transformScript, String[] types) { - return new InternalIndexReindexer(client(), internalCluster().clusterService(internalCluster().getMasterName()), - version, transformScript, types, voidActionListener -> voidActionListener.onResponse(null), - (aVoid, listener) -> listener.onResponse(TransportResponse.Empty.INSTANCE)); - + private InternalIndexReindexer createIndexReindexer(Script transformScript, String[] types) { + return new IndexUpgradeCheck("test", imd -> UpgradeActionRequired.UPGRADE, client(), + internalCluster().clusterService(internalCluster().getMasterName()), types, transformScript).getInternalIndexReindexer(); } private ClusterState clusterState() {