Ensure shrunk indices carry over version information from its source (#22469)
Today when an index is shrunk the version information is not carried over from the source to the target index. This can cause major issues like mapping incompatibilities for instance if an index from a previous major version is shrunk. This commit ensures that all version information from the soruce index is preserved when a shrunk index is created. Closes #22373
This commit is contained in:
parent
923820c6c9
commit
79093e1663
|
@ -545,6 +545,7 @@ public class MetaDataCreateIndexService extends AbstractComponent {
|
|||
throw new IllegalArgumentException("mappings are not allowed when shrinking indices" +
|
||||
", all mappings are copied from the source index");
|
||||
}
|
||||
|
||||
if (IndexMetaData.INDEX_NUMBER_OF_SHARDS_SETTING.exists(targetIndexSettings)) {
|
||||
// this method applies all necessary checks ie. if the target shards are less than the source shards
|
||||
// of if the source shards are divisible by the number of target shards
|
||||
|
@ -588,9 +589,14 @@ public class MetaDataCreateIndexService extends AbstractComponent {
|
|||
.put("index.allocation.max_retries", 1)
|
||||
// now copy all similarity / analysis settings - this overrides all settings from the user unless they
|
||||
// wanna add extra settings
|
||||
.put(IndexMetaData.SETTING_VERSION_CREATED, sourceMetaData.getCreationVersion())
|
||||
.put(IndexMetaData.SETTING_VERSION_UPGRADED, sourceMetaData.getUpgradedVersion())
|
||||
.put(sourceMetaData.getSettings().filter(analysisSimilarityPredicate))
|
||||
.put(IndexMetaData.INDEX_SHRINK_SOURCE_NAME.getKey(), shrinkFromIndex.getName())
|
||||
.put(IndexMetaData.INDEX_SHRINK_SOURCE_UUID.getKey(), shrinkFromIndex.getUUID());
|
||||
if (sourceMetaData.getMinimumCompatibleVersion() != null) {
|
||||
indexSettingsBuilder.put(IndexMetaData.SETTING_VERSION_MINIMUM_COMPATIBLE, sourceMetaData.getMinimumCompatibleVersion());
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -21,26 +21,19 @@ package org.elasticsearch.action.admin.indices.create;
|
|||
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.UnavailableShardsException;
|
||||
import org.elasticsearch.action.admin.cluster.reroute.ClusterRerouteResponse;
|
||||
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
|
||||
import org.elasticsearch.action.admin.indices.delete.DeleteIndexResponse;
|
||||
import org.elasticsearch.action.search.SearchResponse;
|
||||
import org.elasticsearch.action.support.ActiveShardCount;
|
||||
import org.elasticsearch.action.support.IndicesOptions;
|
||||
import org.elasticsearch.cluster.ClusterInfoService;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.InternalClusterInfoService;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||
import org.elasticsearch.cluster.metadata.MetaData;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.cluster.routing.RoutingTable;
|
||||
import org.elasticsearch.cluster.routing.UnassignedInfo;
|
||||
import org.elasticsearch.common.collect.ImmutableOpenMap;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.index.IndexNotFoundException;
|
||||
import org.elasticsearch.index.query.RangeQueryBuilder;
|
||||
import org.elasticsearch.index.query.TermsQueryBuilder;
|
||||
import org.elasticsearch.test.ESIntegTestCase;
|
||||
import org.elasticsearch.test.ESIntegTestCase.ClusterScope;
|
||||
import org.elasticsearch.test.ESIntegTestCase.Scope;
|
||||
|
@ -53,7 +46,6 @@ import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_WAIT_FOR_
|
|||
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
|
||||
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
|
||||
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertBlocked;
|
||||
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
|
||||
import static org.hamcrest.Matchers.allOf;
|
||||
import static org.hamcrest.Matchers.containsString;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
|
@ -63,6 +55,7 @@ import static org.hamcrest.core.IsNull.notNullValue;
|
|||
|
||||
@ClusterScope(scope = Scope.TEST)
|
||||
public class CreateIndexIT extends ESIntegTestCase {
|
||||
|
||||
public void testCreationDateGivenFails() {
|
||||
try {
|
||||
prepareCreate("test").setSettings(Settings.builder().put(IndexMetaData.SETTING_CREATION_DATE, 4L)).get();
|
||||
|
@ -288,192 +281,6 @@ public class CreateIndexIT extends ESIntegTestCase {
|
|||
ensureGreen("test");
|
||||
}
|
||||
|
||||
public void testCreateShrinkIndexToN() {
|
||||
int[][] possibleShardSplits = new int[][] {{8,4,2}, {9, 3, 1}, {4, 2, 1}, {15,5,1}};
|
||||
int[] shardSplits = randomFrom(possibleShardSplits);
|
||||
assertEquals(shardSplits[0], (shardSplits[0] / shardSplits[1]) * shardSplits[1]);
|
||||
assertEquals(shardSplits[1], (shardSplits[1] / shardSplits[2]) * shardSplits[2]);
|
||||
internalCluster().ensureAtLeastNumDataNodes(2);
|
||||
prepareCreate("source").setSettings(Settings.builder().put(indexSettings()).put("number_of_shards", shardSplits[0])).get();
|
||||
for (int i = 0; i < 20; i++) {
|
||||
client().prepareIndex("source", "t1", Integer.toString(i)).setSource("{\"foo\" : \"bar\", \"i\" : " + i + "}").get();
|
||||
}
|
||||
ImmutableOpenMap<String, DiscoveryNode> dataNodes = client().admin().cluster().prepareState().get().getState().nodes()
|
||||
.getDataNodes();
|
||||
assertTrue("at least 2 nodes but was: " + dataNodes.size(), dataNodes.size() >= 2);
|
||||
DiscoveryNode[] discoveryNodes = dataNodes.values().toArray(DiscoveryNode.class);
|
||||
String mergeNode = discoveryNodes[0].getName();
|
||||
// ensure all shards are allocated otherwise the ensure green below might not succeed since we require the merge node
|
||||
// if we change the setting too quickly we will end up with one replica unassigned which can't be assigned anymore due
|
||||
// to the require._name below.
|
||||
ensureGreen();
|
||||
// relocate all shards to one node such that we can merge it.
|
||||
client().admin().indices().prepareUpdateSettings("source")
|
||||
.setSettings(Settings.builder()
|
||||
.put("index.routing.allocation.require._name", mergeNode)
|
||||
.put("index.blocks.write", true)).get();
|
||||
ensureGreen();
|
||||
// now merge source into a 4 shard index
|
||||
assertAcked(client().admin().indices().prepareShrinkIndex("source", "first_shrink")
|
||||
.setSettings(Settings.builder()
|
||||
.put("index.number_of_replicas", 0)
|
||||
.put("index.number_of_shards", shardSplits[1]).build()).get());
|
||||
ensureGreen();
|
||||
assertHitCount(client().prepareSearch("first_shrink").setSize(100).setQuery(new TermsQueryBuilder("foo", "bar")).get(), 20);
|
||||
|
||||
for (int i = 0; i < 20; i++) { // now update
|
||||
client().prepareIndex("first_shrink", "t1", Integer.toString(i)).setSource("{\"foo\" : \"bar\", \"i\" : " + i + "}").get();
|
||||
}
|
||||
flushAndRefresh();
|
||||
assertHitCount(client().prepareSearch("first_shrink").setSize(100).setQuery(new TermsQueryBuilder("foo", "bar")).get(), 20);
|
||||
assertHitCount(client().prepareSearch("source").setSize(100).setQuery(new TermsQueryBuilder("foo", "bar")).get(), 20);
|
||||
|
||||
// relocate all shards to one node such that we can merge it.
|
||||
client().admin().indices().prepareUpdateSettings("first_shrink")
|
||||
.setSettings(Settings.builder()
|
||||
.put("index.routing.allocation.require._name", mergeNode)
|
||||
.put("index.blocks.write", true)).get();
|
||||
ensureGreen();
|
||||
// now merge source into a 2 shard index
|
||||
assertAcked(client().admin().indices().prepareShrinkIndex("first_shrink", "second_shrink")
|
||||
.setSettings(Settings.builder()
|
||||
.put("index.number_of_replicas", 0)
|
||||
.put("index.number_of_shards", shardSplits[2]).build()).get());
|
||||
ensureGreen();
|
||||
assertHitCount(client().prepareSearch("second_shrink").setSize(100).setQuery(new TermsQueryBuilder("foo", "bar")).get(), 20);
|
||||
// let it be allocated anywhere and bump replicas
|
||||
client().admin().indices().prepareUpdateSettings("second_shrink")
|
||||
.setSettings(Settings.builder()
|
||||
.putNull("index.routing.allocation.include._id")
|
||||
.put("index.number_of_replicas", 1)).get();
|
||||
ensureGreen();
|
||||
assertHitCount(client().prepareSearch("second_shrink").setSize(100).setQuery(new TermsQueryBuilder("foo", "bar")).get(), 20);
|
||||
|
||||
for (int i = 0; i < 20; i++) { // now update
|
||||
client().prepareIndex("second_shrink", "t1", Integer.toString(i)).setSource("{\"foo\" : \"bar\", \"i\" : " + i + "}").get();
|
||||
}
|
||||
flushAndRefresh();
|
||||
assertHitCount(client().prepareSearch("second_shrink").setSize(100).setQuery(new TermsQueryBuilder("foo", "bar")).get(), 20);
|
||||
assertHitCount(client().prepareSearch("first_shrink").setSize(100).setQuery(new TermsQueryBuilder("foo", "bar")).get(), 20);
|
||||
assertHitCount(client().prepareSearch("source").setSize(100).setQuery(new TermsQueryBuilder("foo", "bar")).get(), 20);
|
||||
}
|
||||
|
||||
public void testCreateShrinkIndex() {
|
||||
internalCluster().ensureAtLeastNumDataNodes(2);
|
||||
prepareCreate("source").setSettings(Settings.builder().put(indexSettings()).put("number_of_shards", randomIntBetween(2, 7))).get();
|
||||
for (int i = 0; i < 20; i++) {
|
||||
client().prepareIndex("source", randomFrom("t1", "t2", "t3")).setSource("{\"foo\" : \"bar\", \"i\" : " + i + "}").get();
|
||||
}
|
||||
ImmutableOpenMap<String, DiscoveryNode> dataNodes = client().admin().cluster().prepareState().get().getState().nodes()
|
||||
.getDataNodes();
|
||||
assertTrue("at least 2 nodes but was: " + dataNodes.size(), dataNodes.size() >= 2);
|
||||
DiscoveryNode[] discoveryNodes = dataNodes.values().toArray(DiscoveryNode.class);
|
||||
String mergeNode = discoveryNodes[0].getName();
|
||||
// ensure all shards are allocated otherwise the ensure green below might not succeed since we require the merge node
|
||||
// if we change the setting too quickly we will end up with one replica unassigned which can't be assigned anymore due
|
||||
// to the require._name below.
|
||||
ensureGreen();
|
||||
// relocate all shards to one node such that we can merge it.
|
||||
client().admin().indices().prepareUpdateSettings("source")
|
||||
.setSettings(Settings.builder()
|
||||
.put("index.routing.allocation.require._name", mergeNode)
|
||||
.put("index.blocks.write", true)).get();
|
||||
ensureGreen();
|
||||
// now merge source into a single shard index
|
||||
|
||||
final boolean createWithReplicas = randomBoolean();
|
||||
assertAcked(client().admin().indices().prepareShrinkIndex("source", "target")
|
||||
.setSettings(Settings.builder().put("index.number_of_replicas", createWithReplicas ? 1 : 0).build()).get());
|
||||
ensureGreen();
|
||||
assertHitCount(client().prepareSearch("target").setSize(100).setQuery(new TermsQueryBuilder("foo", "bar")).get(), 20);
|
||||
|
||||
if (createWithReplicas == false) {
|
||||
// bump replicas
|
||||
client().admin().indices().prepareUpdateSettings("target")
|
||||
.setSettings(Settings.builder()
|
||||
.put("index.number_of_replicas", 1)).get();
|
||||
ensureGreen();
|
||||
assertHitCount(client().prepareSearch("target").setSize(100).setQuery(new TermsQueryBuilder("foo", "bar")).get(), 20);
|
||||
}
|
||||
|
||||
for (int i = 20; i < 40; i++) {
|
||||
client().prepareIndex("target", randomFrom("t1", "t2", "t3")).setSource("{\"foo\" : \"bar\", \"i\" : " + i + "}").get();
|
||||
}
|
||||
flushAndRefresh();
|
||||
assertHitCount(client().prepareSearch("target").setSize(100).setQuery(new TermsQueryBuilder("foo", "bar")).get(), 40);
|
||||
assertHitCount(client().prepareSearch("source").setSize(100).setQuery(new TermsQueryBuilder("foo", "bar")).get(), 20);
|
||||
|
||||
}
|
||||
/**
|
||||
* Tests that we can manually recover from a failed allocation due to shards being moved away etc.
|
||||
*/
|
||||
public void testCreateShrinkIndexFails() throws Exception {
|
||||
internalCluster().ensureAtLeastNumDataNodes(2);
|
||||
prepareCreate("source").setSettings(Settings.builder().put(indexSettings())
|
||||
.put("number_of_shards", randomIntBetween(2, 7))
|
||||
.put("number_of_replicas", 0)).get();
|
||||
for (int i = 0; i < 20; i++) {
|
||||
client().prepareIndex("source", randomFrom("t1", "t2", "t3")).setSource("{\"foo\" : \"bar\", \"i\" : " + i + "}").get();
|
||||
}
|
||||
ImmutableOpenMap<String, DiscoveryNode> dataNodes = client().admin().cluster().prepareState().get().getState().nodes()
|
||||
.getDataNodes();
|
||||
assertTrue("at least 2 nodes but was: " + dataNodes.size(), dataNodes.size() >= 2);
|
||||
DiscoveryNode[] discoveryNodes = dataNodes.values().toArray(DiscoveryNode.class);
|
||||
String spareNode = discoveryNodes[0].getName();
|
||||
String mergeNode = discoveryNodes[1].getName();
|
||||
// ensure all shards are allocated otherwise the ensure green below might not succeed since we require the merge node
|
||||
// if we change the setting too quickly we will end up with one replica unassigned which can't be assigned anymore due
|
||||
// to the require._name below.
|
||||
ensureGreen();
|
||||
// relocate all shards to one node such that we can merge it.
|
||||
client().admin().indices().prepareUpdateSettings("source")
|
||||
.setSettings(Settings.builder().put("index.routing.allocation.require._name", mergeNode)
|
||||
.put("index.blocks.write", true)).get();
|
||||
ensureGreen();
|
||||
|
||||
// now merge source into a single shard index
|
||||
client().admin().indices().prepareShrinkIndex("source", "target")
|
||||
.setSettings(Settings.builder()
|
||||
.put("index.routing.allocation.exclude._name", mergeNode) // we manually exclude the merge node to forcefully fuck it up
|
||||
.put("index.number_of_replicas", 0)
|
||||
.put("index.allocation.max_retries", 1).build()).get();
|
||||
|
||||
// now we move all shards away from the merge node
|
||||
client().admin().indices().prepareUpdateSettings("source")
|
||||
.setSettings(Settings.builder().put("index.routing.allocation.require._name", spareNode)
|
||||
.put("index.blocks.write", true)).get();
|
||||
ensureGreen("source");
|
||||
|
||||
client().admin().indices().prepareUpdateSettings("target") // erase the forcefully fuckup!
|
||||
.setSettings(Settings.builder().putNull("index.routing.allocation.exclude._name")).get();
|
||||
// wait until it fails
|
||||
assertBusy(() -> {
|
||||
ClusterStateResponse clusterStateResponse = client().admin().cluster().prepareState().get();
|
||||
RoutingTable routingTables = clusterStateResponse.getState().routingTable();
|
||||
assertTrue(routingTables.index("target").shard(0).getShards().get(0).unassigned());
|
||||
assertEquals(UnassignedInfo.Reason.ALLOCATION_FAILED,
|
||||
routingTables.index("target").shard(0).getShards().get(0).unassignedInfo().getReason());
|
||||
assertEquals(1,
|
||||
routingTables.index("target").shard(0).getShards().get(0).unassignedInfo().getNumFailedAllocations());
|
||||
});
|
||||
client().admin().indices().prepareUpdateSettings("source") // now relocate them all to the right node
|
||||
.setSettings(Settings.builder()
|
||||
.put("index.routing.allocation.require._name", mergeNode)).get();
|
||||
ensureGreen("source");
|
||||
|
||||
final InternalClusterInfoService infoService = (InternalClusterInfoService) internalCluster().getInstance(ClusterInfoService.class,
|
||||
internalCluster().getMasterName());
|
||||
infoService.refresh();
|
||||
// kick off a retry and wait until it's done!
|
||||
ClusterRerouteResponse clusterRerouteResponse = client().admin().cluster().prepareReroute().setRetryFailed(true).get();
|
||||
long expectedShardSize = clusterRerouteResponse.getState().routingTable().index("target")
|
||||
.shard(0).getShards().get(0).getExpectedShardSize();
|
||||
// we support the expected shard size in the allocator to sum up over the source index shards
|
||||
assertTrue("expected shard size must be set but wasn't: " + expectedShardSize, expectedShardSize > 0);
|
||||
ensureGreen();
|
||||
assertHitCount(client().prepareSearch("target").setSize(100).setQuery(new TermsQueryBuilder("foo", "bar")).get(), 20);
|
||||
}
|
||||
|
||||
/**
|
||||
* This test ensures that index creation adheres to the {@link IndexMetaData#SETTING_WAIT_FOR_ACTIVE_SHARDS}.
|
||||
*/
|
||||
|
|
|
@ -0,0 +1,246 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.action.admin.indices.create;
|
||||
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.action.admin.cluster.reroute.ClusterRerouteResponse;
|
||||
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
|
||||
import org.elasticsearch.action.admin.indices.settings.get.GetSettingsResponse;
|
||||
import org.elasticsearch.action.support.ActiveShardCount;
|
||||
import org.elasticsearch.cluster.ClusterInfoService;
|
||||
import org.elasticsearch.cluster.InternalClusterInfoService;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.cluster.routing.RoutingTable;
|
||||
import org.elasticsearch.cluster.routing.UnassignedInfo;
|
||||
import org.elasticsearch.common.Priority;
|
||||
import org.elasticsearch.common.collect.ImmutableOpenMap;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.index.query.TermsQueryBuilder;
|
||||
import org.elasticsearch.plugins.Plugin;
|
||||
import org.elasticsearch.test.ESIntegTestCase;
|
||||
import org.elasticsearch.test.InternalSettingsPlugin;
|
||||
import org.elasticsearch.test.VersionUtils;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
|
||||
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
|
||||
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
|
||||
|
||||
public class ShrinkIndexIT extends ESIntegTestCase {
|
||||
|
||||
@Override
|
||||
protected Collection<Class<? extends Plugin>> nodePlugins() {
|
||||
return Arrays.asList(InternalSettingsPlugin.class);
|
||||
}
|
||||
|
||||
public void testCreateShrinkIndexToN() {
|
||||
int[][] possibleShardSplits = new int[][] {{8,4,2}, {9, 3, 1}, {4, 2, 1}, {15,5,1}};
|
||||
int[] shardSplits = randomFrom(possibleShardSplits);
|
||||
assertEquals(shardSplits[0], (shardSplits[0] / shardSplits[1]) * shardSplits[1]);
|
||||
assertEquals(shardSplits[1], (shardSplits[1] / shardSplits[2]) * shardSplits[2]);
|
||||
internalCluster().ensureAtLeastNumDataNodes(2);
|
||||
prepareCreate("source").setSettings(Settings.builder().put(indexSettings()).put("number_of_shards", shardSplits[0])).get();
|
||||
for (int i = 0; i < 20; i++) {
|
||||
client().prepareIndex("source", "t1", Integer.toString(i)).setSource("{\"foo\" : \"bar\", \"i\" : " + i + "}").get();
|
||||
}
|
||||
ImmutableOpenMap<String, DiscoveryNode> dataNodes = client().admin().cluster().prepareState().get().getState().nodes()
|
||||
.getDataNodes();
|
||||
assertTrue("at least 2 nodes but was: " + dataNodes.size(), dataNodes.size() >= 2);
|
||||
DiscoveryNode[] discoveryNodes = dataNodes.values().toArray(DiscoveryNode.class);
|
||||
String mergeNode = discoveryNodes[0].getName();
|
||||
// ensure all shards are allocated otherwise the ensure green below might not succeed since we require the merge node
|
||||
// if we change the setting too quickly we will end up with one replica unassigned which can't be assigned anymore due
|
||||
// to the require._name below.
|
||||
ensureGreen();
|
||||
// relocate all shards to one node such that we can merge it.
|
||||
client().admin().indices().prepareUpdateSettings("source")
|
||||
.setSettings(Settings.builder()
|
||||
.put("index.routing.allocation.require._name", mergeNode)
|
||||
.put("index.blocks.write", true)).get();
|
||||
ensureGreen();
|
||||
// now merge source into a 4 shard index
|
||||
assertAcked(client().admin().indices().prepareShrinkIndex("source", "first_shrink")
|
||||
.setSettings(Settings.builder()
|
||||
.put("index.number_of_replicas", 0)
|
||||
.put("index.number_of_shards", shardSplits[1]).build()).get());
|
||||
ensureGreen();
|
||||
assertHitCount(client().prepareSearch("first_shrink").setSize(100).setQuery(new TermsQueryBuilder("foo", "bar")).get(), 20);
|
||||
|
||||
for (int i = 0; i < 20; i++) { // now update
|
||||
client().prepareIndex("first_shrink", "t1", Integer.toString(i)).setSource("{\"foo\" : \"bar\", \"i\" : " + i + "}").get();
|
||||
}
|
||||
flushAndRefresh();
|
||||
assertHitCount(client().prepareSearch("first_shrink").setSize(100).setQuery(new TermsQueryBuilder("foo", "bar")).get(), 20);
|
||||
assertHitCount(client().prepareSearch("source").setSize(100).setQuery(new TermsQueryBuilder("foo", "bar")).get(), 20);
|
||||
|
||||
// relocate all shards to one node such that we can merge it.
|
||||
client().admin().indices().prepareUpdateSettings("first_shrink")
|
||||
.setSettings(Settings.builder()
|
||||
.put("index.routing.allocation.require._name", mergeNode)
|
||||
.put("index.blocks.write", true)).get();
|
||||
ensureGreen();
|
||||
// now merge source into a 2 shard index
|
||||
assertAcked(client().admin().indices().prepareShrinkIndex("first_shrink", "second_shrink")
|
||||
.setSettings(Settings.builder()
|
||||
.put("index.number_of_replicas", 0)
|
||||
.put("index.number_of_shards", shardSplits[2]).build()).get());
|
||||
ensureGreen();
|
||||
assertHitCount(client().prepareSearch("second_shrink").setSize(100).setQuery(new TermsQueryBuilder("foo", "bar")).get(), 20);
|
||||
// let it be allocated anywhere and bump replicas
|
||||
client().admin().indices().prepareUpdateSettings("second_shrink")
|
||||
.setSettings(Settings.builder()
|
||||
.putNull("index.routing.allocation.include._id")
|
||||
.put("index.number_of_replicas", 1)).get();
|
||||
ensureGreen();
|
||||
assertHitCount(client().prepareSearch("second_shrink").setSize(100).setQuery(new TermsQueryBuilder("foo", "bar")).get(), 20);
|
||||
|
||||
for (int i = 0; i < 20; i++) { // now update
|
||||
client().prepareIndex("second_shrink", "t1", Integer.toString(i)).setSource("{\"foo\" : \"bar\", \"i\" : " + i + "}").get();
|
||||
}
|
||||
flushAndRefresh();
|
||||
assertHitCount(client().prepareSearch("second_shrink").setSize(100).setQuery(new TermsQueryBuilder("foo", "bar")).get(), 20);
|
||||
assertHitCount(client().prepareSearch("first_shrink").setSize(100).setQuery(new TermsQueryBuilder("foo", "bar")).get(), 20);
|
||||
assertHitCount(client().prepareSearch("source").setSize(100).setQuery(new TermsQueryBuilder("foo", "bar")).get(), 20);
|
||||
}
|
||||
|
||||
public void testCreateShrinkIndex() {
|
||||
internalCluster().ensureAtLeastNumDataNodes(2);
|
||||
Version version = VersionUtils.randomVersion(random());
|
||||
prepareCreate("source").setSettings(Settings.builder().put(indexSettings())
|
||||
.put("number_of_shards", randomIntBetween(2, 7))
|
||||
.put("index.version.created", version)
|
||||
).get();
|
||||
for (int i = 0; i < 20; i++) {
|
||||
client().prepareIndex("source", randomFrom("t1", "t2", "t3")).setSource("{\"foo\" : \"bar\", \"i\" : " + i + "}").get();
|
||||
}
|
||||
ImmutableOpenMap<String, DiscoveryNode> dataNodes = client().admin().cluster().prepareState().get().getState().nodes()
|
||||
.getDataNodes();
|
||||
assertTrue("at least 2 nodes but was: " + dataNodes.size(), dataNodes.size() >= 2);
|
||||
DiscoveryNode[] discoveryNodes = dataNodes.values().toArray(DiscoveryNode.class);
|
||||
String mergeNode = discoveryNodes[0].getName();
|
||||
// ensure all shards are allocated otherwise the ensure green below might not succeed since we require the merge node
|
||||
// if we change the setting too quickly we will end up with one replica unassigned which can't be assigned anymore due
|
||||
// to the require._name below.
|
||||
ensureGreen();
|
||||
// relocate all shards to one node such that we can merge it.
|
||||
client().admin().indices().prepareUpdateSettings("source")
|
||||
.setSettings(Settings.builder()
|
||||
.put("index.routing.allocation.require._name", mergeNode)
|
||||
.put("index.blocks.write", true)).get();
|
||||
ensureGreen();
|
||||
// now merge source into a single shard index
|
||||
|
||||
final boolean createWithReplicas = randomBoolean();
|
||||
assertAcked(client().admin().indices().prepareShrinkIndex("source", "target")
|
||||
.setSettings(Settings.builder().put("index.number_of_replicas", createWithReplicas ? 1 : 0).build()).get());
|
||||
ensureGreen();
|
||||
assertHitCount(client().prepareSearch("target").setSize(100).setQuery(new TermsQueryBuilder("foo", "bar")).get(), 20);
|
||||
|
||||
if (createWithReplicas == false) {
|
||||
// bump replicas
|
||||
client().admin().indices().prepareUpdateSettings("target")
|
||||
.setSettings(Settings.builder()
|
||||
.put("index.number_of_replicas", 1)).get();
|
||||
ensureGreen();
|
||||
assertHitCount(client().prepareSearch("target").setSize(100).setQuery(new TermsQueryBuilder("foo", "bar")).get(), 20);
|
||||
}
|
||||
|
||||
for (int i = 20; i < 40; i++) {
|
||||
client().prepareIndex("target", randomFrom("t1", "t2", "t3")).setSource("{\"foo\" : \"bar\", \"i\" : " + i + "}").get();
|
||||
}
|
||||
flushAndRefresh();
|
||||
assertHitCount(client().prepareSearch("target").setSize(100).setQuery(new TermsQueryBuilder("foo", "bar")).get(), 40);
|
||||
assertHitCount(client().prepareSearch("source").setSize(100).setQuery(new TermsQueryBuilder("foo", "bar")).get(), 20);
|
||||
GetSettingsResponse target = client().admin().indices().prepareGetSettings("target").get();
|
||||
assertEquals(version, target.getIndexToSettings().get("target").getAsVersion("index.version.created", null));
|
||||
}
|
||||
/**
|
||||
* Tests that we can manually recover from a failed allocation due to shards being moved away etc.
|
||||
*/
|
||||
public void testCreateShrinkIndexFails() throws Exception {
|
||||
internalCluster().ensureAtLeastNumDataNodes(2);
|
||||
prepareCreate("source").setSettings(Settings.builder().put(indexSettings())
|
||||
.put("number_of_shards", randomIntBetween(2, 7))
|
||||
.put("number_of_replicas", 0)).get();
|
||||
for (int i = 0; i < 20; i++) {
|
||||
client().prepareIndex("source", randomFrom("t1", "t2", "t3")).setSource("{\"foo\" : \"bar\", \"i\" : " + i + "}").get();
|
||||
}
|
||||
ImmutableOpenMap<String, DiscoveryNode> dataNodes = client().admin().cluster().prepareState().get().getState().nodes()
|
||||
.getDataNodes();
|
||||
assertTrue("at least 2 nodes but was: " + dataNodes.size(), dataNodes.size() >= 2);
|
||||
DiscoveryNode[] discoveryNodes = dataNodes.values().toArray(DiscoveryNode.class);
|
||||
String spareNode = discoveryNodes[0].getName();
|
||||
String mergeNode = discoveryNodes[1].getName();
|
||||
// ensure all shards are allocated otherwise the ensure green below might not succeed since we require the merge node
|
||||
// if we change the setting too quickly we will end up with one replica unassigned which can't be assigned anymore due
|
||||
// to the require._name below.
|
||||
ensureGreen();
|
||||
// relocate all shards to one node such that we can merge it.
|
||||
client().admin().indices().prepareUpdateSettings("source")
|
||||
.setSettings(Settings.builder().put("index.routing.allocation.require._name", mergeNode)
|
||||
.put("index.blocks.write", true)).get();
|
||||
ensureGreen();
|
||||
|
||||
// now merge source into a single shard index
|
||||
client().admin().indices().prepareShrinkIndex("source", "target")
|
||||
.setWaitForActiveShards(ActiveShardCount.NONE)
|
||||
.setSettings(Settings.builder()
|
||||
.put("index.routing.allocation.exclude._name", mergeNode) // we manually exclude the merge node to forcefully fuck it up
|
||||
.put("index.number_of_replicas", 0)
|
||||
.put("index.allocation.max_retries", 1).build()).get();
|
||||
client().admin().cluster().prepareHealth("target").setWaitForEvents(Priority.LANGUID).get();
|
||||
|
||||
// now we move all shards away from the merge node
|
||||
client().admin().indices().prepareUpdateSettings("source")
|
||||
.setSettings(Settings.builder().put("index.routing.allocation.require._name", spareNode)
|
||||
.put("index.blocks.write", true)).get();
|
||||
ensureGreen("source");
|
||||
|
||||
client().admin().indices().prepareUpdateSettings("target") // erase the forcefully fuckup!
|
||||
.setSettings(Settings.builder().putNull("index.routing.allocation.exclude._name")).get();
|
||||
// wait until it fails
|
||||
assertBusy(() -> {
|
||||
ClusterStateResponse clusterStateResponse = client().admin().cluster().prepareState().get();
|
||||
RoutingTable routingTables = clusterStateResponse.getState().routingTable();
|
||||
assertTrue(routingTables.index("target").shard(0).getShards().get(0).unassigned());
|
||||
assertEquals(UnassignedInfo.Reason.ALLOCATION_FAILED,
|
||||
routingTables.index("target").shard(0).getShards().get(0).unassignedInfo().getReason());
|
||||
assertEquals(1,
|
||||
routingTables.index("target").shard(0).getShards().get(0).unassignedInfo().getNumFailedAllocations());
|
||||
});
|
||||
client().admin().indices().prepareUpdateSettings("source") // now relocate them all to the right node
|
||||
.setSettings(Settings.builder()
|
||||
.put("index.routing.allocation.require._name", mergeNode)).get();
|
||||
ensureGreen("source");
|
||||
|
||||
final InternalClusterInfoService infoService = (InternalClusterInfoService) internalCluster().getInstance(ClusterInfoService.class,
|
||||
internalCluster().getMasterName());
|
||||
infoService.refresh();
|
||||
// kick off a retry and wait until it's done!
|
||||
ClusterRerouteResponse clusterRerouteResponse = client().admin().cluster().prepareReroute().setRetryFailed(true).get();
|
||||
long expectedShardSize = clusterRerouteResponse.getState().routingTable().index("target")
|
||||
.shard(0).getShards().get(0).getExpectedShardSize();
|
||||
// we support the expected shard size in the allocator to sum up over the source index shards
|
||||
assertTrue("expected shard size must be set but wasn't: " + expectedShardSize, expectedShardSize > 0);
|
||||
ensureGreen();
|
||||
assertHitCount(client().prepareSearch("target").setSize(100).setQuery(new TermsQueryBuilder("foo", "bar")).get(), 20);
|
||||
}
|
||||
}
|
|
@ -38,13 +38,16 @@ import org.elasticsearch.index.IndexNotFoundException;
|
|||
import org.elasticsearch.ResourceAlreadyExistsException;
|
||||
import org.elasticsearch.indices.InvalidIndexNameException;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.elasticsearch.test.VersionUtils;
|
||||
import org.elasticsearch.test.gateway.TestGatewayAllocator;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
|
||||
import static java.util.Collections.emptyMap;
|
||||
import static java.util.Collections.min;
|
||||
import static org.hamcrest.Matchers.endsWith;
|
||||
|
||||
public class MetaDataCreateIndexServiceTests extends ESTestCase {
|
||||
|
@ -150,11 +153,20 @@ public class MetaDataCreateIndexServiceTests extends ESTestCase {
|
|||
|
||||
public void testShrinkIndexSettings() {
|
||||
String indexName = randomAsciiOfLength(10);
|
||||
List<Version> versions = Arrays.asList(VersionUtils.randomVersion(random()), VersionUtils.randomVersion(random()),
|
||||
VersionUtils.randomVersion(random()));
|
||||
versions.sort((l, r) -> Long.compare(l.id, r.id));
|
||||
Version version = versions.get(0);
|
||||
Version minCompat = versions.get(1);
|
||||
Version upgraded = versions.get(2);
|
||||
// create one that won't fail
|
||||
ClusterState clusterState = ClusterState.builder(createClusterState(indexName, randomIntBetween(2, 10), 0,
|
||||
Settings.builder()
|
||||
.put("index.blocks.write", true)
|
||||
.put("index.similarity.default.type", "BM25")
|
||||
.put("index.version.created", version)
|
||||
.put("index.version.upgraded", upgraded)
|
||||
.put("index.version.minimum_compatible", minCompat.luceneVersion)
|
||||
.put("index.analysis.analyzer.my_analyzer.tokenizer", "keyword")
|
||||
.build())).nodes(DiscoveryNodes.builder().add(newNode("node1")))
|
||||
.build();
|
||||
|
@ -177,6 +189,10 @@ public class MetaDataCreateIndexServiceTests extends ESTestCase {
|
|||
"keyword", builder.build().get("index.analysis.analyzer.my_analyzer.tokenizer"));
|
||||
assertEquals("node1", builder.build().get("index.routing.allocation.initial_recovery._id"));
|
||||
assertEquals("1", builder.build().get("index.allocation.max_retries"));
|
||||
assertEquals(version, builder.build().getAsVersion("index.version.created", null));
|
||||
assertEquals(upgraded, builder.build().getAsVersion("index.version.upgraded", null));
|
||||
assertEquals(minCompat.luceneVersion.toString(), builder.build().get("index.version.minimum_compatible", null));
|
||||
|
||||
}
|
||||
|
||||
private DiscoveryNode newNode(String nodeId) {
|
||||
|
|
Loading…
Reference in New Issue