Add pre-upgrade check to test cluster routing allocation is enabled (#39340) (#39815)

When following the steps mentioned in upgrade guide
https://www.elastic.co/guide/en/elastic-stack/6.6/upgrading-elastic-stack.html
if we disable the cluster shard allocation but fail to enable it after
upgrading the nodes and plugins, the next step of upgrading internal
indices fails. As we did not check the bulk request response for reindexing,
we delete the old index assuming it has been created. This is fatal
as we cannot recover from this state.

This commit adds a pre-upgrade check to test the cluster shard
allocation setting and fail upgrade if it is disabled. In case there
are search or bulk failures then we remove the read-only block and
fail the upgrade index request.

Closes #39339
This commit is contained in:
Yogesh Gaikwad 2019-03-13 09:23:32 +11:00 committed by GitHub
parent 26983d1fdf
commit db04288d14
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 122 additions and 46 deletions

View File

@ -6,7 +6,7 @@
package org.elasticsearch.xpack.core.upgrade; package org.elasticsearch.xpack.core.upgrade;
public final class IndexUpgradeCheckVersion { public final class IndexUpgradeCheckVersion {
public static final int UPRADE_VERSION = 6; public static final int UPGRADE_VERSION = 6;
private IndexUpgradeCheckVersion() {} private IndexUpgradeCheckVersion() {}

View File

@ -5,10 +5,13 @@
*/ */
package org.elasticsearch.xpack.upgrade; package org.elasticsearch.xpack.upgrade;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListener;
import org.elasticsearch.client.Client; import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider;
import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider.Allocation;
import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.index.reindex.BulkByScrollResponse; import org.elasticsearch.index.reindex.BulkByScrollResponse;
import org.elasticsearch.protocol.xpack.migration.UpgradeActionRequired; import org.elasticsearch.protocol.xpack.migration.UpgradeActionRequired;
@ -18,7 +21,6 @@ import org.elasticsearch.transport.TransportResponse;
import org.elasticsearch.xpack.core.upgrade.IndexUpgradeCheckVersion; import org.elasticsearch.xpack.core.upgrade.IndexUpgradeCheckVersion;
import java.util.function.BiConsumer; import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function; import java.util.function.Function;
/** /**
@ -51,7 +53,17 @@ public class IndexUpgradeCheck<T> {
Function<IndexMetaData, UpgradeActionRequired> actionRequired, Function<IndexMetaData, UpgradeActionRequired> actionRequired,
Client client, ClusterService clusterService, String[] types, Script updateScript) { Client client, ClusterService clusterService, String[] types, Script updateScript) {
this(name, actionRequired, client, clusterService, types, updateScript, this(name, actionRequired, client, clusterService, types, updateScript,
listener -> listener.onResponse(null), (t, listener) -> listener.onResponse(TransportResponse.Empty.INSTANCE)); (cs, listener) -> {
Allocation clusterRoutingAllocation = EnableAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ENABLE_SETTING
.get(cs.getMetaData().settings());
if (Allocation.NONE == clusterRoutingAllocation) {
listener.onFailure(new ElasticsearchException(
"pre-upgrade check failed, please enable cluster routing allocation using setting [{}]",
EnableAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ENABLE_SETTING.getKey()));
} else {
listener.onResponse(null);
}
}, (t, listener) -> listener.onResponse(TransportResponse.Empty.INSTANCE));
} }
/** /**
@ -69,11 +81,11 @@ public class IndexUpgradeCheck<T> {
public IndexUpgradeCheck(String name, public IndexUpgradeCheck(String name,
Function<IndexMetaData, UpgradeActionRequired> actionRequired, Function<IndexMetaData, UpgradeActionRequired> actionRequired,
Client client, ClusterService clusterService, String[] types, Script updateScript, Client client, ClusterService clusterService, String[] types, Script updateScript,
Consumer<ActionListener<T>> preUpgrade, BiConsumer<ClusterState, ActionListener<T>> preUpgrade,
BiConsumer<T, ActionListener<TransportResponse.Empty>> postUpgrade) { BiConsumer<T, ActionListener<TransportResponse.Empty>> postUpgrade) {
this.name = name; this.name = name;
this.actionRequired = actionRequired; this.actionRequired = actionRequired;
this.reindexer = new InternalIndexReindexer<>(client, clusterService, IndexUpgradeCheckVersion.UPRADE_VERSION, updateScript, this.reindexer = new InternalIndexReindexer<>(client, clusterService, IndexUpgradeCheckVersion.UPGRADE_VERSION, updateScript,
types, preUpgrade, postUpgrade); types, preUpgrade, postUpgrade);
} }
@ -106,4 +118,9 @@ public class IndexUpgradeCheck<T> {
ActionListener<BulkByScrollResponse> listener) { ActionListener<BulkByScrollResponse> listener) {
reindexer.upgrade(task, indexMetaData.getIndex().getName(), state, listener); reindexer.upgrade(task, indexMetaData.getIndex().getName(), state, listener);
} }
// pkg scope for testing
InternalIndexReindexer getInternalIndexReindexer() {
return reindexer;
}
} }

View File

@ -5,6 +5,9 @@
*/ */
package org.elasticsearch.xpack.upgrade; package org.elasticsearch.xpack.upgrade;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.client.Client; import org.elasticsearch.client.Client;
@ -15,6 +18,7 @@ import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.reindex.BulkByScrollResponse; import org.elasticsearch.index.reindex.BulkByScrollResponse;
@ -25,7 +29,6 @@ import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.transport.TransportResponse; import org.elasticsearch.transport.TransportResponse;
import java.util.function.BiConsumer; import java.util.function.BiConsumer;
import java.util.function.Consumer;
import static org.elasticsearch.index.IndexSettings.same; import static org.elasticsearch.index.IndexSettings.same;
@ -39,17 +42,18 @@ import static org.elasticsearch.index.IndexSettings.same;
* - Delete index .{name} and add alias .{name} to .{name}-6 * - Delete index .{name} and add alias .{name} to .{name}-6
*/ */
public class InternalIndexReindexer<T> { public class InternalIndexReindexer<T> {
private static final Logger logger = LogManager.getLogger(InternalIndexReindexer.class);
private final Client client; private final Client client;
private final ClusterService clusterService; private final ClusterService clusterService;
private final Script transformScript; private final Script transformScript;
private final String[] types; private final String[] types;
private final int version; private final int version;
private final Consumer<ActionListener<T>> preUpgrade; private final BiConsumer<ClusterState, ActionListener<T>> preUpgrade;
private final BiConsumer<T, ActionListener<TransportResponse.Empty>> postUpgrade; private final BiConsumer<T, ActionListener<TransportResponse.Empty>> postUpgrade;
public InternalIndexReindexer(Client client, ClusterService clusterService, int version, Script transformScript, String[] types, public InternalIndexReindexer(Client client, ClusterService clusterService, int version, Script transformScript, String[] types,
Consumer<ActionListener<T>> preUpgrade, BiConsumer<ClusterState,ActionListener<T>> preUpgrade,
BiConsumer<T, ActionListener<TransportResponse.Empty>> postUpgrade) { BiConsumer<T, ActionListener<TransportResponse.Empty>> postUpgrade) {
this.client = client; this.client = client;
this.clusterService = clusterService; this.clusterService = clusterService;
@ -62,7 +66,7 @@ public class InternalIndexReindexer<T> {
public void upgrade(TaskId task, String index, ClusterState clusterState, ActionListener<BulkByScrollResponse> listener) { public void upgrade(TaskId task, String index, ClusterState clusterState, ActionListener<BulkByScrollResponse> listener) {
ParentTaskAssigningClient parentAwareClient = new ParentTaskAssigningClient(client, task); ParentTaskAssigningClient parentAwareClient = new ParentTaskAssigningClient(client, task);
preUpgrade.accept(ActionListener.wrap( preUpgrade.accept(clusterState, ActionListener.wrap(
t -> innerUpgrade(parentAwareClient, index, clusterState, ActionListener.wrap( t -> innerUpgrade(parentAwareClient, index, clusterState, ActionListener.wrap(
response -> postUpgrade.accept(t, ActionListener.wrap( response -> postUpgrade.accept(t, ActionListener.wrap(
empty -> listener.onResponse(response), empty -> listener.onResponse(response),
@ -76,32 +80,61 @@ public class InternalIndexReindexer<T> {
private void innerUpgrade(ParentTaskAssigningClient parentAwareClient, String index, ClusterState clusterState, private void innerUpgrade(ParentTaskAssigningClient parentAwareClient, String index, ClusterState clusterState,
ActionListener<BulkByScrollResponse> listener) { ActionListener<BulkByScrollResponse> listener) {
String newIndex = index + "-" + version; String newIndex = index + "-" + version;
logger.trace("upgrading index {} to new index {}", index, newIndex);
try { try {
checkMasterAndDataNodeVersion(clusterState); checkMasterAndDataNodeVersion(clusterState);
parentAwareClient.admin().indices().prepareCreate(newIndex).execute(ActionListener.wrap(createIndexResponse -> parentAwareClient.admin().indices().prepareCreate(newIndex).execute(ActionListener.wrap(createIndexResponse -> {
setReadOnlyBlock(index, ActionListener.wrap(setReadOnlyResponse -> setReadOnlyBlock(index, ActionListener.wrap(
reindex(parentAwareClient, index, newIndex, ActionListener.wrap( setReadOnlyResponse -> reindex(parentAwareClient, index, newIndex, ActionListener.wrap(bulkByScrollResponse -> {
bulkByScrollResponse -> // Successful completion of reindexing - delete old index if ((bulkByScrollResponse.getBulkFailures() != null
removeReadOnlyBlock(parentAwareClient, index, ActionListener.wrap(unsetReadOnlyResponse -> && bulkByScrollResponse.getBulkFailures().isEmpty() == false)
parentAwareClient.admin().indices().prepareAliases().removeIndex(index) || (bulkByScrollResponse.getSearchFailures() != null
.addAlias(newIndex, index).execute(ActionListener.wrap(deleteIndexResponse -> && bulkByScrollResponse.getSearchFailures().isEmpty() == false)) {
listener.onResponse(bulkByScrollResponse), listener::onFailure ElasticsearchException ex = logAndThrowExceptionForFailures(bulkByScrollResponse);
)), listener::onFailure removeReadOnlyBlockOnReindexFailure(parentAwareClient, index, listener, ex);
)), } else {
e -> // Something went wrong during reindexing - remove readonly flag and report the error // Successful completion of reindexing - remove read only and delete old index
removeReadOnlyBlock(parentAwareClient, index, ActionListener.wrap(unsetReadOnlyResponse -> { removeReadOnlyBlock(parentAwareClient, index,
listener.onFailure(e); ActionListener.wrap(unsetReadOnlyResponse -> parentAwareClient.admin().indices().prepareAliases()
}, e1 -> { .removeIndex(index).addAlias(newIndex, index)
listener.onFailure(e); .execute(ActionListener.wrap(
})) deleteIndexResponse -> listener.onResponse(bulkByScrollResponse),
)), listener::onFailure listener::onFailure)),
)), listener::onFailure listener::onFailure));
)); }
}, e -> {
logger.error("error occurred while reindexing", e);
removeReadOnlyBlockOnReindexFailure(parentAwareClient, index, listener, e);
})), listener::onFailure));
}, listener::onFailure));
} catch (Exception ex) { } catch (Exception ex) {
logger.error("error occurred while upgrading index", ex);
removeReadOnlyBlockOnReindexFailure(parentAwareClient, index, listener, ex);
listener.onFailure(ex); listener.onFailure(ex);
} }
} }
private void removeReadOnlyBlockOnReindexFailure(ParentTaskAssigningClient parentAwareClient, String index,
ActionListener<BulkByScrollResponse> listener, Exception ex) {
removeReadOnlyBlock(parentAwareClient, index, ActionListener.wrap(unsetReadOnlyResponse -> {
listener.onFailure(ex);
}, e1 -> {
listener.onFailure(ex);
}));
}
private ElasticsearchException logAndThrowExceptionForFailures(BulkByScrollResponse bulkByScrollResponse) {
String bulkFailures = (bulkByScrollResponse.getBulkFailures() != null)
? Strings.collectionToCommaDelimitedString(bulkByScrollResponse.getBulkFailures())
: "";
String searchFailures = (bulkByScrollResponse.getSearchFailures() != null)
? Strings.collectionToCommaDelimitedString(bulkByScrollResponse.getSearchFailures())
: "";
logger.error("error occurred while reindexing, bulk failures [{}], search failures [{}]", bulkFailures, searchFailures);
return new ElasticsearchException("error occurred while reindexing, bulk failures [{}], search failures [{}]", bulkFailures,
searchFailures);
}
private void checkMasterAndDataNodeVersion(ClusterState clusterState) { private void checkMasterAndDataNodeVersion(ClusterState clusterState) {
if (clusterState.nodes().getMinNodeVersion().before(Upgrade.UPGRADE_INTRODUCED)) { if (clusterState.nodes().getMinNodeVersion().before(Upgrade.UPGRADE_INTRODUCED)) {
throw new IllegalStateException("All nodes should have at least version [" + Upgrade.UPGRADE_INTRODUCED + "] to upgrade"); throw new IllegalStateException("All nodes should have at least version [" + Upgrade.UPGRADE_INTRODUCED + "] to upgrade");

View File

@ -96,7 +96,7 @@ public class IndexUpgradeIT extends IndexUpgradeIntegTestCase {
} }
}, },
client(), internalCluster().clusterService(internalCluster().getMasterName()), Strings.EMPTY_ARRAY, null, client(), internalCluster().clusterService(internalCluster().getMasterName()), Strings.EMPTY_ARRAY, null,
listener -> { (cs, listener) -> {
assertFalse(preUpgradeIsCalled.getAndSet(true)); assertFalse(preUpgradeIsCalled.getAndSet(true));
assertFalse(postUpgradeIsCalled.get()); assertFalse(postUpgradeIsCalled.get());
listener.onResponse(val); listener.onResponse(val);

View File

@ -6,8 +6,11 @@
package org.elasticsearch.xpack.upgrade; package org.elasticsearch.xpack.upgrade;
import com.carrotsearch.hppc.cursors.ObjectCursor; import com.carrotsearch.hppc.cursors.ObjectCursor;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ResourceAlreadyExistsException; import org.elasticsearch.ResourceAlreadyExistsException;
import org.elasticsearch.Version; import org.elasticsearch.Version;
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsResponse;
import org.elasticsearch.action.admin.indices.alias.get.GetAliasesResponse; import org.elasticsearch.action.admin.indices.alias.get.GetAliasesResponse;
import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.action.support.PlainActionFuture;
@ -19,6 +22,7 @@ import org.elasticsearch.cluster.metadata.AliasMetaData;
import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider;
import org.elasticsearch.common.Strings; import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.common.xcontent.XContentType;
@ -26,13 +30,16 @@ import org.elasticsearch.index.reindex.BulkByScrollResponse;
import org.elasticsearch.index.reindex.ReindexPlugin; import org.elasticsearch.index.reindex.ReindexPlugin;
import org.elasticsearch.indices.InvalidIndexNameException; import org.elasticsearch.indices.InvalidIndexNameException;
import org.elasticsearch.plugins.Plugin; import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.protocol.xpack.migration.UpgradeActionRequired;
import org.elasticsearch.script.MockScriptPlugin; import org.elasticsearch.script.MockScriptPlugin;
import org.elasticsearch.script.Script; import org.elasticsearch.script.Script;
import org.elasticsearch.script.ScriptType; import org.elasticsearch.script.ScriptType;
import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.SearchHit;
import org.elasticsearch.tasks.TaskId; import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.transport.TransportResponse; import org.elasticsearch.test.ESIntegTestCase.ClusterScope;
import org.elasticsearch.test.ESIntegTestCase.Scope;
import org.elasticsearch.xpack.core.LocalStateCompositeXPackPlugin; import org.elasticsearch.xpack.core.LocalStateCompositeXPackPlugin;
import org.elasticsearch.xpack.core.upgrade.IndexUpgradeCheckVersion;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
@ -45,10 +52,13 @@ import java.util.function.Function;
import static org.elasticsearch.test.VersionUtils.randomVersionBetween; import static org.elasticsearch.test.VersionUtils.randomVersionBetween;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertThrows; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertThrows;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.startsWith; import static org.hamcrest.Matchers.startsWith;
import static org.hamcrest.core.IsEqual.equalTo; import static org.hamcrest.core.IsEqual.equalTo;
@ClusterScope(scope=Scope.TEST)
public class InternalIndexReindexerIT extends IndexUpgradeIntegTestCase { public class InternalIndexReindexerIT extends IndexUpgradeIntegTestCase {
@Override @Override
@ -77,13 +87,13 @@ public class InternalIndexReindexerIT extends IndexUpgradeIntegTestCase {
public void testUpgradeIndex() throws Exception { public void testUpgradeIndex() throws Exception {
createTestIndex("test"); createTestIndex("test");
InternalIndexReindexer reindexer = createIndexReindexer(123, script("add_bar"), Strings.EMPTY_ARRAY); InternalIndexReindexer reindexer = createIndexReindexer(script("add_bar"), Strings.EMPTY_ARRAY);
PlainActionFuture<BulkByScrollResponse> future = PlainActionFuture.newFuture(); PlainActionFuture<BulkByScrollResponse> future = PlainActionFuture.newFuture();
reindexer.upgrade(new TaskId("abc", 123), "test", clusterState(), future); reindexer.upgrade(new TaskId("abc", 123), "test", clusterState(), future);
BulkByScrollResponse response = future.actionGet(); BulkByScrollResponse response = future.actionGet();
assertThat(response.getCreated(), equalTo(2L)); assertThat(response.getCreated(), equalTo(2L));
SearchResponse searchResponse = client().prepareSearch("test-123").get(); SearchResponse searchResponse = client().prepareSearch("test-" + IndexUpgradeCheckVersion.UPGRADE_VERSION).get();
assertThat(searchResponse.getHits().getTotalHits().value, equalTo(2L)); assertThat(searchResponse.getHits().getTotalHits().value, equalTo(2L));
assertThat(searchResponse.getHits().getHits().length, equalTo(2)); assertThat(searchResponse.getHits().getHits().length, equalTo(2));
for (SearchHit hit : searchResponse.getHits().getHits()) { for (SearchHit hit : searchResponse.getHits().getHits()) {
@ -94,7 +104,7 @@ public class InternalIndexReindexerIT extends IndexUpgradeIntegTestCase {
GetAliasesResponse aliasesResponse = client().admin().indices().prepareGetAliases("test").get(); GetAliasesResponse aliasesResponse = client().admin().indices().prepareGetAliases("test").get();
assertThat(aliasesResponse.getAliases().size(), equalTo(1)); assertThat(aliasesResponse.getAliases().size(), equalTo(1));
List<AliasMetaData> testAlias = aliasesResponse.getAliases().get("test-123"); List<AliasMetaData> testAlias = aliasesResponse.getAliases().get("test-" + IndexUpgradeCheckVersion.UPGRADE_VERSION);
assertNotNull(testAlias); assertNotNull(testAlias);
assertThat(testAlias.size(), equalTo(1)); assertThat(testAlias.size(), equalTo(1));
assertThat(testAlias.get(0).alias(), equalTo("test")); assertThat(testAlias.get(0).alias(), equalTo("test"));
@ -102,8 +112,8 @@ public class InternalIndexReindexerIT extends IndexUpgradeIntegTestCase {
public void testTargetIndexExists() throws Exception { public void testTargetIndexExists() throws Exception {
createTestIndex("test"); createTestIndex("test");
createTestIndex("test-123"); createTestIndex("test-" + IndexUpgradeCheckVersion.UPGRADE_VERSION);
InternalIndexReindexer reindexer = createIndexReindexer(123, script("add_bar"), Strings.EMPTY_ARRAY); InternalIndexReindexer reindexer = createIndexReindexer(script("add_bar"), Strings.EMPTY_ARRAY);
PlainActionFuture<BulkByScrollResponse> future = PlainActionFuture.newFuture(); PlainActionFuture<BulkByScrollResponse> future = PlainActionFuture.newFuture();
reindexer.upgrade(new TaskId("abc", 123), "test", clusterState(), future); reindexer.upgrade(new TaskId("abc", 123), "test", clusterState(), future);
assertThrows(future, ResourceAlreadyExistsException.class); assertThrows(future, ResourceAlreadyExistsException.class);
@ -115,14 +125,14 @@ public class InternalIndexReindexerIT extends IndexUpgradeIntegTestCase {
public void testTargetIndexExistsAsAlias() throws Exception { public void testTargetIndexExistsAsAlias() throws Exception {
createTestIndex("test"); createTestIndex("test");
createTestIndex("test-foo"); createTestIndex("test-foo");
client().admin().indices().prepareAliases().addAlias("test-foo", "test-123").get(); client().admin().indices().prepareAliases().addAlias("test-foo", "test-" + IndexUpgradeCheckVersion.UPGRADE_VERSION).get();
InternalIndexReindexer reindexer = createIndexReindexer(123, script("add_bar"), Strings.EMPTY_ARRAY); InternalIndexReindexer reindexer = createIndexReindexer(script("add_bar"), Strings.EMPTY_ARRAY);
PlainActionFuture<BulkByScrollResponse> future = PlainActionFuture.newFuture(); PlainActionFuture<BulkByScrollResponse> future = PlainActionFuture.newFuture();
reindexer.upgrade(new TaskId("abc", 123), "test", clusterState(), future); reindexer.upgrade(new TaskId("abc", 123), "test", clusterState(), future);
assertThrows(future, InvalidIndexNameException.class); assertThrows(future, InvalidIndexNameException.class);
// Make sure that the index is not marked as read-only // Make sure that the index is not marked as read-only
client().prepareIndex("test-123", "doc").setSource("foo", "bar").get(); client().prepareIndex("test-" + IndexUpgradeCheckVersion.UPGRADE_VERSION, "doc").setSource("foo", "bar").get();
} }
public void testSourceIndexIsReadonly() throws Exception { public void testSourceIndexIsReadonly() throws Exception {
@ -130,7 +140,7 @@ public class InternalIndexReindexerIT extends IndexUpgradeIntegTestCase {
try { try {
Settings settings = Settings.builder().put(IndexMetaData.INDEX_READ_ONLY_SETTING.getKey(), true).build(); Settings settings = Settings.builder().put(IndexMetaData.INDEX_READ_ONLY_SETTING.getKey(), true).build();
assertAcked(client().admin().indices().prepareUpdateSettings("test").setSettings(settings).get()); assertAcked(client().admin().indices().prepareUpdateSettings("test").setSettings(settings).get());
InternalIndexReindexer reindexer = createIndexReindexer(123, script("add_bar"), Strings.EMPTY_ARRAY); InternalIndexReindexer reindexer = createIndexReindexer(script("add_bar"), Strings.EMPTY_ARRAY);
PlainActionFuture<BulkByScrollResponse> future = PlainActionFuture.newFuture(); PlainActionFuture<BulkByScrollResponse> future = PlainActionFuture.newFuture();
reindexer.upgrade(new TaskId("abc", 123), "test", clusterState(), future); reindexer.upgrade(new TaskId("abc", 123), "test", clusterState(), future);
assertThrows(future, IllegalStateException.class); assertThrows(future, IllegalStateException.class);
@ -144,12 +154,30 @@ public class InternalIndexReindexerIT extends IndexUpgradeIntegTestCase {
} }
} }
public void testReindexingFailureWithClusterRoutingAllocationDisabled() throws Exception {
createTestIndex("test");
Settings settings = Settings.builder().put(EnableAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ENABLE_SETTING.getKey(), "none")
.build();
ClusterUpdateSettingsResponse clusterUpdateResponse = client().admin().cluster().prepareUpdateSettings()
.setTransientSettings(settings).get();
assertThat(clusterUpdateResponse.isAcknowledged(), is(true));
assertThat(clusterUpdateResponse.getTransientSettings()
.get(EnableAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ENABLE_SETTING.getKey()), is("none"));
InternalIndexReindexer reindexer = createIndexReindexer(script("add_bar"), Strings.EMPTY_ARRAY);
PlainActionFuture<BulkByScrollResponse> future = PlainActionFuture.newFuture();
reindexer.upgrade(new TaskId("abc", 123), "test", clusterState(), future);
ElasticsearchException e = expectThrows(ElasticsearchException.class, () -> future.actionGet());
assertThat(e.getMessage(), containsString(
"pre-upgrade check failed, please enable cluster routing allocation using setting [cluster.routing.allocation.enable]"));
}
public void testReindexingFailure() throws Exception { public void testReindexingFailure() throws Exception {
createTestIndex("test"); createTestIndex("test");
// Make sure that the index is not marked as read-only // Make sure that the index is not marked as read-only
client().prepareIndex("test", "doc").setSource("foo", "bar").setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get(); client().prepareIndex("test", "doc").setSource("foo", "bar").setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get();
InternalIndexReindexer reindexer = createIndexReindexer(123, script("fail"), Strings.EMPTY_ARRAY); InternalIndexReindexer reindexer = createIndexReindexer(script("fail"), Strings.EMPTY_ARRAY);
PlainActionFuture<BulkByScrollResponse> future = PlainActionFuture.newFuture(); PlainActionFuture<BulkByScrollResponse> future = PlainActionFuture.newFuture();
reindexer.upgrade(new TaskId("abc", 123), "test", clusterState(), future); reindexer.upgrade(new TaskId("abc", 123), "test", clusterState(), future);
assertThrows(future, RuntimeException.class); assertThrows(future, RuntimeException.class);
@ -161,7 +189,7 @@ public class InternalIndexReindexerIT extends IndexUpgradeIntegTestCase {
public void testMixedNodeVersion() throws Exception { public void testMixedNodeVersion() throws Exception {
createTestIndex("test"); createTestIndex("test");
InternalIndexReindexer reindexer = createIndexReindexer(123, script("add_bar"), Strings.EMPTY_ARRAY); InternalIndexReindexer reindexer = createIndexReindexer(script("add_bar"), Strings.EMPTY_ARRAY);
PlainActionFuture<BulkByScrollResponse> future = PlainActionFuture.newFuture(); PlainActionFuture<BulkByScrollResponse> future = PlainActionFuture.newFuture();
reindexer.upgrade(new TaskId("abc", 123), "test", withRandomOldNode(), future); reindexer.upgrade(new TaskId("abc", 123), "test", withRandomOldNode(), future);
assertThrows(future, IllegalStateException.class); assertThrows(future, IllegalStateException.class);
@ -183,11 +211,9 @@ public class InternalIndexReindexerIT extends IndexUpgradeIntegTestCase {
return new Script(ScriptType.INLINE, CustomScriptPlugin.NAME, name, new HashMap<>()); return new Script(ScriptType.INLINE, CustomScriptPlugin.NAME, name, new HashMap<>());
} }
private InternalIndexReindexer createIndexReindexer(int version, Script transformScript, String[] types) { private InternalIndexReindexer createIndexReindexer(Script transformScript, String[] types) {
return new InternalIndexReindexer<Void>(client(), internalCluster().clusterService(internalCluster().getMasterName()), return new IndexUpgradeCheck("test", imd -> UpgradeActionRequired.UPGRADE, client(),
version, transformScript, types, voidActionListener -> voidActionListener.onResponse(null), internalCluster().clusterService(internalCluster().getMasterName()), types, transformScript).getInternalIndexReindexer();
(aVoid, listener) -> listener.onResponse(TransportResponse.Empty.INSTANCE));
} }
private ClusterState clusterState() { private ClusterState clusterState() {