diff --git a/core/src/main/java/org/elasticsearch/cluster/metadata/MetaDataCreateIndexService.java b/core/src/main/java/org/elasticsearch/cluster/metadata/MetaDataCreateIndexService.java index 43c13087dd0..0a2830e55fc 100644 --- a/core/src/main/java/org/elasticsearch/cluster/metadata/MetaDataCreateIndexService.java +++ b/core/src/main/java/org/elasticsearch/cluster/metadata/MetaDataCreateIndexService.java @@ -91,6 +91,7 @@ import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.BiFunction; import java.util.function.Predicate; +import java.util.stream.IntStream; import static org.elasticsearch.action.support.ContextPreservingActionListener.wrapPreservingContext; import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_AUTO_EXPAND_REPLICAS; @@ -340,19 +341,44 @@ public class MetaDataCreateIndexService extends AbstractComponent { indexSettingsBuilder.put(IndexMetaData.SETTING_INDEX_PROVIDED_NAME, request.getProvidedName()); indexSettingsBuilder.put(SETTING_INDEX_UUID, UUIDs.randomBase64UUID()); final Index shrinkFromIndex = request.shrinkFrom(); - int routingNumShards = IndexMetaData.INDEX_NUMBER_OF_SHARDS_SETTING.get(indexSettingsBuilder.build());; - if (shrinkFromIndex != null) { - prepareShrinkIndexSettings(currentState, mappings.keySet(), indexSettingsBuilder, shrinkFromIndex, - request.index()); - IndexMetaData sourceMetaData = currentState.metaData().getIndexSafe(shrinkFromIndex); + final IndexMetaData.Builder tmpImdBuilder = IndexMetaData.builder(request.index()); + + final int routingNumShards; + if (shrinkFromIndex == null) { + routingNumShards = IndexMetaData.INDEX_NUMBER_OF_SHARDS_SETTING.get(indexSettingsBuilder.build()); + } else { + final IndexMetaData sourceMetaData = currentState.metaData().getIndexSafe(shrinkFromIndex); routingNumShards = sourceMetaData.getRoutingNumShards(); } + tmpImdBuilder.setRoutingNumShards(routingNumShards); + + if (shrinkFromIndex != null) { + prepareShrinkIndexSettings( + currentState, mappings.keySet(), indexSettingsBuilder, shrinkFromIndex, request.index()); + } + final Settings actualIndexSettings = indexSettingsBuilder.build(); + tmpImdBuilder.settings(actualIndexSettings); + + if (shrinkFromIndex != null) { + /* + * We need to arrange that the primary term on all the shards in the shrunken index is at least as large as + * the maximum primary term on all the shards in the source index. This ensures that we have correct + * document-level semantics regarding sequence numbers in the shrunken index. + */ + final IndexMetaData sourceMetaData = currentState.metaData().getIndexSafe(shrinkFromIndex); + final long primaryTerm = + IntStream + .range(0, sourceMetaData.getNumberOfShards()) + .mapToLong(sourceMetaData::primaryTerm) + .max() + .getAsLong(); + for (int shardId = 0; shardId < tmpImdBuilder.numberOfShards(); shardId++) { + tmpImdBuilder.primaryTerm(shardId, primaryTerm); + } + } - Settings actualIndexSettings = indexSettingsBuilder.build(); - IndexMetaData.Builder tmpImdBuilder = IndexMetaData.builder(request.index()) - .setRoutingNumShards(routingNumShards); // Set up everything, now locally create the index to see that things are ok, and apply - final IndexMetaData tmpImd = tmpImdBuilder.settings(actualIndexSettings).build(); + final IndexMetaData tmpImd = tmpImdBuilder.build(); ActiveShardCount waitForActiveShards = request.waitForActiveShards(); if (waitForActiveShards == ActiveShardCount.DEFAULT) { waitForActiveShards = tmpImd.getWaitForActiveShards(); @@ -408,6 +434,11 @@ public class MetaDataCreateIndexService extends AbstractComponent { final IndexMetaData.Builder indexMetaDataBuilder = IndexMetaData.builder(request.index()) .settings(actualIndexSettings) .setRoutingNumShards(routingNumShards); + + for (int shardId = 0; shardId < tmpImd.getNumberOfShards(); shardId++) { + indexMetaDataBuilder.primaryTerm(shardId, tmpImd.primaryTerm(shardId)); + } + for (MappingMetaData mappingMd : mappingsMetaData.values()) { indexMetaDataBuilder.putMapping(mappingMd); } diff --git a/core/src/test/java/org/elasticsearch/action/admin/indices/create/ShrinkIndexIT.java b/core/src/test/java/org/elasticsearch/action/admin/indices/create/ShrinkIndexIT.java index 36c7da7894b..ec20817aa2a 100644 --- a/core/src/test/java/org/elasticsearch/action/admin/indices/create/ShrinkIndexIT.java +++ b/core/src/test/java/org/elasticsearch/action/admin/indices/create/ShrinkIndexIT.java @@ -19,30 +19,38 @@ package org.elasticsearch.action.admin.indices.create; +import org.apache.lucene.index.CorruptIndexException; import org.apache.lucene.search.Sort; import org.apache.lucene.search.SortField; import org.apache.lucene.search.SortedSetSelector; import org.apache.lucene.search.SortedSetSortField; import org.elasticsearch.Version; import org.elasticsearch.action.admin.cluster.reroute.ClusterRerouteResponse; +import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest; import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; -import org.elasticsearch.action.admin.indices.segments.IndexSegments; -import org.elasticsearch.action.admin.indices.segments.IndexShardSegments; -import org.elasticsearch.action.admin.indices.segments.IndicesSegmentResponse; -import org.elasticsearch.action.admin.indices.segments.ShardSegments; import org.elasticsearch.action.admin.indices.settings.get.GetSettingsResponse; +import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.support.ActiveShardCount; +import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterInfoService; import org.elasticsearch.cluster.InternalClusterInfoService; +import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.routing.IndexShardRoutingTable; +import org.elasticsearch.cluster.routing.Murmur3HashFunction; import org.elasticsearch.cluster.routing.RoutingTable; +import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.UnassignedInfo; import org.elasticsearch.common.Priority; import org.elasticsearch.common.collect.ImmutableOpenMap; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.XContentType; -import org.elasticsearch.index.engine.Segment; +import org.elasticsearch.index.Index; +import org.elasticsearch.index.IndexService; import org.elasticsearch.index.query.TermsQueryBuilder; +import org.elasticsearch.index.shard.IndexShard; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.indices.IndicesService; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.InternalSettingsPlugin; @@ -50,10 +58,18 @@ import org.elasticsearch.test.VersionUtils; import java.util.Arrays; import java.util.Collection; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.IntStream; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount; +import static org.hamcrest.CoreMatchers.not; import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; public class ShrinkIndexIT extends ESIntegTestCase { @@ -135,6 +151,81 @@ public class ShrinkIndexIT extends ESIntegTestCase { assertHitCount(client().prepareSearch("source").setSize(100).setQuery(new TermsQueryBuilder("foo", "bar")).get(), 20); } + public void testShrinkIndexPrimaryTerm() throws Exception { + final List factors = Arrays.asList(2, 3, 5, 7); + final List numberOfShardsFactors = randomSubsetOf(scaledRandomIntBetween(1, factors.size()), factors); + final int numberOfShards = numberOfShardsFactors.stream().reduce(1, (x, y) -> x * y); + final int numberOfTargetShards = randomSubsetOf(numberOfShardsFactors).stream().reduce(1, (x, y) -> x * y); + internalCluster().ensureAtLeastNumDataNodes(2); + prepareCreate("source").setSettings(Settings.builder().put(indexSettings()).put("number_of_shards", numberOfShards)).get(); + + final ImmutableOpenMap dataNodes = + client().admin().cluster().prepareState().get().getState().nodes().getDataNodes(); + assertThat(dataNodes.size(), greaterThanOrEqualTo(2)); + final DiscoveryNode[] discoveryNodes = dataNodes.values().toArray(DiscoveryNode.class); + final String mergeNode = discoveryNodes[0].getName(); + ensureGreen(); + + // fail random primary shards to force primary terms to increase + final Index source = resolveIndex("source"); + final int iterations = scaledRandomIntBetween(0, 16); + for (int i = 0; i < iterations; i++) { + final String node = randomSubsetOf(1, internalCluster().nodesInclude("source")).get(0); + final IndicesService indexServices = internalCluster().getInstance(IndicesService.class, node); + final IndexService indexShards = indexServices.indexServiceSafe(source); + for (final Integer shardId : indexShards.shardIds()) { + final IndexShard shard = indexShards.getShard(shardId); + if (shard.routingEntry().primary() && randomBoolean()) { + disableAllocation("source"); + shard.failShard("test", new Exception("test")); + // this can not succeed until the shard is failed and a replica is promoted + int id = 0; + while (true) { + // find an ID that routes to the right shard, we will only index to the shard that saw a primary failure + final String s = Integer.toString(id); + final int hash = Math.floorMod(Murmur3HashFunction.hash(s), numberOfShards); + if (hash == shardId) { + final IndexRequest request = + new IndexRequest("source", "type", s).source("{ \"f\": \"" + s + "\"}", XContentType.JSON); + client().index(request).get(); + break; + } else { + id++; + } + } + enableAllocation("source"); + ensureGreen(); + } + } + } + + // relocate all shards to one node such that we can merge it. + final Settings.Builder prepareShrinkSettings = + Settings.builder().put("index.routing.allocation.require._name", mergeNode).put("index.blocks.write", true); + client().admin().indices().prepareUpdateSettings("source").setSettings(prepareShrinkSettings).get(); + ensureGreen(); + + final IndexMetaData indexMetaData = indexMetaData(client(), "source"); + final long beforeShrinkPrimaryTerm = IntStream.range(0, numberOfShards).mapToLong(indexMetaData::primaryTerm).max().getAsLong(); + + // now merge source into target + final Settings shrinkSettings = + Settings.builder().put("index.number_of_replicas", 0).put("index.number_of_shards", numberOfTargetShards).build(); + assertAcked(client().admin().indices().prepareShrinkIndex("source", "target").setSettings(shrinkSettings).get()); + + ensureGreen(); + + final IndexMetaData afterShrinkIndexMetaData = indexMetaData(client(), "target"); + for (int shardId = 0; shardId < numberOfTargetShards; shardId++) { + assertThat(afterShrinkIndexMetaData.primaryTerm(shardId), equalTo(beforeShrinkPrimaryTerm + 1)); + } + } + + private static IndexMetaData indexMetaData(final Client client, final String index) { + final ClusterStateResponse clusterStateResponse = client.admin().cluster().state(new ClusterStateRequest()).actionGet(); + return clusterStateResponse.getState().metaData().index(index); + } + public void testCreateShrinkIndex() { internalCluster().ensureAtLeastNumDataNodes(2); Version version = VersionUtils.randomVersion(random());