Initialize primary term for shrunk indices

Today when an index is shrunk, the primary terms for its shards start
from one. Yet, this is a problem as the index will already contain
assigned sequence numbers across primary terms. To ensure document-level
sequence number semantics, the primary terms of the target shards must
start from the maximum of all the shards in the source index. This
commit causes this to be the case.

Relates #25307
This commit is contained in:
Jason Tedor 2017-06-20 15:12:39 -04:00 committed by GitHub
parent 50bac63210
commit 1f14d042f6
2 changed files with 136 additions and 14 deletions

View File

@ -91,6 +91,7 @@ import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiFunction;
import java.util.function.Predicate;
import java.util.stream.IntStream;
import static org.elasticsearch.action.support.ContextPreservingActionListener.wrapPreservingContext;
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_AUTO_EXPAND_REPLICAS;
@ -340,19 +341,44 @@ public class MetaDataCreateIndexService extends AbstractComponent {
indexSettingsBuilder.put(IndexMetaData.SETTING_INDEX_PROVIDED_NAME, request.getProvidedName());
indexSettingsBuilder.put(SETTING_INDEX_UUID, UUIDs.randomBase64UUID());
final Index shrinkFromIndex = request.shrinkFrom();
int routingNumShards = IndexMetaData.INDEX_NUMBER_OF_SHARDS_SETTING.get(indexSettingsBuilder.build());;
if (shrinkFromIndex != null) {
prepareShrinkIndexSettings(currentState, mappings.keySet(), indexSettingsBuilder, shrinkFromIndex,
request.index());
IndexMetaData sourceMetaData = currentState.metaData().getIndexSafe(shrinkFromIndex);
final IndexMetaData.Builder tmpImdBuilder = IndexMetaData.builder(request.index());
final int routingNumShards;
if (shrinkFromIndex == null) {
routingNumShards = IndexMetaData.INDEX_NUMBER_OF_SHARDS_SETTING.get(indexSettingsBuilder.build());
} else {
final IndexMetaData sourceMetaData = currentState.metaData().getIndexSafe(shrinkFromIndex);
routingNumShards = sourceMetaData.getRoutingNumShards();
}
tmpImdBuilder.setRoutingNumShards(routingNumShards);
if (shrinkFromIndex != null) {
prepareShrinkIndexSettings(
currentState, mappings.keySet(), indexSettingsBuilder, shrinkFromIndex, request.index());
}
final Settings actualIndexSettings = indexSettingsBuilder.build();
tmpImdBuilder.settings(actualIndexSettings);
if (shrinkFromIndex != null) {
/*
* We need to arrange that the primary term on all the shards in the shrunken index is at least as large as
* the maximum primary term on all the shards in the source index. This ensures that we have correct
* document-level semantics regarding sequence numbers in the shrunken index.
*/
final IndexMetaData sourceMetaData = currentState.metaData().getIndexSafe(shrinkFromIndex);
final long primaryTerm =
IntStream
.range(0, sourceMetaData.getNumberOfShards())
.mapToLong(sourceMetaData::primaryTerm)
.max()
.getAsLong();
for (int shardId = 0; shardId < tmpImdBuilder.numberOfShards(); shardId++) {
tmpImdBuilder.primaryTerm(shardId, primaryTerm);
}
}
Settings actualIndexSettings = indexSettingsBuilder.build();
IndexMetaData.Builder tmpImdBuilder = IndexMetaData.builder(request.index())
.setRoutingNumShards(routingNumShards);
// Set up everything, now locally create the index to see that things are ok, and apply
final IndexMetaData tmpImd = tmpImdBuilder.settings(actualIndexSettings).build();
final IndexMetaData tmpImd = tmpImdBuilder.build();
ActiveShardCount waitForActiveShards = request.waitForActiveShards();
if (waitForActiveShards == ActiveShardCount.DEFAULT) {
waitForActiveShards = tmpImd.getWaitForActiveShards();
@ -408,6 +434,11 @@ public class MetaDataCreateIndexService extends AbstractComponent {
final IndexMetaData.Builder indexMetaDataBuilder = IndexMetaData.builder(request.index())
.settings(actualIndexSettings)
.setRoutingNumShards(routingNumShards);
for (int shardId = 0; shardId < tmpImd.getNumberOfShards(); shardId++) {
indexMetaDataBuilder.primaryTerm(shardId, tmpImd.primaryTerm(shardId));
}
for (MappingMetaData mappingMd : mappingsMetaData.values()) {
indexMetaDataBuilder.putMapping(mappingMd);
}

View File

@ -19,30 +19,38 @@
package org.elasticsearch.action.admin.indices.create;
import org.apache.lucene.index.CorruptIndexException;
import org.apache.lucene.search.Sort;
import org.apache.lucene.search.SortField;
import org.apache.lucene.search.SortedSetSelector;
import org.apache.lucene.search.SortedSetSortField;
import org.elasticsearch.Version;
import org.elasticsearch.action.admin.cluster.reroute.ClusterRerouteResponse;
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
import org.elasticsearch.action.admin.indices.segments.IndexSegments;
import org.elasticsearch.action.admin.indices.segments.IndexShardSegments;
import org.elasticsearch.action.admin.indices.segments.IndicesSegmentResponse;
import org.elasticsearch.action.admin.indices.segments.ShardSegments;
import org.elasticsearch.action.admin.indices.settings.get.GetSettingsResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterInfoService;
import org.elasticsearch.cluster.InternalClusterInfoService;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.Murmur3HashFunction;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.UnassignedInfo;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.engine.Segment;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.query.TermsQueryBuilder;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.InternalSettingsPlugin;
@ -50,10 +58,18 @@ import org.elasticsearch.test.VersionUtils;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
import static org.hamcrest.CoreMatchers.not;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
public class ShrinkIndexIT extends ESIntegTestCase {
@ -135,6 +151,81 @@ public class ShrinkIndexIT extends ESIntegTestCase {
assertHitCount(client().prepareSearch("source").setSize(100).setQuery(new TermsQueryBuilder("foo", "bar")).get(), 20);
}
public void testShrinkIndexPrimaryTerm() throws Exception {
final List<Integer> factors = Arrays.asList(2, 3, 5, 7);
final List<Integer> numberOfShardsFactors = randomSubsetOf(scaledRandomIntBetween(1, factors.size()), factors);
final int numberOfShards = numberOfShardsFactors.stream().reduce(1, (x, y) -> x * y);
final int numberOfTargetShards = randomSubsetOf(numberOfShardsFactors).stream().reduce(1, (x, y) -> x * y);
internalCluster().ensureAtLeastNumDataNodes(2);
prepareCreate("source").setSettings(Settings.builder().put(indexSettings()).put("number_of_shards", numberOfShards)).get();
final ImmutableOpenMap<String, DiscoveryNode> dataNodes =
client().admin().cluster().prepareState().get().getState().nodes().getDataNodes();
assertThat(dataNodes.size(), greaterThanOrEqualTo(2));
final DiscoveryNode[] discoveryNodes = dataNodes.values().toArray(DiscoveryNode.class);
final String mergeNode = discoveryNodes[0].getName();
ensureGreen();
// fail random primary shards to force primary terms to increase
final Index source = resolveIndex("source");
final int iterations = scaledRandomIntBetween(0, 16);
for (int i = 0; i < iterations; i++) {
final String node = randomSubsetOf(1, internalCluster().nodesInclude("source")).get(0);
final IndicesService indexServices = internalCluster().getInstance(IndicesService.class, node);
final IndexService indexShards = indexServices.indexServiceSafe(source);
for (final Integer shardId : indexShards.shardIds()) {
final IndexShard shard = indexShards.getShard(shardId);
if (shard.routingEntry().primary() && randomBoolean()) {
disableAllocation("source");
shard.failShard("test", new Exception("test"));
// this can not succeed until the shard is failed and a replica is promoted
int id = 0;
while (true) {
// find an ID that routes to the right shard, we will only index to the shard that saw a primary failure
final String s = Integer.toString(id);
final int hash = Math.floorMod(Murmur3HashFunction.hash(s), numberOfShards);
if (hash == shardId) {
final IndexRequest request =
new IndexRequest("source", "type", s).source("{ \"f\": \"" + s + "\"}", XContentType.JSON);
client().index(request).get();
break;
} else {
id++;
}
}
enableAllocation("source");
ensureGreen();
}
}
}
// relocate all shards to one node such that we can merge it.
final Settings.Builder prepareShrinkSettings =
Settings.builder().put("index.routing.allocation.require._name", mergeNode).put("index.blocks.write", true);
client().admin().indices().prepareUpdateSettings("source").setSettings(prepareShrinkSettings).get();
ensureGreen();
final IndexMetaData indexMetaData = indexMetaData(client(), "source");
final long beforeShrinkPrimaryTerm = IntStream.range(0, numberOfShards).mapToLong(indexMetaData::primaryTerm).max().getAsLong();
// now merge source into target
final Settings shrinkSettings =
Settings.builder().put("index.number_of_replicas", 0).put("index.number_of_shards", numberOfTargetShards).build();
assertAcked(client().admin().indices().prepareShrinkIndex("source", "target").setSettings(shrinkSettings).get());
ensureGreen();
final IndexMetaData afterShrinkIndexMetaData = indexMetaData(client(), "target");
for (int shardId = 0; shardId < numberOfTargetShards; shardId++) {
assertThat(afterShrinkIndexMetaData.primaryTerm(shardId), equalTo(beforeShrinkPrimaryTerm + 1));
}
}
private static IndexMetaData indexMetaData(final Client client, final String index) {
final ClusterStateResponse clusterStateResponse = client.admin().cluster().state(new ClusterStateRequest()).actionGet();
return clusterStateResponse.getState().metaData().index(index);
}
public void testCreateShrinkIndex() {
internalCluster().ensureAtLeastNumDataNodes(2);
Version version = VersionUtils.randomVersion(random());