From fe43eef1b592aefc1d695ce5af21f82f8e5cc529 Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Thu, 10 Mar 2016 11:08:33 +0100 Subject: [PATCH] Port Primary Terms to master #17044 Primary terms are a way to make sure that operations replicated from a stale primary are rejected by shards following a newly elected primary. The original PRs adding this to the seq# feature branch are #14062 and #14651. Unlike those PRs, here we take a different approach (based on newer code in master) where the primary terms are stored in the meta data only (and not in `ShardRouting` objects). Relates to #17038 Closes #17044 --- .../replication/ReplicationRequest.java | 14 + .../TransportReplicationAction.java | 64 +++-- .../elasticsearch/cluster/ClusterState.java | 51 ++-- .../cluster/metadata/IndexMetaData.java | 161 +++++++++--- .../cluster/routing/RoutingTable.java | 4 - .../routing/allocation/AllocationService.java | 96 ++++--- .../routing/allocation/RoutingAllocation.java | 2 +- .../cluster/service/ClusterService.java | 5 +- .../org/elasticsearch/index/IndexService.java | 29 ++- .../IllegalIndexShardStateException.java | 8 +- .../elasticsearch/index/shard/IndexShard.java | 107 ++++++-- .../cluster/IndicesClusterStateService.java | 118 +++++---- .../TransportBroadcastByNodeActionTests.java | 10 + .../BroadcastReplicationTests.java | 2 +- .../ClusterStateCreationUtils.java | 18 +- .../TransportReplicationActionTests.java | 22 +- .../metadata/ToAndFromJsonMetaDataTests.java | 46 ++-- .../cluster/routing/PrimaryTermsTests.java | 241 ++++++++++++++++++ .../allocation/CatAllocationTestCase.java | 1 - .../PrimaryElectionRoutingTests.java | 38 +-- .../routing/allocation/ShardStateIT.java | 80 ++++++ .../gateway/RecoveryFromGatewayIT.java | 57 ++++- .../index/shard/IndexShardTests.java | 193 +++++++++++--- .../cluster/routing/TestShardRouting.java | 2 +- 24 files changed, 1063 insertions(+), 306 deletions(-) create mode 100644 core/src/test/java/org/elasticsearch/cluster/routing/PrimaryTermsTests.java 
create mode 100644 core/src/test/java/org/elasticsearch/cluster/routing/allocation/ShardStateIT.java diff --git a/core/src/main/java/org/elasticsearch/action/support/replication/ReplicationRequest.java b/core/src/main/java/org/elasticsearch/action/support/replication/ReplicationRequest.java index 6283e69a02e..b4cfbb6ad88 100644 --- a/core/src/main/java/org/elasticsearch/action/support/replication/ReplicationRequest.java +++ b/core/src/main/java/org/elasticsearch/action/support/replication/ReplicationRequest.java @@ -51,6 +51,8 @@ public abstract class ReplicationRequest primaryResponse = shardOperationOnPrimary(state.metaData(), request); + primaryResponse.v2().primaryTerm(indexShardReference.opPrimaryTerm()); if (logger.isTraceEnabled()) { logger.trace("action [{}] completed on shard [{}] for request [{}] with cluster state version [{}]", transportPrimaryAction, shardId, request, state.version()); } @@ -825,17 +799,17 @@ public abstract class TransportReplicationAction * The cluster state object is immutable with an * exception of the {@link RoutingNodes} structure, which is built on demand from the {@link RoutingTable}, * and cluster state {@link #status}, which is updated during cluster state publishing and applying @@ -74,7 +74,7 @@ import java.util.Set; * the type of discovery. For example, for local discovery it is implemented by the {@link LocalDiscovery#publish} * method. In the Zen Discovery it is handled in the {@link PublishClusterStateAction#publish} method. The * publishing mechanism can be overridden by other discovery. - * + *

* The cluster state implements the {@link Diffable} interface in order to support publishing of cluster state * differences instead of the entire state on each change. The publishing mechanism should only send differences * to a node if this node was present in the previous version of the cluster state. If a node is not present was @@ -135,7 +135,7 @@ public class ClusterState implements ToXContent, Diffable { public static T lookupPrototypeSafe(String type) { @SuppressWarnings("unchecked") - T proto = (T)customPrototypes.get(type); + T proto = (T) customPrototypes.get(type); if (proto == null) { throw new IllegalArgumentException("No custom state prototype registered for type [" + type + "], node likely missing plugins"); } @@ -281,6 +281,16 @@ public class ClusterState implements ToXContent, Diffable { sb.append("state uuid: ").append(stateUUID).append("\n"); sb.append("from_diff: ").append(wasReadFromDiff).append("\n"); sb.append("meta data version: ").append(metaData.version()).append("\n"); + for (IndexMetaData indexMetaData : metaData) { + final String TAB = " "; + sb.append(TAB).append(indexMetaData.getIndex()); + sb.append(": v[").append(indexMetaData.getVersion()).append("]\n"); + for (int shard = 0; shard < indexMetaData.getNumberOfShards(); shard++) { + sb.append(TAB).append(TAB).append(shard).append(": "); + sb.append("p_term [").append(indexMetaData.primaryTerm(shard)).append("], "); + sb.append("a_ids ").append(indexMetaData.activeAllocationIds(shard)).append("\n"); + } + } sb.append(blocks().prettyPrint()); sb.append(nodes().prettyPrint()); sb.append(routingTable().prettyPrint()); @@ -477,6 +487,12 @@ public class ClusterState implements ToXContent, Diffable { } builder.endArray(); + builder.startObject(IndexMetaData.KEY_PRIMARY_TERMS); + for (int shard = 0; shard < indexMetaData.getNumberOfShards(); shard++) { + builder.field(Integer.toString(shard), indexMetaData.primaryTerm(shard)); + } + builder.endObject(); + 
builder.startObject(IndexMetaData.KEY_ACTIVE_ALLOCATIONS); for (IntObjectCursor> cursor : indexMetaData.getActiveAllocationIds()) { builder.startArray(String.valueOf(cursor.key)); @@ -487,6 +503,7 @@ public class ClusterState implements ToXContent, Diffable { } builder.endObject(); + // index metadata builder.endObject(); } builder.endObject(); @@ -683,16 +700,16 @@ public class ClusterState implements ToXContent, Diffable { } /** - * @param data input bytes - * @param localNode used to set the local node in the cluster state. + * @param data input bytes + * @param localNode used to set the local node in the cluster state. */ public static ClusterState fromBytes(byte[] data, DiscoveryNode localNode) throws IOException { return readFrom(StreamInput.wrap(data), localNode); } /** - * @param in input stream - * @param localNode used to set the local node in the cluster state. can be null. + * @param in input stream + * @param localNode used to set the local node in the cluster state. can be null. 
*/ public static ClusterState readFrom(StreamInput in, @Nullable DiscoveryNode localNode) throws IOException { return PROTO.readFrom(in, localNode); @@ -791,17 +808,17 @@ public class ClusterState implements ToXContent, Diffable { metaData = proto.metaData.readDiffFrom(in); blocks = proto.blocks.readDiffFrom(in); customs = DiffableUtils.readImmutableOpenMapDiff(in, DiffableUtils.getStringKeySerializer(), - new DiffableUtils.DiffableValueSerializer() { - @Override - public Custom read(StreamInput in, String key) throws IOException { - return lookupPrototypeSafe(key).readFrom(in); - } + new DiffableUtils.DiffableValueSerializer() { + @Override + public Custom read(StreamInput in, String key) throws IOException { + return lookupPrototypeSafe(key).readFrom(in); + } - @Override - public Diff readDiff(StreamInput in, String key) throws IOException { - return lookupPrototypeSafe(key).readDiffFrom(in); - } - }); + @Override + public Diff readDiff(StreamInput in, String key) throws IOException { + return lookupPrototypeSafe(key).readDiffFrom(in); + } + }); } @Override diff --git a/core/src/main/java/org/elasticsearch/cluster/metadata/IndexMetaData.java b/core/src/main/java/org/elasticsearch/cluster/metadata/IndexMetaData.java index 20ba36dd910..ca3c153e1d6 100644 --- a/core/src/main/java/org/elasticsearch/cluster/metadata/IndexMetaData.java +++ b/core/src/main/java/org/elasticsearch/cluster/metadata/IndexMetaData.java @@ -19,6 +19,7 @@ package org.elasticsearch.cluster.metadata; +import com.carrotsearch.hppc.LongArrayList; import com.carrotsearch.hppc.cursors.IntObjectCursor; import com.carrotsearch.hppc.cursors.ObjectCursor; import com.carrotsearch.hppc.cursors.ObjectObjectCursor; @@ -29,6 +30,8 @@ import org.elasticsearch.cluster.DiffableUtils; import org.elasticsearch.cluster.block.ClusterBlock; import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.node.DiscoveryNodeFilters; +import org.elasticsearch.cluster.routing.RoutingTable; 
+import org.elasticsearch.cluster.routing.allocation.AllocationService; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.ParseFieldMatcher; import org.elasticsearch.common.collect.ImmutableOpenIntMap; @@ -56,6 +59,7 @@ import org.joda.time.DateTimeZone; import java.io.IOException; import java.text.ParseException; +import java.util.Arrays; import java.util.Collections; import java.util.EnumSet; import java.util.HashMap; @@ -217,6 +221,13 @@ public class IndexMetaData implements Diffable, FromXContentBuild .numberOfShards(1).numberOfReplicas(0).build(); public static final String KEY_ACTIVE_ALLOCATIONS = "active_allocations"; + static final String KEY_VERSION = "version"; + static final String KEY_SETTINGS = "settings"; + static final String KEY_STATE = "state"; + static final String KEY_MAPPINGS = "mappings"; + static final String KEY_ALIASES = "aliases"; + public static final String KEY_PRIMARY_TERMS = "primary_terms"; + public static final String INDEX_STATE_FILE_PREFIX = "state-"; private final int numberOfShards; @@ -224,6 +235,7 @@ public class IndexMetaData implements Diffable, FromXContentBuild private final Index index; private final long version; + private final long[] primaryTerms; private final State state; @@ -247,7 +259,7 @@ public class IndexMetaData implements Diffable, FromXContentBuild private final Version indexUpgradedVersion; private final org.apache.lucene.util.Version minimumCompatibleLuceneVersion; - private IndexMetaData(Index index, long version, State state, int numberOfShards, int numberOfReplicas, Settings settings, + private IndexMetaData(Index index, long version, long[] primaryTerms, State state, int numberOfShards, int numberOfReplicas, Settings settings, ImmutableOpenMap mappings, ImmutableOpenMap aliases, ImmutableOpenMap customs, ImmutableOpenIntMap> activeAllocationIds, DiscoveryNodeFilters requireFilters, DiscoveryNodeFilters includeFilters, DiscoveryNodeFilters excludeFilters, @@ -255,6 +267,8 @@ public 
class IndexMetaData implements Diffable, FromXContentBuild this.index = index; this.version = version; + this.primaryTerms = primaryTerms; + assert primaryTerms.length == numberOfShards; this.state = state; this.numberOfShards = numberOfShards; this.numberOfReplicas = numberOfReplicas; @@ -296,6 +310,16 @@ public class IndexMetaData implements Diffable, FromXContentBuild return this.version; } + + /** + * The term of the current selected primary. This is a non-negative number incremented when + * a primary shard is assigned after a full cluster restart or a replica shard is promoted to a primary + * See {@link AllocationService#updateMetaDataWithRoutingTable(MetaData, RoutingTable, RoutingTable)}. + **/ + public long primaryTerm(int shardId) { + return this.primaryTerms[shardId]; + } + /** * Return the {@link Version} on which this index has been created. This * information is typically useful for backward compatibility. @@ -416,6 +440,10 @@ public class IndexMetaData implements Diffable, FromXContentBuild IndexMetaData that = (IndexMetaData) o; + if (version != that.version) { + return false; + } + if (!aliases.equals(that.aliases)) { return false; } @@ -434,6 +462,10 @@ public class IndexMetaData implements Diffable, FromXContentBuild if (!customs.equals(that.customs)) { return false; } + + if (Arrays.equals(primaryTerms, that.primaryTerms) == false) { + return false; + } if (!activeAllocationIds.equals(that.activeAllocationIds)) { return false; } @@ -443,14 +475,18 @@ public class IndexMetaData implements Diffable, FromXContentBuild @Override public int hashCode() { int result = index.hashCode(); + result = 31 * result + Long.hashCode(version); result = 31 * result + state.hashCode(); result = 31 * result + aliases.hashCode(); result = 31 * result + settings.hashCode(); result = 31 * result + mappings.hashCode(); + result = 31 * result + customs.hashCode(); + result = 31 * result + Arrays.hashCode(primaryTerms); result = 31 * result + 
activeAllocationIds.hashCode(); return result; } + @Override public Diff diff(IndexMetaData previousState) { return new IndexMetaDataDiff(previousState, this); @@ -476,6 +512,7 @@ public class IndexMetaData implements Diffable, FromXContentBuild private final String index; private final long version; + private final long[] primaryTerms; private final State state; private final Settings settings; private final Diff> mappings; @@ -488,11 +525,12 @@ public class IndexMetaData implements Diffable, FromXContentBuild version = after.version; state = after.state; settings = after.settings; + primaryTerms = after.primaryTerms; mappings = DiffableUtils.diff(before.mappings, after.mappings, DiffableUtils.getStringKeySerializer()); aliases = DiffableUtils.diff(before.aliases, after.aliases, DiffableUtils.getStringKeySerializer()); customs = DiffableUtils.diff(before.customs, after.customs, DiffableUtils.getStringKeySerializer()); activeAllocationIds = DiffableUtils.diff(before.activeAllocationIds, after.activeAllocationIds, - DiffableUtils.getVIntKeySerializer(), DiffableUtils.StringSetValueSerializer.getInstance()); + DiffableUtils.getVIntKeySerializer(), DiffableUtils.StringSetValueSerializer.getInstance()); } public IndexMetaDataDiff(StreamInput in) throws IOException { @@ -500,22 +538,23 @@ public class IndexMetaData implements Diffable, FromXContentBuild version = in.readLong(); state = State.fromId(in.readByte()); settings = Settings.readSettingsFromStream(in); + primaryTerms = in.readVLongArray(); mappings = DiffableUtils.readImmutableOpenMapDiff(in, DiffableUtils.getStringKeySerializer(), MappingMetaData.PROTO); aliases = DiffableUtils.readImmutableOpenMapDiff(in, DiffableUtils.getStringKeySerializer(), AliasMetaData.PROTO); customs = DiffableUtils.readImmutableOpenMapDiff(in, DiffableUtils.getStringKeySerializer(), - new DiffableUtils.DiffableValueSerializer() { - @Override - public Custom read(StreamInput in, String key) throws IOException { - return 
lookupPrototypeSafe(key).readFrom(in); - } + new DiffableUtils.DiffableValueSerializer() { + @Override + public Custom read(StreamInput in, String key) throws IOException { + return lookupPrototypeSafe(key).readFrom(in); + } - @Override - public Diff readDiff(StreamInput in, String key) throws IOException { - return lookupPrototypeSafe(key).readDiffFrom(in); - } - }); + @Override + public Diff readDiff(StreamInput in, String key) throws IOException { + return lookupPrototypeSafe(key).readDiffFrom(in); + } + }); activeAllocationIds = DiffableUtils.readImmutableOpenIntMapDiff(in, DiffableUtils.getVIntKeySerializer(), - DiffableUtils.StringSetValueSerializer.getInstance()); + DiffableUtils.StringSetValueSerializer.getInstance()); } @Override @@ -524,6 +563,7 @@ public class IndexMetaData implements Diffable, FromXContentBuild out.writeLong(version); out.writeByte(state.id); Settings.writeSettingsToStream(settings, out); + out.writeVLongArray(primaryTerms); mappings.writeTo(out); aliases.writeTo(out); customs.writeTo(out); @@ -536,6 +576,7 @@ public class IndexMetaData implements Diffable, FromXContentBuild builder.version(version); builder.state(state); builder.settings(settings); + builder.primaryTerms(primaryTerms); builder.mappings.putAll(mappings.apply(part.mappings)); builder.aliases.putAll(aliases.apply(part.aliases)); builder.customs.putAll(customs.apply(part.customs)); @@ -550,6 +591,7 @@ public class IndexMetaData implements Diffable, FromXContentBuild builder.version(in.readLong()); builder.state(State.fromId(in.readByte())); builder.settings(readSettingsFromStream(in)); + builder.primaryTerms(in.readVLongArray()); int mappingsSize = in.readVInt(); for (int i = 0; i < mappingsSize; i++) { MappingMetaData mappingMd = MappingMetaData.PROTO.readFrom(in); @@ -581,6 +623,7 @@ public class IndexMetaData implements Diffable, FromXContentBuild out.writeLong(version); out.writeByte(state.id()); writeSettingsToStream(settings, out); + 
out.writeVLongArray(primaryTerms); out.writeVInt(mappings.size()); for (ObjectCursor cursor : mappings.values()) { cursor.value.writeTo(out); @@ -614,6 +657,7 @@ public class IndexMetaData implements Diffable, FromXContentBuild private String index; private State state = State.OPEN; private long version = 1; + private long[] primaryTerms = null; private Settings settings = Settings.Builder.EMPTY_SETTINGS; private final ImmutableOpenMap.Builder mappings; private final ImmutableOpenMap.Builder aliases; @@ -633,6 +677,7 @@ public class IndexMetaData implements Diffable, FromXContentBuild this.state = indexMetaData.state; this.version = indexMetaData.version; this.settings = indexMetaData.getSettings(); + this.primaryTerms = indexMetaData.primaryTerms.clone(); this.mappings = ImmutableOpenMap.builder(indexMetaData.mappings); this.aliases = ImmutableOpenMap.builder(indexMetaData.aliases); this.customs = ImmutableOpenMap.builder(indexMetaData.customs); @@ -672,8 +717,7 @@ public class IndexMetaData implements Diffable, FromXContentBuild } public Builder settings(Settings.Builder settings) { - this.settings = settings.build(); - return this; + return settings(settings.build()); } public Builder settings(Settings settings) { @@ -741,6 +785,42 @@ public class IndexMetaData implements Diffable, FromXContentBuild return this; } + /** + * returns the primary term for the given shard. + * See {@link IndexMetaData#primaryTerm(int)} for more information. + */ + public long primaryTerm(int shardId) { + if (primaryTerms == null) { + initializePrimaryTerms(); + } + return this.primaryTerms[shardId]; + } + + /** + * sets the primary term for the given shard. + * See {@link IndexMetaData#primaryTerm(int)} for more information. 
+ */ + public Builder primaryTerm(int shardId, long primaryTerm) { + if (primaryTerms == null) { + initializePrimaryTerms(); + } + this.primaryTerms[shardId] = primaryTerm; + return this; + } + + private void primaryTerms(long[] primaryTerms) { + this.primaryTerms = primaryTerms.clone(); + } + + private void initializePrimaryTerms() { + assert primaryTerms == null; + if (numberOfShards() < 0) { + throw new IllegalStateException("you must set the number of shards before setting/reading primary terms"); + } + primaryTerms = new long[numberOfShards()]; + } + + public IndexMetaData build() { ImmutableOpenMap.Builder tmpAliases = aliases; Settings tmpSettings = settings; @@ -815,27 +895,34 @@ public class IndexMetaData implements Diffable, FromXContentBuild minimumCompatibleLuceneVersion = null; } + if (primaryTerms == null) { + initializePrimaryTerms(); + } else if (primaryTerms.length != numberOfShards) { + throw new IllegalStateException("primaryTerms length is [" + primaryTerms.length + + "] but should be equal to number of shards [" + numberOfShards() + "]"); + } + final String uuid = settings.get(SETTING_INDEX_UUID, INDEX_UUID_NA_VALUE); - return new IndexMetaData(new Index(index, uuid), version, state, numberOfShards, numberOfReplicas, tmpSettings, mappings.build(), - tmpAliases.build(), customs.build(), filledActiveAllocationIds.build(), requireFilters, includeFilters, excludeFilters, - indexCreatedVersion, indexUpgradedVersion, minimumCompatibleLuceneVersion); + return new IndexMetaData(new Index(index, uuid), version, primaryTerms, state, numberOfShards, numberOfReplicas, tmpSettings, mappings.build(), + tmpAliases.build(), customs.build(), filledActiveAllocationIds.build(), requireFilters, includeFilters, excludeFilters, + indexCreatedVersion, indexUpgradedVersion, minimumCompatibleLuceneVersion); } public static void toXContent(IndexMetaData indexMetaData, XContentBuilder builder, ToXContent.Params params) throws IOException { 
builder.startObject(indexMetaData.getIndex().getName(), XContentBuilder.FieldCaseConversion.NONE); - builder.field("version", indexMetaData.getVersion()); - builder.field("state", indexMetaData.getState().toString().toLowerCase(Locale.ENGLISH)); + builder.field(KEY_VERSION, indexMetaData.getVersion()); + builder.field(KEY_STATE, indexMetaData.getState().toString().toLowerCase(Locale.ENGLISH)); boolean binary = params.paramAsBoolean("binary", false); - builder.startObject("settings"); + builder.startObject(KEY_SETTINGS); for (Map.Entry entry : indexMetaData.getSettings().getAsMap().entrySet()) { builder.field(entry.getKey(), entry.getValue()); } builder.endObject(); - builder.startArray("mappings"); + builder.startArray(KEY_MAPPINGS); for (ObjectObjectCursor cursor : indexMetaData.getMappings()) { if (binary) { builder.value(cursor.value.source().compressed()); @@ -855,12 +942,18 @@ public class IndexMetaData implements Diffable, FromXContentBuild builder.endObject(); } - builder.startObject("aliases"); + builder.startObject(KEY_ALIASES); for (ObjectCursor cursor : indexMetaData.getAliases().values()) { AliasMetaData.Builder.toXContent(cursor.value, builder, params); } builder.endObject(); + builder.startArray(KEY_PRIMARY_TERMS); + for (int i = 0; i < indexMetaData.getNumberOfShards(); i++) { + builder.value(indexMetaData.primaryTerm(i)); + } + builder.endArray(); + builder.startObject(KEY_ACTIVE_ALLOCATIONS); for (IntObjectCursor> cursor : indexMetaData.activeAllocationIds) { builder.startArray(String.valueOf(cursor.key)); @@ -895,9 +988,9 @@ public class IndexMetaData implements Diffable, FromXContentBuild if (token == XContentParser.Token.FIELD_NAME) { currentFieldName = parser.currentName(); } else if (token == XContentParser.Token.START_OBJECT) { - if ("settings".equals(currentFieldName)) { + if (KEY_SETTINGS.equals(currentFieldName)) { builder.settings(Settings.settingsBuilder().put(SettingsLoader.Helper.loadNestedFromMap(parser.mapOrdered()))); - } else if 
("mappings".equals(currentFieldName)) { + } else if (KEY_MAPPINGS.equals(currentFieldName)) { while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) { if (token == XContentParser.Token.FIELD_NAME) { currentFieldName = parser.currentName(); @@ -909,7 +1002,7 @@ public class IndexMetaData implements Diffable, FromXContentBuild throw new IllegalArgumentException("Unexpected token: " + token); } } - } else if ("aliases".equals(currentFieldName)) { + } else if (KEY_ALIASES.equals(currentFieldName)) { while (parser.nextToken() != XContentParser.Token.END_OBJECT) { builder.putAlias(AliasMetaData.Builder.fromXContent(parser)); } @@ -949,7 +1042,7 @@ public class IndexMetaData implements Diffable, FromXContentBuild } } } else if (token == XContentParser.Token.START_ARRAY) { - if ("mappings".equals(currentFieldName)) { + if (KEY_MAPPINGS.equals(currentFieldName)) { while ((token = parser.nextToken()) != XContentParser.Token.END_ARRAY) { if (token == XContentParser.Token.VALUE_EMBEDDED_OBJECT) { builder.putMapping(new MappingMetaData(new CompressedXContent(parser.binaryValue()))); @@ -961,13 +1054,23 @@ public class IndexMetaData implements Diffable, FromXContentBuild } } } + } else if (KEY_PRIMARY_TERMS.equals(currentFieldName)) { + LongArrayList list = new LongArrayList(); + while ((token = parser.nextToken()) != XContentParser.Token.END_ARRAY) { + if (token == XContentParser.Token.VALUE_NUMBER) { + list.add(parser.longValue()); + } else { + throw new IllegalStateException("found a non-numeric value under [" + KEY_PRIMARY_TERMS + "]"); + } + } + builder.primaryTerms(list.toArray()); } else { throw new IllegalArgumentException("Unexpected field for an array " + currentFieldName); } } else if (token.isValue()) { - if ("state".equals(currentFieldName)) { + if (KEY_STATE.equals(currentFieldName)) { builder.state(State.fromString(parser.text())); - } else if ("version".equals(currentFieldName)) { + } else if (KEY_VERSION.equals(currentFieldName)) { 
builder.version(parser.longValue()); } else { throw new IllegalArgumentException("Unexpected field [" + currentFieldName + "]"); diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/RoutingTable.java b/core/src/main/java/org/elasticsearch/cluster/routing/RoutingTable.java index a34405c09e0..c27e0a9beb1 100644 --- a/core/src/main/java/org/elasticsearch/cluster/routing/RoutingTable.java +++ b/core/src/main/java/org/elasticsearch/cluster/routing/RoutingTable.java @@ -586,10 +586,6 @@ public class RoutingTable implements Iterable, Diffable indexRoutingTable : indicesRouting.values()) { - indicesRouting.put(indexRoutingTable.value.getIndex().getName(), indexRoutingTable.value); - } RoutingTable table = new RoutingTable(version, indicesRouting.build()); indicesRouting = null; return table; diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java index 54f9b6855a6..da0fea69c68 100644 --- a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java +++ b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java @@ -42,6 +42,7 @@ import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.gateway.GatewayAllocator; +import org.elasticsearch.index.shard.ShardId; import java.util.ArrayList; import java.util.Collections; @@ -98,7 +99,7 @@ public class AllocationService extends AbstractComponent { if (withReroute) { reroute(allocation); } - final RoutingAllocation.Result result = buildChangedResult(clusterState.metaData(), routingNodes); + final RoutingAllocation.Result result = buildChangedResult(clusterState.metaData(), clusterState.routingTable(), routingNodes); String startedShardsAsString = firstListElementsToCommaDelimitedString(startedShards, s -> 
s.shardId().toString()); logClusterHealthStateChange( @@ -107,37 +108,44 @@ public class AllocationService extends AbstractComponent { "shards started [" + startedShardsAsString + "] ..." ); return result; - } - - - protected RoutingAllocation.Result buildChangedResult(MetaData metaData, RoutingNodes routingNodes) { - return buildChangedResult(metaData, routingNodes, new RoutingExplanations()); } - protected RoutingAllocation.Result buildChangedResult(MetaData metaData, RoutingNodes routingNodes, RoutingExplanations explanations) { - final RoutingTable routingTable = new RoutingTable.Builder().updateNodes(routingNodes).build(); - MetaData newMetaData = updateMetaDataWithRoutingTable(metaData,routingTable); - return new RoutingAllocation.Result(true, routingTable.validateRaiseException(newMetaData), newMetaData, explanations); + + protected RoutingAllocation.Result buildChangedResult(MetaData oldMetaData, RoutingTable oldRoutingTable, RoutingNodes newRoutingNodes) { + return buildChangedResult(oldMetaData, oldRoutingTable, newRoutingNodes, new RoutingExplanations()); + + } + + protected RoutingAllocation.Result buildChangedResult(MetaData oldMetaData, RoutingTable oldRoutingTable, RoutingNodes newRoutingNodes, + RoutingExplanations explanations) { + final RoutingTable newRoutingTable = new RoutingTable.Builder().updateNodes(newRoutingNodes).build(); + MetaData newMetaData = updateMetaDataWithRoutingTable(oldMetaData, oldRoutingTable, newRoutingTable); + return new RoutingAllocation.Result(true, newRoutingTable.validateRaiseException(newMetaData), newMetaData, explanations); } /** - * Updates the current {@link MetaData} based on the newly created {@link RoutingTable}. + * Updates the current {@link MetaData} based on the newly created {@link RoutingTable}. Specifically + * we update {@link IndexMetaData#getActiveAllocationIds()} and {@link IndexMetaData#primaryTerm(int)} based on + * the changes made during this allocation. 
* - * @param currentMetaData {@link MetaData} object from before the routing table was changed. + * @param oldMetaData {@link MetaData} object from before the routing table was changed. + * @param oldRoutingTable {@link RoutingTable} from before the change. * @param newRoutingTable new {@link RoutingTable} created by the allocation change * @return adapted {@link MetaData}, potentially the original one if no change was needed. */ - static MetaData updateMetaDataWithRoutingTable(MetaData currentMetaData, RoutingTable newRoutingTable) { - // make sure index meta data and routing tables are in sync w.r.t active allocation ids + static MetaData updateMetaDataWithRoutingTable(MetaData oldMetaData, RoutingTable oldRoutingTable, RoutingTable newRoutingTable) { MetaData.Builder metaDataBuilder = null; - for (IndexRoutingTable indexRoutingTable : newRoutingTable) { - final IndexMetaData indexMetaData = currentMetaData.index(indexRoutingTable.getIndex()); - if (indexMetaData == null) { - throw new IllegalStateException("no metadata found for index " + indexRoutingTable.getIndex().getName()); + for (IndexRoutingTable newIndexTable : newRoutingTable) { + final IndexMetaData oldIndexMetaData = oldMetaData.index(newIndexTable.getIndex()); + if (oldIndexMetaData == null) { + throw new IllegalStateException("no metadata found for index " + newIndexTable.getIndex().getName()); } IndexMetaData.Builder indexMetaDataBuilder = null; - for (IndexShardRoutingTable shardRoutings : indexRoutingTable) { - Set activeAllocationIds = shardRoutings.activeShards().stream() + for (IndexShardRoutingTable newShardTable : newIndexTable) { + final ShardId shardId = newShardTable.shardId(); + + // update activeAllocationIds + Set activeAllocationIds = newShardTable.activeShards().stream() .map(ShardRouting::allocationId) .filter(Objects::nonNull) .map(AllocationId::getId) @@ -145,19 +153,44 @@ public class AllocationService extends AbstractComponent { // only update active allocation ids if there is 
an active shard if (activeAllocationIds.isEmpty() == false) { // get currently stored allocation ids - Set storedAllocationIds = indexMetaData.activeAllocationIds(shardRoutings.shardId().id()); + Set storedAllocationIds = oldIndexMetaData.activeAllocationIds(shardId.id()); if (activeAllocationIds.equals(storedAllocationIds) == false) { if (indexMetaDataBuilder == null) { - indexMetaDataBuilder = IndexMetaData.builder(indexMetaData); + indexMetaDataBuilder = IndexMetaData.builder(oldIndexMetaData); } - - indexMetaDataBuilder.putActiveAllocationIds(shardRoutings.shardId().id(), activeAllocationIds); + indexMetaDataBuilder.putActiveAllocationIds(shardId.id(), activeAllocationIds); } } + + // update primary terms + final ShardRouting newPrimary = newShardTable.primaryShard(); + if (newPrimary == null) { + throw new IllegalStateException("missing primary shard for " + newShardTable.shardId()); + } + final ShardRouting oldPrimary = oldRoutingTable.shardRoutingTable(shardId).primaryShard(); + if (oldPrimary == null) { + throw new IllegalStateException("missing primary shard for " + newShardTable.shardId()); + } + // we update the primary term on initial assignment or when a replica is promoted. Most notably we do *not* + // update them when a primary relocates + if (newPrimary.unassigned() || + newPrimary.isSameAllocation(oldPrimary) || + // we do not use newPrimary.isTargetRelocationOf(oldPrimary) because that one enforces newPrimary to + // be initializing. 
However, when the target shard is activated, we still want the primary term to stay + // the same + (oldPrimary.relocating() && newPrimary.isSameAllocation(oldPrimary.buildTargetRelocatingShard()))) { + // do nothing + } else { + // incrementing the primary term + if (indexMetaDataBuilder == null) { + indexMetaDataBuilder = IndexMetaData.builder(oldIndexMetaData); + } + indexMetaDataBuilder.primaryTerm(shardId.id(), oldIndexMetaData.primaryTerm(shardId.id()) + 1); + } } if (indexMetaDataBuilder != null) { if (metaDataBuilder == null) { - metaDataBuilder = MetaData.builder(currentMetaData); + metaDataBuilder = MetaData.builder(oldMetaData); } metaDataBuilder.put(indexMetaDataBuilder); } @@ -165,7 +198,7 @@ public class AllocationService extends AbstractComponent { if (metaDataBuilder != null) { return metaDataBuilder.build(); } else { - return currentMetaData; + return oldMetaData; } } @@ -196,7 +229,7 @@ public class AllocationService extends AbstractComponent { } gatewayAllocator.applyFailedShards(allocation); reroute(allocation); - final RoutingAllocation.Result result = buildChangedResult(clusterState.metaData(), routingNodes); + final RoutingAllocation.Result result = buildChangedResult(clusterState.metaData(), clusterState.routingTable(), routingNodes); String failedShardsAsString = firstListElementsToCommaDelimitedString(failedShards, s -> s.shard.shardId().toString()); logClusterHealthStateChange( new ClusterStateHealth(clusterState), @@ -243,7 +276,7 @@ public class AllocationService extends AbstractComponent { // the assumption is that commands will move / act on shards (or fail through exceptions) // so, there will always be shard "movements", so no need to check on reroute reroute(allocation); - RoutingAllocation.Result result = buildChangedResult(clusterState.metaData(), routingNodes, explanations); + RoutingAllocation.Result result = buildChangedResult(clusterState.metaData(), clusterState.routingTable(), routingNodes, explanations); 
logClusterHealthStateChange( new ClusterStateHealth(clusterState), new ClusterStateHealth(clusterState.getMetaData(), result.routingTable()), @@ -252,6 +285,7 @@ public class AllocationService extends AbstractComponent { return result; } + /** * Reroutes the routing table based on the live nodes. *

@@ -275,7 +309,7 @@ public class AllocationService extends AbstractComponent { if (!reroute(allocation)) { return new RoutingAllocation.Result(false, clusterState.routingTable(), clusterState.metaData()); } - RoutingAllocation.Result result = buildChangedResult(clusterState.metaData(), routingNodes); + RoutingAllocation.Result result = buildChangedResult(clusterState.metaData(), clusterState.routingTable(), routingNodes); logClusterHealthStateChange( new ClusterStateHealth(clusterState), new ClusterStateHealth(clusterState.getMetaData(), result.routingTable()), @@ -412,8 +446,8 @@ public class AllocationService extends AbstractComponent { boolean changed = false; for (ShardRouting routing : replicas) { changed |= applyFailedShard(allocation, routing, false, - new UnassignedInfo(UnassignedInfo.Reason.ALLOCATION_FAILED, "primary failed while replica initializing", - null, allocation.getCurrentNanoTime(), System.currentTimeMillis())); + new UnassignedInfo(UnassignedInfo.Reason.ALLOCATION_FAILED, "primary failed while replica initializing", + null, allocation.getCurrentNanoTime(), System.currentTimeMillis())); } return changed; } diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/RoutingAllocation.java b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/RoutingAllocation.java index 4e6ba0fb5ad..536806c0830 100644 --- a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/RoutingAllocation.java +++ b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/RoutingAllocation.java @@ -44,7 +44,7 @@ import static java.util.Collections.unmodifiableSet; public class RoutingAllocation { /** - * this class is used to describe results of a {@link RoutingAllocation} + * this class is used to describe results of a {@link RoutingAllocation} */ public static class Result { diff --git a/core/src/main/java/org/elasticsearch/cluster/service/ClusterService.java 
b/core/src/main/java/org/elasticsearch/cluster/service/ClusterService.java index 54e8535b57e..8a38453eb4c 100644 --- a/core/src/main/java/org/elasticsearch/cluster/service/ClusterService.java +++ b/core/src/main/java/org/elasticsearch/cluster/service/ClusterService.java @@ -685,9 +685,8 @@ public class ClusterService extends AbstractLifecycleComponent { warnAboutSlowTaskIfNeeded(executionTime, source); } catch (Throwable t) { TimeValue executionTime = TimeValue.timeValueMillis(Math.max(0, TimeValue.nsecToMSec(currentTimeInNanos() - startTimeNS))); - logger.warn("failed to apply updated cluster state in [{}]:\nversion [{}], uuid [{}], source [{}]\n{}{}{}", t, executionTime, - newClusterState.version(), newClusterState.stateUUID(), source, newClusterState.nodes().prettyPrint(), - newClusterState.routingTable().prettyPrint(), newClusterState.getRoutingNodes().prettyPrint()); + logger.warn("failed to apply updated cluster state in [{}]:\nversion [{}], uuid [{}], source [{}]\n{}", t, executionTime, + newClusterState.version(), newClusterState.stateUUID(), source, newClusterState.prettyPrint()); // TODO: do we want to call updateTask.onFailure here? 
} diff --git a/core/src/main/java/org/elasticsearch/index/IndexService.java b/core/src/main/java/org/elasticsearch/index/IndexService.java index 62076701183..815f257a45d 100644 --- a/core/src/main/java/org/elasticsearch/index/IndexService.java +++ b/core/src/main/java/org/elasticsearch/index/IndexService.java @@ -19,18 +19,6 @@ package org.elasticsearch.index; -import java.io.Closeable; -import java.io.IOException; -import java.nio.file.Path; -import java.util.HashMap; -import java.util.Iterator; -import java.util.Map; -import java.util.Objects; -import java.util.Set; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; - import org.apache.lucene.search.BooleanClause; import org.apache.lucene.search.BooleanQuery; import org.apache.lucene.search.Query; @@ -82,6 +70,18 @@ import org.elasticsearch.indices.fielddata.cache.IndicesFieldDataCache; import org.elasticsearch.indices.mapper.MapperRegistry; import org.elasticsearch.threadpool.ThreadPool; +import java.io.Closeable; +import java.io.IOException; +import java.nio.file.Path; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + import static java.util.Collections.emptyMap; import static java.util.Collections.unmodifiableMap; import static org.elasticsearch.common.collect.MapBuilder.newMapBuilder; @@ -621,6 +621,11 @@ public final class IndexService extends AbstractIndexComponent implements IndexC rescheduleFsyncTask(durability); } } + + // update primary terms + for (final IndexShard shard : this.shards.values()) { + shard.updatePrimaryTerm(metadata.primaryTerm(shard.shardId().id())); + } } private void rescheduleFsyncTask(Translog.Durability durability) { diff --git 
a/core/src/main/java/org/elasticsearch/index/shard/IllegalIndexShardStateException.java b/core/src/main/java/org/elasticsearch/index/shard/IllegalIndexShardStateException.java index 626e72acf41..e632c0669f6 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/IllegalIndexShardStateException.java +++ b/core/src/main/java/org/elasticsearch/index/shard/IllegalIndexShardStateException.java @@ -33,12 +33,12 @@ public class IllegalIndexShardStateException extends ElasticsearchException { private final IndexShardState currentState; - public IllegalIndexShardStateException(ShardId shardId, IndexShardState currentState, String msg) { - this(shardId, currentState, msg, null); + public IllegalIndexShardStateException(ShardId shardId, IndexShardState currentState, String msg, Object... args) { + this(shardId, currentState, msg, null, args); } - public IllegalIndexShardStateException(ShardId shardId, IndexShardState currentState, String msg, Throwable ex) { - super("CurrentState[" + currentState + "] " + msg, ex); + public IllegalIndexShardStateException(ShardId shardId, IndexShardState currentState, String msg, Throwable ex, Object... 
args) { + super("CurrentState[" + currentState + "] " + msg, ex, args); setShard(shardId); this.currentState = currentState; } diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java index b667a1de689..5a764a12077 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -41,6 +41,7 @@ import org.elasticsearch.common.Nullable; import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.common.lease.Releasables; +import org.elasticsearch.common.logging.LoggerMessageFormat; import org.elasticsearch.common.lucene.Lucene; import org.elasticsearch.common.metrics.MeanMetric; import org.elasticsearch.common.settings.Settings; @@ -144,13 +145,16 @@ public class IndexShard extends AbstractIndexShardComponent { private final TranslogConfig translogConfig; private final IndexEventListener indexEventListener; - /** How many bytes we are currently moving to disk, via either IndexWriter.flush or refresh. IndexingMemoryController polls this - * across all shards to decide if throttling is necessary because moving bytes to disk is falling behind vs incoming documents - * being indexed/deleted. */ + /** + * How many bytes we are currently moving to disk, via either IndexWriter.flush or refresh. IndexingMemoryController polls this + * across all shards to decide if throttling is necessary because moving bytes to disk is falling behind vs incoming documents + * being indexed/deleted. 
+ */ private final AtomicLong writingBytes = new AtomicLong(); protected volatile ShardRouting shardRouting; protected volatile IndexShardState state; + protected volatile long primaryTerm; protected final AtomicReference currentEngineReference = new AtomicReference<>(); protected final EngineFactory engineFactory; @@ -236,13 +240,16 @@ public class IndexShard extends AbstractIndexShardComponent { this.engineConfig = newEngineConfig(translogConfig, cachingPolicy); this.suspendableRefContainer = new SuspendableRefContainer(); this.searcherWrapper = indexSearcherWrapper; + this.primaryTerm = indexSettings.getIndexMetaData().primaryTerm(shardId.id()); } public Store store() { return this.store; } - /** returns true if this shard supports indexing (i.e., write) operations. */ + /** + * returns true if this shard supports indexing (i.e., write) operations. + */ public boolean canIndex() { return true; } @@ -279,6 +286,30 @@ public class IndexShard extends AbstractIndexShardComponent { return this.shardFieldData; } + + /** + * Returns the primary term the index shard is on. See {@link org.elasticsearch.cluster.metadata.IndexMetaData#primaryTerm(int)} + */ + public long getPrimaryTerm() { + return this.primaryTerm; + } + + /** + * notifies the shard of an increase in the primary term + */ + public void updatePrimaryTerm(final long newTerm) { + synchronized (mutex) { + if (newTerm != primaryTerm) { + assert shardRouting.primary() == false : "a primary shard should never update it's term. shard: " + shardRouting + + " current term [" + primaryTerm + "] new term [" + newTerm + "]"; + assert newTerm > primaryTerm : "primary terms can only go up. current [" + primaryTerm + "], new [" + newTerm + "]"; + primaryTerm = newTerm; + } + } + + + } + /** * Returns the latest cluster routing entry received with this shard. Might be null if the * shard was just created. @@ -297,12 +328,12 @@ public class IndexShard extends AbstractIndexShardComponent { * unless explicitly disabled. 
* * @throws IndexShardRelocatedException if shard is marked as relocated and relocation aborted - * @throws IOException if shard state could not be persisted + * @throws IOException if shard state could not be persisted */ public void updateRoutingEntry(final ShardRouting newRouting, final boolean persistState) throws IOException { final ShardRouting currentRouting = this.shardRouting; if (!newRouting.shardId().equals(shardId())) { - throw new IllegalArgumentException("Trying to set a routing entry with shardId [" + newRouting.shardId() + "] on a shard with shardId [" + shardId() + "]"); + throw new IllegalArgumentException("Trying to set a routing entry with shardId " + newRouting.shardId() + " on a shard with shardId " + shardId() + ""); } if ((currentRouting == null || newRouting.isSameAllocation(currentRouting)) == false) { throw new IllegalArgumentException("Trying to set a routing entry with a different allocation. Current " + currentRouting + ", new " + newRouting); @@ -419,9 +450,7 @@ public class IndexShard extends AbstractIndexShardComponent { public Engine.Index prepareIndexOnPrimary(SourceToParse source, long version, VersionType versionType) { try { - if (shardRouting.primary() == false) { - throw new IllegalIndexShardStateException(shardId, state, "shard is not a primary"); - } + verifyPrimary(); return prepareIndex(docMapper(source.type()), source, version, versionType, Engine.Operation.Origin.PRIMARY); } catch (Throwable t) { verifyNotClosed(t); @@ -431,6 +460,7 @@ public class IndexShard extends AbstractIndexShardComponent { public Engine.Index prepareIndexOnReplica(SourceToParse source, long version, VersionType versionType) { try { + verifyReplicationTarget(); return prepareIndex(docMapper(source.type()), source, version, versionType, Engine.Operation.Origin.REPLICA); } catch (Throwable t) { verifyNotClosed(t); @@ -474,9 +504,7 @@ public class IndexShard extends AbstractIndexShardComponent { } public Engine.Delete prepareDeleteOnPrimary(String 
type, String id, long version, VersionType versionType) { - if (shardRouting.primary() == false) { - throw new IllegalIndexShardStateException(shardId, state, "shard is not a primary"); - } + verifyPrimary(); final DocumentMapper documentMapper = docMapper(type).getDocumentMapper(); return prepareDelete(type, id, documentMapper.uidMapper().term(Uid.createUid(type, id)), version, versionType, Engine.Operation.Origin.PRIMARY); } @@ -515,7 +543,9 @@ public class IndexShard extends AbstractIndexShardComponent { return getEngine().get(get, this::acquireSearcher); } - /** Writes all indexing changes to disk and opens a new searcher reflecting all changes. This can throw {@link EngineClosedException}. */ + /** + * Writes all indexing changes to disk and opens a new searcher reflecting all changes. This can throw {@link EngineClosedException}. + */ public void refresh(String source) { verifyNotClosed(); if (canIndex()) { @@ -538,7 +568,9 @@ public class IndexShard extends AbstractIndexShardComponent { } } - /** Returns how many bytes we are currently moving from heap to disk */ + /** + * Returns how many bytes we are currently moving from heap to disk + */ public long getWritingBytes() { return writingBytes.get(); } @@ -940,6 +972,22 @@ public class IndexShard extends AbstractIndexShardComponent { } } + private void verifyPrimary() { + if (shardRouting.primary() == false) { + // must use exception that is not ignored by replication logic. See TransportActions.isShardNotAvailableException + throw new IllegalStateException("shard is not a primary " + shardRouting); + } + } + + private void verifyReplicationTarget() { + final IndexShardState state = state(); + if (shardRouting.primary() && shardRouting.active() && state != IndexShardState.RELOCATED) { + // must use exception that is not ignored by replication logic. 
See TransportActions.isShardNotAvailableException + throw new IllegalStateException("active primary shard cannot be a replication target before " + + " relocation hand off " + shardRouting + ", state is [" + state + "]"); + } + } + protected final void verifyStartedOrRecovering() throws IllegalIndexShardStateException { IndexShardState state = this.state; // one time volatile read if (state != IndexShardState.STARTED && state != IndexShardState.RECOVERING && state != IndexShardState.POST_RECOVERY) { @@ -969,7 +1017,9 @@ public class IndexShard extends AbstractIndexShardComponent { } } - /** Returns number of heap bytes used by the indexing buffer for this shard, or 0 if the shard is closed */ + /** + * Returns number of heap bytes used by the indexing buffer for this shard, or 0 if the shard is closed + */ public long getIndexBufferRAMBytesUsed() { Engine engine = getEngineOrNull(); if (engine == null) { @@ -986,8 +1036,10 @@ public class IndexShard extends AbstractIndexShardComponent { this.shardEventListener.delegates.add(onShardFailure); } - /** Called by {@link IndexingMemoryController} to check whether more than {@code inactiveTimeNS} has passed since the last - * indexing operation, and notify listeners that we are now inactive so e.g. sync'd flush can happen. */ + /** + * Called by {@link IndexingMemoryController} to check whether more than {@code inactiveTimeNS} has passed since the last + * indexing operation, and notify listeners that we are now inactive so e.g. sync'd flush can happen. + */ public void checkIdle(long inactiveTimeNS) { Engine engineOrNull = getEngineOrNull(); if (engineOrNull != null && System.nanoTime() - engineOrNull.getLastWriteNanos() >= inactiveTimeNS) { @@ -1132,11 +1184,12 @@ public class IndexShard extends AbstractIndexShardComponent { } } catch (Exception e) { handleRefreshException(e); - }; + } } /** * Should be called for each no-op update operation to increment relevant statistics. 
+ * * @param type the doc type of the update */ public void noopUpdate(String type) { @@ -1336,14 +1389,22 @@ public class IndexShard extends AbstractIndexShardComponent { public Releasable acquirePrimaryOperationLock() { verifyNotClosed(); - if (shardRouting.primary() == false) { - throw new IllegalIndexShardStateException(shardId, state, "shard is not a primary"); - } + verifyPrimary(); return suspendableRefContainer.acquireUninterruptibly(); } - public Releasable acquireReplicaOperationLock() { + /** + * acquires operation lock. If the given primary term is lower than the one in {@link #primaryTerm} + * an {@link IllegalArgumentException} is thrown. + */ + public Releasable acquireReplicaOperationLock(long opPrimaryTerm) { verifyNotClosed(); + verifyReplicationTarget(); + if (primaryTerm > opPrimaryTerm) { + // must use exception that is not ignored by replication logic. See TransportActions.isShardNotAvailableException + throw new IllegalArgumentException(LoggerMessageFormat.format("{} operation term [{}] is too old (current [{}])", + shardId, opPrimaryTerm, primaryTerm)); + } return suspendableRefContainer.acquireUninterruptibly(); } @@ -1447,7 +1508,7 @@ public class IndexShard extends AbstractIndexShardComponent { * Returns true iff one or more changes to the engine are not visible to via the current searcher. * Otherwise false. 
* - * @throws EngineClosedException if the engine is already closed + * @throws EngineClosedException if the engine is already closed * @throws AlreadyClosedException if the internal indexwriter in the engine is already closed */ public boolean isRefreshNeeded() { diff --git a/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java b/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java index 46ead3fbf36..c786f57029d 100644 --- a/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java +++ b/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java @@ -19,7 +19,6 @@ package org.elasticsearch.indices.cluster; -import com.carrotsearch.hppc.IntHashSet; import com.carrotsearch.hppc.cursors.ObjectCursor; import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterState; @@ -71,9 +70,11 @@ import org.elasticsearch.snapshots.RestoreService; import org.elasticsearch.threadpool.ThreadPool; import java.util.Arrays; +import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.ConcurrentMap; /** @@ -90,7 +91,8 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent hasAllocations = new HashSet<>(); + for (ShardRouting routing : event.state().getRoutingNodes().node(event.state().nodes().localNodeId())) { + hasAllocations.add(routing.index()); + } for (IndexService indexService : indicesService) { Index index = indexService.index(); - if (indexService.shardIds().isEmpty()) { + if (hasAllocations.contains(index) == false) { + assert indexService.shardIds().isEmpty() : + "no locally assigned shards, but index wasn't emptied by applyDeletedShards." 
+ + " index " + index + ", shards: " + indexService.shardIds(); if (logger.isDebugEnabled()) { logger.debug("{} cleaning index (no shards allocated)", index); } @@ -218,16 +233,6 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent newShardAllocationIds = new HashSet<>(); for (IndexService indexService : indicesService) { Index index = indexService.index(); - IndexMetaData indexMetaData = event.state().metaData().getIndexSafe(index); - if (indexMetaData == null) { - continue; - } + IndexMetaData indexMetaData = event.state().metaData().index(index); + assert indexMetaData != null : "local index doesn't have metadata, should have been cleaned up by applyDeletedIndices: " + index; // now, go over and delete shards that needs to get deleted - newShardIds.clear(); + newShardAllocationIds.clear(); for (ShardRouting shard : routingNode) { if (shard.index().equals(index)) { - newShardIds.add(shard.id()); + // use the allocation id and not object so we won't be influence by relocation targets + newShardAllocationIds.add(shard.allocationId().getId()); } } - for (Integer existingShardId : indexService.shardIds()) { - if (!newShardIds.contains(existingShardId)) { + for (IndexShard existingShard : indexService) { + if (newShardAllocationIds.contains(existingShard.routingEntry().allocationId().getId()) == false) { if (indexMetaData.getState() == IndexMetaData.State.CLOSE) { if (logger.isDebugEnabled()) { - logger.debug("{}[{}] removing shard (index is closed)", index, existingShardId); + logger.debug("{} removing shard (index is closed)", existingShard.shardId()); } - indexService.removeShard(existingShardId, "removing shard (index is closed)"); + indexService.removeShard(existingShard.shardId().id(), "removing shard (index is closed)"); } else { // we can just remove the shard, without cleaning it locally, since we will clean it // when all shards are allocated in the IndicesStore if (logger.isDebugEnabled()) { - logger.debug("{}[{}] removing shard 
(not allocated)", index, existingShardId); + logger.debug("{} removing shard (not allocated)", existingShard.shardId()); } - indexService.removeShard(existingShardId, "removing shard (not allocated)"); + indexService.removeShard(existingShard.shardId().id(), "removing shard (not allocated)"); } } } @@ -312,7 +326,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent !status.sourceNode().equals(sourceNode))) { @@ -477,7 +488,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent { try { @@ -634,7 +640,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent { final ShardId sId = indexShard.shardId(); diff --git a/core/src/test/java/org/elasticsearch/action/support/broadcast/node/TransportBroadcastByNodeActionTests.java b/core/src/test/java/org/elasticsearch/action/support/broadcast/node/TransportBroadcastByNodeActionTests.java index b166f5f45c3..5d17232735f 100644 --- a/core/src/test/java/org/elasticsearch/action/support/broadcast/node/TransportBroadcastByNodeActionTests.java +++ b/core/src/test/java/org/elasticsearch/action/support/broadcast/node/TransportBroadcastByNodeActionTests.java @@ -34,7 +34,9 @@ import org.elasticsearch.cluster.block.ClusterBlock; import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.block.ClusterBlocks; +import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.routing.IndexRoutingTable; @@ -212,10 +214,12 @@ public class TransportBroadcastByNodeActionTests extends ESTestCase { IndexRoutingTable.Builder indexRoutingTable = IndexRoutingTable.builder(new Index(index, "_na_")); int shardIndex = -1; + int 
totalIndexShards = 0; for (int i = 0; i < numberOfNodes; i++) { final DiscoveryNode node = newNode(i); discoBuilder = discoBuilder.put(node); int numberOfShards = randomIntBetween(1, 10); + totalIndexShards += numberOfShards; for (int j = 0; j < numberOfShards; j++) { final ShardId shardId = new ShardId(index, "_na_", ++shardIndex); ShardRouting shard = TestShardRouting.newShardRouting(index, shardId.getId(), node.id(), true, ShardRoutingState.STARTED); @@ -228,6 +232,12 @@ public class TransportBroadcastByNodeActionTests extends ESTestCase { discoBuilder.masterNodeId(newNode(numberOfNodes - 1).id()); ClusterState.Builder stateBuilder = ClusterState.builder(new ClusterName(TEST_CLUSTER)); stateBuilder.nodes(discoBuilder); + final IndexMetaData.Builder indexMetaData = IndexMetaData.builder(index) + .settings(Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT)) + .numberOfReplicas(0) + .numberOfShards(totalIndexShards); + + stateBuilder.metaData(MetaData.builder().put(indexMetaData)); stateBuilder.routingTable(RoutingTable.builder().add(indexRoutingTable.build()).build()); ClusterState clusterState = stateBuilder.build(); setState(clusterService, clusterState); diff --git a/core/src/test/java/org/elasticsearch/action/support/replication/BroadcastReplicationTests.java b/core/src/test/java/org/elasticsearch/action/support/replication/BroadcastReplicationTests.java index 4125f02b956..9170ff2e5a6 100644 --- a/core/src/test/java/org/elasticsearch/action/support/replication/BroadcastReplicationTests.java +++ b/core/src/test/java/org/elasticsearch/action/support/replication/BroadcastReplicationTests.java @@ -142,7 +142,7 @@ public class BroadcastReplicationTests extends ESTestCase { public void testResultCombine() throws InterruptedException, ExecutionException, IOException { final String index = "test"; - int numShards = randomInt(3); + int numShards = 1 + randomInt(3); setState(clusterService, stateWithAssignedPrimariesAndOneReplica(index, 
numShards)); logger.debug("--> using initial state:\n{}", clusterService.state().prettyPrint()); Future response = (broadcastReplicationAction.execute(new DummyBroadcastRequest().indices(index))); diff --git a/core/src/test/java/org/elasticsearch/action/support/replication/ClusterStateCreationUtils.java b/core/src/test/java/org/elasticsearch/action/support/replication/ClusterStateCreationUtils.java index 6b38d35c63b..bfe29229062 100644 --- a/core/src/test/java/org/elasticsearch/action/support/replication/ClusterStateCreationUtils.java +++ b/core/src/test/java/org/elasticsearch/action/support/replication/ClusterStateCreationUtils.java @@ -45,6 +45,7 @@ import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_SHARDS; import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_VERSION_CREATED; import static org.elasticsearch.test.ESTestCase.randomFrom; +import static org.elasticsearch.test.ESTestCase.randomInt; import static org.elasticsearch.test.ESTestCase.randomIntBetween; /** @@ -84,10 +85,11 @@ public class ClusterStateCreationUtils { } discoBuilder.localNodeId(newNode(0).id()); discoBuilder.masterNodeId(newNode(1).id()); // we need a non-local master to test shard failures + final int primaryTerm = randomInt(200); IndexMetaData indexMetaData = IndexMetaData.builder(index).settings(Settings.builder() .put(SETTING_VERSION_CREATED, Version.CURRENT) .put(SETTING_NUMBER_OF_SHARDS, 1).put(SETTING_NUMBER_OF_REPLICAS, numberOfReplicas) - .put(SETTING_CREATION_DATE, System.currentTimeMillis())).build(); + .put(SETTING_CREATION_DATE, System.currentTimeMillis())).primaryTerm(0, primaryTerm).build(); RoutingTable.Builder routing = new RoutingTable.Builder(); routing.addAsNew(indexMetaData); @@ -111,7 +113,8 @@ public class ClusterStateCreationUtils { } else { unassignedInfo = new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, null); } - 
indexShardRoutingBuilder.addShard(TestShardRouting.newShardRouting(index, 0, primaryNode, relocatingNode, null, true, primaryState, unassignedInfo)); + indexShardRoutingBuilder.addShard(TestShardRouting.newShardRouting(index, 0, primaryNode, relocatingNode, null, true, + primaryState, unassignedInfo)); for (ShardRoutingState replicaState : replicaStates) { String replicaNode = null; @@ -152,7 +155,7 @@ public class ClusterStateCreationUtils { discoBuilder.masterNodeId(newNode(1).id()); // we need a non-local master to test shard failures IndexMetaData indexMetaData = IndexMetaData.builder(index).settings(Settings.builder() .put(SETTING_VERSION_CREATED, Version.CURRENT) - .put(SETTING_NUMBER_OF_SHARDS, 1).put(SETTING_NUMBER_OF_REPLICAS, 1) + .put(SETTING_NUMBER_OF_SHARDS, numberOfShards).put(SETTING_NUMBER_OF_REPLICAS, 1) .put(SETTING_CREATION_DATE, System.currentTimeMillis())).build(); ClusterState.Builder state = ClusterState.builder(new ClusterName("test")); state.nodes(discoBuilder); @@ -163,8 +166,10 @@ public class ClusterStateCreationUtils { routing.addAsNew(indexMetaData); final ShardId shardId = new ShardId(index, "_na_", i); IndexShardRoutingTable.Builder indexShardRoutingBuilder = new IndexShardRoutingTable.Builder(shardId); - indexShardRoutingBuilder.addShard(TestShardRouting.newShardRouting(index, i, newNode(0).id(), null, null, true, ShardRoutingState.STARTED, null)); - indexShardRoutingBuilder.addShard(TestShardRouting.newShardRouting(index, i, newNode(1).id(), null, null, false, ShardRoutingState.STARTED, null)); + indexShardRoutingBuilder.addShard(TestShardRouting.newShardRouting(index, i, newNode(0).id(), null, null, true, + ShardRoutingState.STARTED, null)); + indexShardRoutingBuilder.addShard(TestShardRouting.newShardRouting(index, i, newNode(1).id(), null, null, false, + ShardRoutingState.STARTED, null)); indexRoutingTableBuilder.addIndexShard(indexShardRoutingBuilder.build()); } 
state.routingTable(RoutingTable.builder().add(indexRoutingTableBuilder.build()).build()); @@ -229,12 +234,13 @@ public class ClusterStateCreationUtils { /** * Creates a cluster state where local node and master node can be specified + * * @param localNode node in allNodes that is the local node * @param masterNode node in allNodes that is the master node. Can be null if no master exists * @param allNodes all nodes in the cluster * @return cluster state */ - public static ClusterState state(DiscoveryNode localNode, DiscoveryNode masterNode, DiscoveryNode... allNodes) { + public static ClusterState state(DiscoveryNode localNode, DiscoveryNode masterNode, DiscoveryNode... allNodes) { DiscoveryNodes.Builder discoBuilder = DiscoveryNodes.builder(); for (DiscoveryNode node : allNodes) { discoBuilder.put(node); diff --git a/core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java b/core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java index 1fc94dcb533..446ad74e8b5 100644 --- a/core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java +++ b/core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java @@ -630,11 +630,13 @@ public class TransportReplicationActionTests extends ESTestCase { final ShardIterator shardIt = shardRoutingTable.shardsIt(); final ShardId shardId = shardIt.shardId(); final Request request = new Request(shardId); + final long primaryTerm = randomInt(200); + request.primaryTerm(primaryTerm); final PlainActionFuture listener = new PlainActionFuture<>(); ReplicationTask task = maybeTask(); logger.debug("expecting [{}] assigned replicas, [{}] total shards. 
using state: \n{}", assignedReplicas, totalShards, clusterService.state().prettyPrint()); - TransportReplicationAction.IndexShardReference reference = getOrCreateIndexShardOperationsCounter(); + TransportReplicationAction.IndexShardReference reference = getOrCreateIndexShardOperationsCounter(0); ShardRouting primaryShard = state.getRoutingTable().shardRoutingTable(shardId).primaryShard(); indexShardRouting.set(primaryShard); @@ -767,6 +769,9 @@ public class TransportReplicationActionTests extends ESTestCase { } // all replicas have responded so the counter should be decreased again assertIndexShardCounter(1); + + // assert that nothing in the replica logic changes the primary term of the operation + assertThat(request.primaryTerm(), equalTo(primaryTerm)); } public void testCounterOnPrimary() throws Exception { @@ -989,7 +994,7 @@ public class TransportReplicationActionTests extends ESTestCase { /** * Returns testIndexShardOperationsCounter or initializes it if it was already created in this test run. 
*/ - private synchronized TransportReplicationAction.IndexShardReference getOrCreateIndexShardOperationsCounter() { + private synchronized TransportReplicationAction.IndexShardReference getOrCreateIndexShardOperationsCounter(long primaryTerm) { count.incrementAndGet(); return new TransportReplicationAction.IndexShardReference() { @Override @@ -1009,6 +1014,11 @@ public class TransportReplicationActionTests extends ESTestCase { return shardRouting; } + @Override + public long opPrimaryTerm() { + return primaryTerm; + } + @Override public void close() { count.decrementAndGet(); @@ -1104,13 +1114,15 @@ public class TransportReplicationActionTests extends ESTestCase { return false; } + @Override protected IndexShardReference getIndexShardReferenceOnPrimary(ShardId shardId) { - return getOrCreateIndexShardOperationsCounter(); + final IndexMetaData indexMetaData = clusterService.state().metaData().index(shardId.getIndex()); + return getOrCreateIndexShardOperationsCounter(indexMetaData.primaryTerm(shardId.id())); } - protected IndexShardReference getIndexShardReferenceOnReplica(ShardId shardId) { - return getOrCreateIndexShardOperationsCounter(); + protected IndexShardReference getIndexShardReferenceOnReplica(ShardId shardId, long opPrimaryTerm) { + return getOrCreateIndexShardOperationsCounter(opPrimaryTerm); } } diff --git a/core/src/test/java/org/elasticsearch/cluster/metadata/ToAndFromJsonMetaDataTests.java b/core/src/test/java/org/elasticsearch/cluster/metadata/ToAndFromJsonMetaDataTests.java index 58861585066..f7e8b18196f 100644 --- a/core/src/test/java/org/elasticsearch/cluster/metadata/ToAndFromJsonMetaDataTests.java +++ b/core/src/test/java/org/elasticsearch/cluster/metadata/ToAndFromJsonMetaDataTests.java @@ -41,11 +41,14 @@ public class ToAndFromJsonMetaDataTests extends ESTestCase { .put(IndexMetaData.builder("test1") .settings(settings(Version.CURRENT)) .numberOfShards(1) - .numberOfReplicas(2)) + .numberOfReplicas(2) + .primaryTerm(0, 1)) 
.put(IndexMetaData.builder("test2") .settings(settings(Version.CURRENT).put("setting1", "value1").put("setting2", "value2")) .numberOfShards(2) - .numberOfReplicas(3)) + .numberOfReplicas(3) + .primaryTerm(0, 2) + .primaryTerm(1, 2)) .put(IndexMetaData.builder("test3") .settings(settings(Version.CURRENT)) .numberOfShards(1) @@ -112,15 +115,15 @@ public class ToAndFromJsonMetaDataTests extends ESTestCase { .putAlias(newAliasMetaDataBuilder("alias1").filter(ALIAS_FILTER1)) .putAlias(newAliasMetaDataBuilder("alias2")) .putAlias(newAliasMetaDataBuilder("alias4").filter(ALIAS_FILTER2))) - .put(IndexTemplateMetaData.builder("foo") - .template("bar") - .order(1) - .settings(settingsBuilder() - .put("setting1", "value1") - .put("setting2", "value2")) - .putAlias(newAliasMetaDataBuilder("alias-bar1")) - .putAlias(newAliasMetaDataBuilder("alias-bar2").filter("{\"term\":{\"user\":\"kimchy\"}}")) - .putAlias(newAliasMetaDataBuilder("alias-bar3").routing("routing-bar"))) + .put(IndexTemplateMetaData.builder("foo") + .template("bar") + .order(1) + .settings(settingsBuilder() + .put("setting1", "value1") + .put("setting2", "value2")) + .putAlias(newAliasMetaDataBuilder("alias-bar1")) + .putAlias(newAliasMetaDataBuilder("alias-bar2").filter("{\"term\":{\"user\":\"kimchy\"}}")) + .putAlias(newAliasMetaDataBuilder("alias-bar3").routing("routing-bar"))) .put(IndexMetaData.builder("test12") .settings(settings(Version.CURRENT) .put("setting1", "value1") @@ -133,15 +136,15 @@ public class ToAndFromJsonMetaDataTests extends ESTestCase { .putAlias(newAliasMetaDataBuilder("alias1").filter(ALIAS_FILTER1)) .putAlias(newAliasMetaDataBuilder("alias2")) .putAlias(newAliasMetaDataBuilder("alias4").filter(ALIAS_FILTER2))) - .put(IndexTemplateMetaData.builder("foo") - .template("bar") - .order(1) - .settings(settingsBuilder() - .put("setting1", "value1") - .put("setting2", "value2")) - .putAlias(newAliasMetaDataBuilder("alias-bar1")) - 
.putAlias(newAliasMetaDataBuilder("alias-bar2").filter("{\"term\":{\"user\":\"kimchy\"}}")) - .putAlias(newAliasMetaDataBuilder("alias-bar3").routing("routing-bar"))) + .put(IndexTemplateMetaData.builder("foo") + .template("bar") + .order(1) + .settings(settingsBuilder() + .put("setting1", "value1") + .put("setting2", "value2")) + .putAlias(newAliasMetaDataBuilder("alias-bar1")) + .putAlias(newAliasMetaDataBuilder("alias-bar2").filter("{\"term\":{\"user\":\"kimchy\"}}")) + .putAlias(newAliasMetaDataBuilder("alias-bar3").routing("routing-bar"))) .build(); String metaDataSource = MetaData.Builder.toXContent(metaData); @@ -150,6 +153,7 @@ public class ToAndFromJsonMetaDataTests extends ESTestCase { MetaData parsedMetaData = MetaData.Builder.fromXContent(XContentFactory.xContent(XContentType.JSON).createParser(metaDataSource)); IndexMetaData indexMetaData = parsedMetaData.index("test1"); + assertThat(indexMetaData.primaryTerm(0), equalTo(1L)); assertThat(indexMetaData.getNumberOfShards(), equalTo(1)); assertThat(indexMetaData.getNumberOfReplicas(), equalTo(2)); assertThat(indexMetaData.getCreationDate(), equalTo(-1L)); @@ -159,6 +163,8 @@ public class ToAndFromJsonMetaDataTests extends ESTestCase { indexMetaData = parsedMetaData.index("test2"); assertThat(indexMetaData.getNumberOfShards(), equalTo(2)); assertThat(indexMetaData.getNumberOfReplicas(), equalTo(3)); + assertThat(indexMetaData.primaryTerm(0), equalTo(2L)); + assertThat(indexMetaData.primaryTerm(1), equalTo(2L)); assertThat(indexMetaData.getCreationDate(), equalTo(-1L)); assertThat(indexMetaData.getSettings().getAsMap().size(), equalTo(5)); assertThat(indexMetaData.getSettings().get("setting1"), equalTo("value1")); diff --git a/core/src/test/java/org/elasticsearch/cluster/routing/PrimaryTermsTests.java b/core/src/test/java/org/elasticsearch/cluster/routing/PrimaryTermsTests.java new file mode 100644 index 00000000000..d9b74621cc5 --- /dev/null +++ 
b/core/src/test/java/org/elasticsearch/cluster/routing/PrimaryTermsTests.java @@ -0,0 +1,241 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.cluster.routing; + +import org.elasticsearch.Version; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.health.ClusterStateHealth; +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.cluster.node.DiscoveryNodes.Builder; +import org.elasticsearch.cluster.routing.allocation.AllocationService; +import org.elasticsearch.cluster.routing.allocation.FailedRerouteAllocation; +import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.test.ESAllocationTestCase; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static org.elasticsearch.cluster.routing.ShardRoutingState.INITIALIZING; +import static org.elasticsearch.common.settings.Settings.settingsBuilder; +import static 
org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; + +public class PrimaryTermsTests extends ESAllocationTestCase { + + private static final String TEST_INDEX_1 = "test1"; + private static final String TEST_INDEX_2 = "test2"; + private RoutingTable testRoutingTable; + private int numberOfShards; + private int numberOfReplicas; + private final static Settings DEFAULT_SETTINGS = Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT).build(); + private AllocationService allocationService; + private ClusterState clusterState; + + private final Map primaryTermsPerIndex = new HashMap<>(); + + @Override + public void setUp() throws Exception { + super.setUp(); + this.allocationService = createAllocationService(settingsBuilder() + .put("cluster.routing.allocation.node_concurrent_recoveries", Integer.MAX_VALUE) // don't limit recoveries + .put("cluster.routing.allocation.node_initial_primaries_recoveries", Integer.MAX_VALUE) + .build()); + this.numberOfShards = randomIntBetween(1, 5); + this.numberOfReplicas = randomIntBetween(1, 5); + logger.info("Setup test with " + this.numberOfShards + " shards and " + this.numberOfReplicas + " replicas."); + this.primaryTermsPerIndex.clear(); + MetaData metaData = MetaData.builder() + .put(createIndexMetaData(TEST_INDEX_1)) + .put(createIndexMetaData(TEST_INDEX_2)) + .build(); + + this.testRoutingTable = new RoutingTable.Builder() + .add(new IndexRoutingTable.Builder(metaData.index(TEST_INDEX_1).getIndex()).initializeAsNew(metaData.index(TEST_INDEX_1)) + .build()) + .add(new IndexRoutingTable.Builder(metaData.index(TEST_INDEX_2).getIndex()).initializeAsNew(metaData.index(TEST_INDEX_2)) + .build()) + .build(); + + this.clusterState = ClusterState.builder(org.elasticsearch.cluster.ClusterName.DEFAULT).metaData(metaData) + .routingTable(testRoutingTable).build(); + } + + /** + * puts primary shard routings into initializing state + */ + private void initPrimaries() { + logger.info("adding " + 
(this.numberOfReplicas + 1) + " nodes and performing rerouting"); + Builder discoBuilder = DiscoveryNodes.builder(); + for (int i = 0; i < this.numberOfReplicas + 1; i++) { + discoBuilder = discoBuilder.put(newNode("node" + i)); + } + this.clusterState = ClusterState.builder(clusterState).nodes(discoBuilder).build(); + RoutingAllocation.Result rerouteResult = allocationService.reroute(clusterState, "reroute"); + this.testRoutingTable = rerouteResult.routingTable(); + assertThat(rerouteResult.changed(), is(true)); + applyRerouteResult(rerouteResult); + primaryTermsPerIndex.keySet().forEach(this::incrementPrimaryTerm); + } + + private void incrementPrimaryTerm(String index) { + final long[] primaryTerms = primaryTermsPerIndex.get(index); + for (int i = 0; i < primaryTerms.length; i++) { + primaryTerms[i]++; + } + } + + private void incrementPrimaryTerm(String index, int shard) { + primaryTermsPerIndex.get(index)[shard]++; + } + + private boolean startInitializingShards(String index) { + this.clusterState = ClusterState.builder(clusterState).routingTable(this.testRoutingTable).build(); + final List startedShards = this.clusterState.getRoutingNodes().shardsWithState(index, INITIALIZING); + logger.info("start primary shards for index [{}]: {} ", index, startedShards); + RoutingAllocation.Result rerouteResult = allocationService.applyStartedShards(this.clusterState, startedShards); + applyRerouteResult(rerouteResult); + return rerouteResult.changed(); + } + + private void applyRerouteResult(RoutingAllocation.Result rerouteResult) { + ClusterState previousClusterState = this.clusterState; + ClusterState newClusterState = ClusterState.builder(previousClusterState).routingResult(rerouteResult).build(); + ClusterState.Builder builder = ClusterState.builder(newClusterState).incrementVersion(); + if (previousClusterState.routingTable() != newClusterState.routingTable()) { + 
builder.routingTable(RoutingTable.builder(newClusterState.routingTable()).version(newClusterState.routingTable().version() + 1) + .build()); + } + if (previousClusterState.metaData() != newClusterState.metaData()) { + builder.metaData(MetaData.builder(newClusterState.metaData()).version(newClusterState.metaData().version() + 1)); + } + this.clusterState = builder.build(); + this.testRoutingTable = rerouteResult.routingTable(); + final ClusterStateHealth clusterHealth = new ClusterStateHealth(clusterState); + logger.info("applied reroute. active shards: p [{}], t [{}], init shards: [{}], relocating: [{}]", + clusterHealth.getActivePrimaryShards(), clusterHealth.getActiveShards(), + clusterHealth.getInitializingShards(), clusterHealth.getRelocatingShards()); + } + + private void failSomePrimaries(String index) { + this.clusterState = ClusterState.builder(clusterState).routingTable(this.testRoutingTable).build(); + final IndexRoutingTable indexShardRoutingTable = testRoutingTable.index(index); + Set shardIdsToFail = new HashSet<>(); + for (int i = 1 + randomInt(numberOfShards - 1); i > 0; i--) { + shardIdsToFail.add(randomInt(numberOfShards - 1)); + } + logger.info("failing primary shards {} for index [{}]", shardIdsToFail, index); + List failedShards = new ArrayList<>(); + for (int shard : shardIdsToFail) { + failedShards.add(new FailedRerouteAllocation.FailedShard(indexShardRoutingTable.shard(shard).primaryShard(), "test", null)); + incrementPrimaryTerm(index, shard); // the primary failure should increment the primary term; + } + RoutingAllocation.Result rerouteResult = allocationService.applyFailedShards(this.clusterState, failedShards); + applyRerouteResult(rerouteResult); + } + + private void addNodes() { + DiscoveryNodes.Builder nodesBuilder = DiscoveryNodes.builder(clusterState.nodes()); + final int newNodes = randomInt(10); + logger.info("adding [{}] nodes", newNodes); + for (int i = 0; i < newNodes; i++) { + nodesBuilder.put(newNode("extra_" + i)); + } + 
this.clusterState = ClusterState.builder(clusterState).nodes(nodesBuilder).build(); + RoutingAllocation.Result rerouteResult = allocationService.reroute(this.clusterState, "nodes added"); + applyRerouteResult(rerouteResult); + + } + + private IndexMetaData.Builder createIndexMetaData(String indexName) { + primaryTermsPerIndex.put(indexName, new long[numberOfShards]); + final IndexMetaData.Builder builder = new IndexMetaData.Builder(indexName) + .settings(DEFAULT_SETTINGS) + .numberOfReplicas(this.numberOfReplicas) + .numberOfShards(this.numberOfShards); + for (int i = 0; i < numberOfShards; i++) { + builder.primaryTerm(i, randomInt(200)); + primaryTermsPerIndex.get(indexName)[i] = builder.primaryTerm(i); + } + return builder; + } + + private void assertAllPrimaryTerm() { + primaryTermsPerIndex.keySet().forEach(this::assertPrimaryTerm); + } + + private void assertPrimaryTerm(String index) { + final long[] terms = primaryTermsPerIndex.get(index); + final IndexMetaData indexMetaData = clusterState.metaData().index(index); + for (IndexShardRoutingTable shardRoutingTable : this.testRoutingTable.index(index)) { + final int shard = shardRoutingTable.shardId().id(); + assertThat("primary term mismatch between indexMetaData of [" + index + "] and shard [" + shard + "]'s routing", + indexMetaData.primaryTerm(shard), equalTo(terms[shard])); + } + } + + public void testPrimaryTermMetaDataSync() { + assertAllPrimaryTerm(); + + initPrimaries(); + assertAllPrimaryTerm(); + + startInitializingShards(TEST_INDEX_1); + assertAllPrimaryTerm(); + + startInitializingShards(TEST_INDEX_2); + assertAllPrimaryTerm(); + + // now start all replicas too + startInitializingShards(TEST_INDEX_1); + startInitializingShards(TEST_INDEX_2); + assertAllPrimaryTerm(); + + // relocations shouldn't change much + addNodes(); + assertAllPrimaryTerm(); + boolean changed = true; + while (changed) { + changed = startInitializingShards(TEST_INDEX_1); + assertAllPrimaryTerm(); + changed |= 
startInitializingShards(TEST_INDEX_2); + assertAllPrimaryTerm(); + } + + // primary promotion + failSomePrimaries(TEST_INDEX_1); + assertAllPrimaryTerm(); + + // stabilize cluster + changed = true; + while (changed) { + changed = startInitializingShards(TEST_INDEX_1); + assertAllPrimaryTerm(); + changed |= startInitializingShards(TEST_INDEX_2); + assertAllPrimaryTerm(); + } + } +} diff --git a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/CatAllocationTestCase.java b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/CatAllocationTestCase.java index be403510195..0bd84413125 100644 --- a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/CatAllocationTestCase.java +++ b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/CatAllocationTestCase.java @@ -30,7 +30,6 @@ import org.elasticsearch.cluster.routing.RoutingTable; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.ShardRoutingState; import org.elasticsearch.cluster.routing.TestShardRouting; -import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.test.ESAllocationTestCase; import java.io.BufferedReader; diff --git a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/PrimaryElectionRoutingTests.java b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/PrimaryElectionRoutingTests.java index 7e59ab8a6b4..b18ee32ff5e 100644 --- a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/PrimaryElectionRoutingTests.java +++ b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/PrimaryElectionRoutingTests.java @@ -58,29 +58,31 @@ public class PrimaryElectionRoutingTests extends ESAllocationTestCase { ClusterState clusterState = ClusterState.builder(org.elasticsearch.cluster.ClusterName.DEFAULT).metaData(metaData).routingTable(routingTable).build(); logger.info("Adding two nodes and performing rerouting"); - clusterState = 
ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder().put(newNode("node1")).put(newNode("node2"))).build(); - RoutingTable prevRoutingTable = routingTable; - routingTable = strategy.reroute(clusterState, "reroute").routingTable(); - clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build(); + clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder().put(newNode("node1"))).build(); + RoutingAllocation.Result result = strategy.reroute(clusterState, "reroute"); + clusterState = ClusterState.builder(clusterState).routingResult(result).build(); + + clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder(clusterState.nodes()).put(newNode("node2"))).build(); + result = strategy.reroute(clusterState, "reroute"); + clusterState = ClusterState.builder(clusterState).routingResult(result).build(); logger.info("Start the primary shard (on node1)"); RoutingNodes routingNodes = clusterState.getRoutingNodes(); - prevRoutingTable = routingTable; - routingTable = strategy.applyStartedShards(clusterState, routingNodes.node("node1").shardsWithState(INITIALIZING)).routingTable(); - clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build(); + result = strategy.applyStartedShards(clusterState, routingNodes.node("node1").shardsWithState(INITIALIZING)); + clusterState = ClusterState.builder(clusterState).routingResult(result).build(); logger.info("Start the backup shard (on node2)"); routingNodes = clusterState.getRoutingNodes(); - prevRoutingTable = routingTable; - routingTable = strategy.applyStartedShards(clusterState, routingNodes.node("node2").shardsWithState(INITIALIZING)).routingTable(); - clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build(); + result = strategy.applyStartedShards(clusterState, routingNodes.node("node2").shardsWithState(INITIALIZING)); + clusterState = ClusterState.builder(clusterState).routingResult(result).build(); 
logger.info("Adding third node and reroute and kill first node"); clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder(clusterState.nodes()).put(newNode("node3")).remove("node1")).build(); - prevRoutingTable = routingTable; - routingTable = strategy.reroute(clusterState, "reroute").routingTable(); - clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build(); + RoutingTable prevRoutingTable = clusterState.routingTable(); + result = strategy.reroute(clusterState, "reroute"); + clusterState = ClusterState.builder(clusterState).routingResult(result).build(); routingNodes = clusterState.getRoutingNodes(); + routingTable = clusterState.routingTable(); assertThat(prevRoutingTable != routingTable, equalTo(true)); assertThat(routingTable.index("test").shards().size(), equalTo(1)); @@ -89,6 +91,7 @@ public class PrimaryElectionRoutingTests extends ESAllocationTestCase { assertThat(routingNodes.node("node3").numberOfShardsWithState(INITIALIZING), equalTo(1)); // verify where the primary is assertThat(routingTable.index("test").shard(0).primaryShard().currentNodeId(), equalTo("node2")); + assertThat(clusterState.metaData().index("test").primaryTerm(0), equalTo(2L)); assertThat(routingTable.index("test").shard(0).replicaShards().get(0).currentNodeId(), equalTo("node3")); } @@ -110,16 +113,18 @@ public class PrimaryElectionRoutingTests extends ESAllocationTestCase { logger.info("Adding two nodes and performing rerouting"); clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder().put(newNode("node1")).put(newNode("node2"))).build(); RoutingAllocation.Result rerouteResult = allocation.reroute(clusterState, "reroute"); - clusterState = ClusterState.builder(clusterState).routingTable(rerouteResult.routingTable()).build(); + clusterState = ClusterState.builder(clusterState).routingResult(rerouteResult).build(); logger.info("Start the primary shards"); RoutingNodes routingNodes = 
clusterState.getRoutingNodes(); rerouteResult = allocation.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING)); - clusterState = ClusterState.builder(clusterState).routingTable(rerouteResult.routingTable()).build(); + clusterState = ClusterState.builder(clusterState).routingResult(rerouteResult).build(); routingNodes = clusterState.getRoutingNodes(); assertThat(routingNodes.shardsWithState(STARTED).size(), equalTo(2)); assertThat(routingNodes.shardsWithState(INITIALIZING).size(), equalTo(2)); + assertThat(clusterState.metaData().index("test").primaryTerm(0), equalTo(1L)); + assertThat(clusterState.metaData().index("test").primaryTerm(1), equalTo(1L)); // now, fail one node, while the replica is initializing, and it also holds a primary logger.info("--> fail node with primary"); @@ -129,12 +134,13 @@ public class PrimaryElectionRoutingTests extends ESAllocationTestCase { .put(newNode(nodeIdRemaining)) ).build(); rerouteResult = allocation.reroute(clusterState, "reroute"); - clusterState = ClusterState.builder(clusterState).routingTable(rerouteResult.routingTable()).build(); + clusterState = ClusterState.builder(clusterState).routingResult(rerouteResult).build(); routingNodes = clusterState.getRoutingNodes(); assertThat(routingNodes.shardsWithState(STARTED).size(), equalTo(1)); assertThat(routingNodes.shardsWithState(INITIALIZING).size(), equalTo(1)); assertThat(routingNodes.node(nodeIdRemaining).shardsWithState(INITIALIZING).get(0).primary(), equalTo(true)); + assertThat(clusterState.metaData().index("test").primaryTerm(0), equalTo(2L)); } } diff --git a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/ShardStateIT.java b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/ShardStateIT.java new file mode 100644 index 00000000000..38c15750423 --- /dev/null +++ b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/ShardStateIT.java @@ -0,0 +1,80 @@ +/* + * Licensed to Elasticsearch under one or more 
contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.cluster.routing.allocation; + +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.health.ClusterHealthStatus; +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.index.IndexService; +import org.elasticsearch.index.shard.IndexShard; +import org.elasticsearch.indices.IndicesService; +import org.elasticsearch.test.ESIntegTestCase; + +import static org.hamcrest.Matchers.equalTo; + +public class ShardStateIT extends ESIntegTestCase { + + public void testPrimaryFailureIncreasesTerm() throws Exception { + internalCluster().ensureAtLeastNumDataNodes(2); + prepareCreate("test").setSettings(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 2, IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1).get(); + ensureGreen(); + assertPrimaryTerms(1, 1); + + logger.info("--> disabling allocation to capture shard failure"); + disableAllocation("test"); + + ClusterState state = client().admin().cluster().prepareState().get().getState(); + final int shard = randomBoolean() ? 
0 : 1; + final String nodeId = state.routingTable().index("test").shard(shard).primaryShard().currentNodeId(); + final String node = state.nodes().get(nodeId).name(); + logger.info("--> failing primary of [{}] on node [{}]", shard, node); + IndicesService indicesService = internalCluster().getInstance(IndicesService.class, node); + indicesService.indexService(resolveIndex("test")).getShard(shard).failShard("simulated test failure", null); + + logger.info("--> waiting for a yellow index"); + assertBusy(() -> assertThat(client().admin().cluster().prepareHealth().get().getStatus(), equalTo(ClusterHealthStatus.YELLOW))); + + final long term0 = shard == 0 ? 2 : 1; + final long term1 = shard == 1 ? 2 : 1; + assertPrimaryTerms(term0, term1); + + logger.info("--> enabling allocation"); + enableAllocation("test"); + ensureGreen(); + assertPrimaryTerms(term0, term1); + } + + protected void assertPrimaryTerms(long term0, long term1) { + for (String node : internalCluster().getNodeNames()) { + logger.debug("--> asserting primary terms terms on [{}]", node); + ClusterState state = client(node).admin().cluster().prepareState().setLocal(true).get().getState(); + IndexMetaData metaData = state.metaData().index("test"); + assertThat(metaData.primaryTerm(0), equalTo(term0)); + assertThat(metaData.primaryTerm(1), equalTo(term1)); + IndicesService indicesService = internalCluster().getInstance(IndicesService.class, node); + IndexService indexService = indicesService.indexService(metaData.getIndex()); + if (indexService != null) { + for (IndexShard shard : indexService) { + assertThat("term mismatch for shard " + shard.shardId(), + shard.getPrimaryTerm(), equalTo(metaData.primaryTerm(shard.shardId().id()))); + } + } + } + } +} diff --git a/core/src/test/java/org/elasticsearch/gateway/RecoveryFromGatewayIT.java b/core/src/test/java/org/elasticsearch/gateway/RecoveryFromGatewayIT.java index 4da9c2df177..dac284ee594 100644 --- 
a/core/src/test/java/org/elasticsearch/gateway/RecoveryFromGatewayIT.java +++ b/core/src/test/java/org/elasticsearch/gateway/RecoveryFromGatewayIT.java @@ -19,11 +19,13 @@ package org.elasticsearch.gateway; +import com.carrotsearch.hppc.cursors.ObjectCursor; import org.elasticsearch.action.admin.indices.recovery.RecoveryResponse; import org.elasticsearch.action.admin.indices.stats.IndexStats; import org.elasticsearch.action.admin.indices.stats.ShardStats; import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider; import org.elasticsearch.cluster.routing.allocation.decider.ThrottlingAllocationDecider; import org.elasticsearch.common.settings.Settings; @@ -37,11 +39,13 @@ import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.ESIntegTestCase.ClusterScope; import org.elasticsearch.test.ESIntegTestCase.Scope; import org.elasticsearch.test.InternalTestCluster.RestartCallback; -import org.elasticsearch.test.junit.annotations.TestLogging; import org.elasticsearch.test.store.MockFSDirectoryService; import org.elasticsearch.test.store.MockFSIndexStore; import java.util.Collection; +import java.util.HashMap; +import java.util.Map; +import java.util.stream.IntStream; import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS; import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_SHARDS; @@ -88,10 +92,13 @@ public class RecoveryFromGatewayIT extends ESIntegTestCase { assertHitCount(client().prepareSearch().setSize(0).setQuery(termQuery("appAccountIds", 179)).execute().actionGet(), 2); ensureYellow("test"); // wait for primary allocations here otherwise if we have a lot of shards we might have a // shard that is still in post recovery when we restart and the ensureYellow() below will timeout + + Map primaryTerms = 
assertAndCapturePrimaryTerms(null); internalCluster().fullRestart(); logger.info("Running Cluster Health (wait for the shards to startup)"); ensureYellow(); + primaryTerms = assertAndCapturePrimaryTerms(primaryTerms); client().admin().indices().prepareRefresh().execute().actionGet(); assertHitCount(client().prepareSearch().setSize(0).setQuery(termQuery("appAccountIds", 179)).execute().actionGet(), 2); @@ -100,11 +107,37 @@ public class RecoveryFromGatewayIT extends ESIntegTestCase { logger.info("Running Cluster Health (wait for the shards to startup)"); ensureYellow(); + primaryTerms = assertAndCapturePrimaryTerms(primaryTerms); client().admin().indices().prepareRefresh().execute().actionGet(); assertHitCount(client().prepareSearch().setSize(0).setQuery(termQuery("appAccountIds", 179)).execute().actionGet(), 2); } + private Map assertAndCapturePrimaryTerms(Map previousTerms) { + if (previousTerms == null) { + previousTerms = new HashMap<>(); + } + final Map result = new HashMap<>(); + final ClusterState state = client().admin().cluster().prepareState().get().getState(); + for (ObjectCursor cursor : state.metaData().indices().values()) { + final IndexMetaData indexMetaData = cursor.value; + final String index = indexMetaData.getIndex().getName(); + final long[] previous = previousTerms.get(index); + final long[] current = IntStream.range(0, indexMetaData.getNumberOfShards()).mapToLong(indexMetaData::primaryTerm).toArray(); + if (previous == null) { + result.put(index, current); + } else { + assertThat("number of terms changed for index [" + index + "]", current.length, equalTo(previous.length)); + for (int shard = 0; shard < current.length; shard++) { + assertThat("primary term didn't increase for [" + index + "][" + shard + "]", current[shard], greaterThan(previous[shard])); + } + result.put(index, current); + } + } + + return result; + } + public void testSingleNodeNoFlush() throws Exception { internalCluster().startNode(); @@ -163,10 +196,14 @@ public class 
RecoveryFromGatewayIT extends ESIntegTestCase { logger.info("Ensure all primaries have been started"); ensureYellow(); } + + Map primaryTerms = assertAndCapturePrimaryTerms(null); + internalCluster().fullRestart(); logger.info("Running Cluster Health (wait for the shards to startup)"); ensureYellow(); + primaryTerms = assertAndCapturePrimaryTerms(primaryTerms); for (int i = 0; i <= randomInt(10); i++) { assertHitCount(client().prepareSearch().setSize(0).setQuery(matchAllQuery()).get(), value1Docs + value2Docs); @@ -180,6 +217,7 @@ public class RecoveryFromGatewayIT extends ESIntegTestCase { logger.info("Running Cluster Health (wait for the shards to startup)"); ensureYellow(); + primaryTerms = assertAndCapturePrimaryTerms(primaryTerms); for (int i = 0; i <= randomInt(10); i++) { assertHitCount(client().prepareSearch().setSize(0).setQuery(matchAllQuery()).get(), value1Docs + value2Docs); @@ -201,10 +239,13 @@ public class RecoveryFromGatewayIT extends ESIntegTestCase { ensureYellow("test"); // wait for primary allocations here otherwise if we have a lot of shards we might have a // shard that is still in post recovery when we restart and the ensureYellow() below will timeout + Map primaryTerms = assertAndCapturePrimaryTerms(null); + internalCluster().fullRestart(); logger.info("Running Cluster Health (wait for the shards to startup)"); ensureYellow(); + primaryTerms = assertAndCapturePrimaryTerms(primaryTerms); for (int i = 0; i < 10; i++) { assertHitCount(client().prepareSearch().setSize(0).setQuery(matchAllQuery()).execute().actionGet(), 2); @@ -214,6 +255,7 @@ public class RecoveryFromGatewayIT extends ESIntegTestCase { logger.info("Running Cluster Health (wait for the shards to startup)"); ensureYellow(); + primaryTerms = assertAndCapturePrimaryTerms(primaryTerms); for (int i = 0; i < 10; i++) { assertHitCount(client().prepareSearch().setSize(0).setQuery(matchAllQuery()).execute().actionGet(), 2); @@ -236,6 +278,8 @@ public class RecoveryFromGatewayIT extends 
ESIntegTestCase { assertHitCount(client().prepareSearch().setSize(0).setQuery(matchAllQuery()).execute().actionGet(), 2); } + Map primaryTerms = assertAndCapturePrimaryTerms(null); + internalCluster().fullRestart(new RestartCallback() { @Override public Settings onNodeStopped(String nodeName) throws Exception { @@ -251,6 +295,7 @@ public class RecoveryFromGatewayIT extends ESIntegTestCase { logger.info("Running Cluster Health (wait for the shards to startup)"); ensureGreen(); + primaryTerms = assertAndCapturePrimaryTerms(primaryTerms); for (int i = 0; i < 10; i++) { assertHitCount(client().prepareSearch().setSize(0).setQuery(matchAllQuery()).execute().actionGet(), 2); @@ -276,6 +321,8 @@ public class RecoveryFromGatewayIT extends ESIntegTestCase { String metaDataUuid = client().admin().cluster().prepareState().execute().get().getState().getMetaData().clusterUUID(); assertThat(metaDataUuid, not(equalTo("_na_"))); + Map primaryTerms = assertAndCapturePrimaryTerms(null); + logger.info("--> closing first node, and indexing more data to the second node"); internalCluster().fullRestart(new RestartCallback() { @@ -315,6 +362,7 @@ public class RecoveryFromGatewayIT extends ESIntegTestCase { logger.info("--> running cluster_health (wait for the shards to startup)"); ensureGreen(); + primaryTerms = assertAndCapturePrimaryTerms(primaryTerms); assertThat(client().admin().cluster().prepareState().execute().get().getState().getMetaData().clusterUUID(), equalTo(metaDataUuid)); @@ -386,11 +434,15 @@ public class RecoveryFromGatewayIT extends ESIntegTestCase { .setTransientSettings(settingsBuilder() .put(EnableAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ENABLE_SETTING.getKey(), EnableAllocationDecider.Allocation.NONE)) .get(); + + Map primaryTerms = assertAndCapturePrimaryTerms(null); + logger.info("--> full cluster restart"); internalCluster().fullRestart(); logger.info("--> waiting for cluster to return to green after {}shutdown", useSyncIds ? 
"" : "second "); ensureGreen(); + primaryTerms = assertAndCapturePrimaryTerms(primaryTerms); if (useSyncIds) { assertSyncIdsNotNull(); @@ -445,6 +497,8 @@ public class RecoveryFromGatewayIT extends ESIntegTestCase { internalCluster().startNode(settingsBuilder().put(Environment.PATH_DATA_SETTING.getKey(), createTempDir()).build()); ensureGreen(); + Map primaryTerms = assertAndCapturePrimaryTerms(null); + internalCluster().fullRestart(new RestartCallback() { @@ -455,6 +509,7 @@ public class RecoveryFromGatewayIT extends ESIntegTestCase { }); ensureYellow(); + primaryTerms = assertAndCapturePrimaryTerms(primaryTerms); assertThat(client().admin().indices().prepareExists("test").execute().actionGet().isExists(), equalTo(true)); assertHitCount(client().prepareSearch("test").setSize(0).setQuery(QueryBuilders.matchAllQuery()).execute().actionGet(), 1); diff --git a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index 72f58f104e6..1839df5dd30 100644 --- a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -37,18 +37,22 @@ import org.elasticsearch.action.admin.indices.stats.CommonStats; import org.elasticsearch.action.admin.indices.stats.CommonStatsFlags; import org.elasticsearch.action.admin.indices.stats.IndexStats; import org.elasticsearch.action.admin.indices.stats.ShardStats; +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.index.TransportIndexAction; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.cluster.ClusterInfoService; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.InternalClusterInfoService; import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.metadata.MetaData; import 
org.elasticsearch.cluster.metadata.SnapshotId; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.routing.AllocationId; import org.elasticsearch.cluster.routing.RestoreSource; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.ShardRoutingHelper; +import org.elasticsearch.cluster.routing.ShardRoutingState; import org.elasticsearch.cluster.routing.TestShardRouting; import org.elasticsearch.cluster.routing.UnassignedInfo; import org.elasticsearch.cluster.service.ClusterService; @@ -59,7 +63,6 @@ import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.common.lease.Releasables; import org.elasticsearch.common.logging.ESLogger; -import org.elasticsearch.common.lucene.Lucene; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.DummyTransportAddress; import org.elasticsearch.common.unit.ByteSizeUnit; @@ -119,6 +122,7 @@ import java.util.concurrent.atomic.AtomicInteger; import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS; import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_SHARDS; import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_VERSION_CREATED; +import static org.elasticsearch.common.lucene.Lucene.cleanLuceneIndex; import static org.elasticsearch.common.settings.Settings.settingsBuilder; import static org.elasticsearch.common.xcontent.ToXContent.EMPTY_PARAMS; import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; @@ -127,6 +131,7 @@ import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcke import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertSearchHits; 
+import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; @@ -168,6 +173,7 @@ public class IndexShardTests extends ESSingleNodeTestCase { createIndex("test"); ensureGreen(); NodeEnvironment env = getInstanceFromNode(NodeEnvironment.class); + ClusterService cs = getInstanceFromNode(ClusterService.class); final Index index = cs.state().metaData().index("test").getIndex(); Path[] shardPaths = env.availableShardPaths(new ShardId(index, 0)); @@ -295,31 +301,133 @@ public class IndexShardTests extends ESSingleNodeTestCase { // expected } try { - indexShard.acquireReplicaOperationLock(); + indexShard.acquireReplicaOperationLock(indexShard.getPrimaryTerm()); fail("we should not be able to increment anymore"); } catch (IndexShardClosedException e) { // expected } } - public void testIndexOperationsCounter() throws InterruptedException, ExecutionException, IOException { + public void testOperationLocksOnPrimaryShards() throws InterruptedException, ExecutionException, IOException { assertAcked(client().admin().indices().prepareCreate("test").setSettings(Settings.builder().put("index.number_of_shards", 1).put("index.number_of_replicas", 0)).get()); ensureGreen("test"); IndicesService indicesService = getInstanceFromNode(IndicesService.class); IndexService indexService = indicesService.indexServiceSafe(resolveIndex("test")); IndexShard indexShard = indexService.getShardOrNull(0); + long primaryTerm = indexShard.getPrimaryTerm(); + + ShardRouting temp = indexShard.routingEntry(); + final ShardRouting newPrimaryShardRouting; + if (randomBoolean()) { + // relocation target + newPrimaryShardRouting = TestShardRouting.newShardRouting(temp.index(), temp.id(), temp.currentNodeId(), "other node", + true, ShardRoutingState.INITIALIZING, AllocationId.newRelocation(temp.allocationId())); + } else if (randomBoolean()) { + // simulate promotion + ShardRouting newReplicaShardRouting = 
TestShardRouting.newShardRouting(temp.index(), temp.id(), temp.currentNodeId(), null, + false, ShardRoutingState.STARTED, temp.allocationId()); + indexShard.updateRoutingEntry(newReplicaShardRouting, false); + primaryTerm = primaryTerm + 1; + indexShard.updatePrimaryTerm(primaryTerm); + newPrimaryShardRouting = TestShardRouting.newShardRouting(temp.index(), temp.id(), temp.currentNodeId(), null, + true, ShardRoutingState.STARTED, temp.allocationId()); + } else { + newPrimaryShardRouting = temp; + } + indexShard.updateRoutingEntry(newPrimaryShardRouting, false); + assertEquals(0, indexShard.getActiveOperationsCount()); + if (newPrimaryShardRouting.isRelocationTarget() == false) { + try { + indexShard.acquireReplicaOperationLock(primaryTerm); + fail("shard shouldn't accept operations as replica"); + } catch (IllegalStateException ignored) { + + } + } Releasable operation1 = indexShard.acquirePrimaryOperationLock(); assertEquals(1, indexShard.getActiveOperationsCount()); Releasable operation2 = indexShard.acquirePrimaryOperationLock(); assertEquals(2, indexShard.getActiveOperationsCount()); + + Releasables.close(operation1, operation2); + assertEquals(0, indexShard.getActiveOperationsCount()); + } + + public void testOperationLocksOnReplicaShards() throws InterruptedException, ExecutionException, IOException { + assertAcked(client().admin().indices().prepareCreate("test").setSettings(Settings.builder().put("index.number_of_shards", 1).put("index.number_of_replicas", 0)).get()); + ensureGreen("test"); + IndicesService indicesService = getInstanceFromNode(IndicesService.class); + IndexService indexService = indicesService.indexServiceSafe(resolveIndex("test")); + IndexShard indexShard = indexService.getShardOrNull(0); + long primaryTerm = indexShard.getPrimaryTerm(); + + // ugly hack to allow the shard to operate as a replica + final ShardRouting temp = indexShard.routingEntry(); + final ShardRouting newShardRouting; + switch (randomInt(2)) { + case 0: + // started
replica + newShardRouting = TestShardRouting.newShardRouting(temp.index(), temp.id(), temp.currentNodeId(), null, + false, ShardRoutingState.STARTED, AllocationId.newRelocation(temp.allocationId())); + + indexShard.updateRoutingEntry(newShardRouting, false); + break; + case 1: + // initializing replica / primary + final boolean relocating = randomBoolean(); + newShardRouting = TestShardRouting.newShardRouting(temp.index(), temp.id(), temp.currentNodeId(), + relocating ? "sourceNode" : null, + relocating ? randomBoolean() : false, + ShardRoutingState.INITIALIZING, + relocating ? AllocationId.newRelocation(temp.allocationId()) : temp.allocationId()); + indexShard.updateRoutingEntry(newShardRouting, false); + break; + case 2: + // relocation source + newShardRouting = TestShardRouting.newShardRouting(temp.index(), temp.id(), temp.currentNodeId(), "otherNode", + false, ShardRoutingState.RELOCATING, AllocationId.newRelocation(temp.allocationId())); + indexShard.updateRoutingEntry(newShardRouting, false); + indexShard.relocated("test"); + break; + default: + throw new UnsupportedOperationException("get your numbers straight"); + + } + logger.info("updated shard routing to {}", newShardRouting); + + assertEquals(0, indexShard.getActiveOperationsCount()); + if (newShardRouting.primary() == false) { + try { + indexShard.acquirePrimaryOperationLock(); + fail("shard shouldn't accept primary ops"); + } catch (IllegalStateException ignored) { + + } + } + + Releasable operation1 = indexShard.acquireReplicaOperationLock(primaryTerm); + assertEquals(1, indexShard.getActiveOperationsCount()); + Releasable operation2 = indexShard.acquireReplicaOperationLock(primaryTerm); + assertEquals(2, indexShard.getActiveOperationsCount()); + + try { + indexShard.acquireReplicaOperationLock(primaryTerm - 1); + fail("you can not increment the operation counter with an older primary term"); + } catch (IllegalArgumentException e) { + assertThat(e.getMessage(), containsString("operation term")); + 
assertThat(e.getMessage(), containsString("too old")); + } + + // but you can increment with a newer one.. + indexShard.acquireReplicaOperationLock(primaryTerm + 1 + randomInt(20)).close(); Releasables.close(operation1, operation2); assertEquals(0, indexShard.getActiveOperationsCount()); } public void testMarkAsInactiveTriggersSyncedFlush() throws Exception { assertAcked(client().admin().indices().prepareCreate("test") - .setSettings(SETTING_NUMBER_OF_SHARDS, 1, SETTING_NUMBER_OF_REPLICAS, 0)); + .setSettings(SETTING_NUMBER_OF_SHARDS, 1, SETTING_NUMBER_OF_REPLICAS, 0)); client().prepareIndex("test", "test").setSource("{}").get(); ensureGreen("test"); IndicesService indicesService = getInstanceFromNode(IndicesService.class); @@ -364,14 +472,14 @@ public class IndexShardTests extends ESSingleNodeTestCase { assertTrue(shard.getEngine().getTranslog().syncNeeded()); setDurability(shard, Translog.Durability.REQUEST); assertNoFailures(client().prepareBulk() - .add(client().prepareIndex("test", "bar", "3").setSource("{}")) - .add(client().prepareDelete("test", "bar", "1")).get()); + .add(client().prepareIndex("test", "bar", "3").setSource("{}")) + .add(client().prepareDelete("test", "bar", "1")).get()); assertFalse(shard.getEngine().getTranslog().syncNeeded()); setDurability(shard, Translog.Durability.ASYNC); assertNoFailures(client().prepareBulk() - .add(client().prepareIndex("test", "bar", "4").setSource("{}")) - .add(client().prepareDelete("test", "bar", "3")).get()); + .add(client().prepareIndex("test", "bar", "4").setSource("{}")) + .add(client().prepareDelete("test", "bar", "3")).get()); setDurability(shard, Translog.Durability.REQUEST); assertTrue(shard.getEngine().getTranslog().syncNeeded()); } @@ -384,7 +492,7 @@ public class IndexShardTests extends ESSingleNodeTestCase { public void testMinimumCompatVersion() { Version versionCreated = VersionUtils.randomVersion(random()); assertAcked(client().admin().indices().prepareCreate("test") - 
.setSettings(SETTING_NUMBER_OF_SHARDS, 1, SETTING_NUMBER_OF_REPLICAS, 0, SETTING_VERSION_CREATED, versionCreated.id)); + .setSettings(SETTING_NUMBER_OF_SHARDS, 1, SETTING_NUMBER_OF_REPLICAS, 0, SETTING_VERSION_CREATED, versionCreated.id)); client().prepareIndex("test", "test").setSource("{}").get(); ensureGreen("test"); IndicesService indicesService = getInstanceFromNode(IndicesService.class); @@ -398,7 +506,7 @@ public class IndexShardTests extends ESSingleNodeTestCase { public void testUpdatePriority() { assertAcked(client().admin().indices().prepareCreate("test") - .setSettings(IndexMetaData.SETTING_PRIORITY, 200)); + .setSettings(IndexMetaData.SETTING_PRIORITY, 200)); IndexService indexService = getInstanceFromNode(IndicesService.class).indexService(resolveIndex("test")); assertEquals(200, indexService.getIndexSettings().getSettings().getAsInt(IndexMetaData.SETTING_PRIORITY, 0).intValue()); client().admin().indices().prepareUpdateSettings("test").setSettings(Settings.builder().put(IndexMetaData.SETTING_PRIORITY, 400).build()).get(); @@ -434,8 +542,8 @@ public class IndexShardTests extends ESSingleNodeTestCase { Path idxPath = env.sharedDataFile().resolve(randomAsciiOfLength(10)); logger.info("--> idxPath: [{}]", idxPath); Settings idxSettings = Settings.builder() - .put(IndexMetaData.SETTING_DATA_PATH, idxPath) - .build(); + .put(IndexMetaData.SETTING_DATA_PATH, idxPath) + .build(); createIndex("test", idxSettings); ensureGreen("test"); client().prepareIndex("test", "bar", "1").setSource("{}").setRefresh(true).get(); @@ -447,7 +555,7 @@ public class IndexShardTests extends ESSingleNodeTestCase { public void testExpectedShardSizeIsPresent() throws InterruptedException { assertAcked(client().admin().indices().prepareCreate("test") - .setSettings(SETTING_NUMBER_OF_SHARDS, 1, SETTING_NUMBER_OF_REPLICAS, 0)); + .setSettings(SETTING_NUMBER_OF_SHARDS, 1, SETTING_NUMBER_OF_REPLICAS, 0)); for (int i = 0; i < 50; i++) { client().prepareIndex("test", 
"test").setSource("{}").get(); } @@ -475,11 +583,11 @@ public class IndexShardTests extends ESSingleNodeTestCase { IOUtils.rm(endDir); Settings sb = Settings.builder() - .put(IndexMetaData.SETTING_DATA_PATH, startDir.toAbsolutePath().toString()) - .build(); + .put(IndexMetaData.SETTING_DATA_PATH, startDir.toAbsolutePath().toString()) + .build(); Settings sb2 = Settings.builder() - .put(IndexMetaData.SETTING_DATA_PATH, endDir.toAbsolutePath().toString()) - .build(); + .put(IndexMetaData.SETTING_DATA_PATH, endDir.toAbsolutePath().toString()) + .build(); logger.info("--> creating an index with data_path [{}]", startDir.toAbsolutePath().toString()); createIndex(INDEX, sb); @@ -510,9 +618,9 @@ public class IndexShardTests extends ESSingleNodeTestCase { logger.info("--> updating settings..."); client().admin().indices().prepareUpdateSettings(INDEX) - .setSettings(sb2) - .setIndicesOptions(IndicesOptions.fromOptions(true, false, true, true)) - .get(); + .setSettings(sb2) + .setIndicesOptions(IndicesOptions.fromOptions(true, false, true, true)) + .get(); assert Files.exists(startDir) == false : "start dir shouldn't exist"; @@ -642,7 +750,7 @@ public class IndexShardTests extends ESSingleNodeTestCase { try { shard.index(index); fail(); - }catch (IllegalIndexShardStateException e){ + } catch (IllegalIndexShardStateException e) { } @@ -655,7 +763,7 @@ public class IndexShardTests extends ESSingleNodeTestCase { try { shard.delete(delete); fail(); - }catch (IllegalIndexShardStateException e){ + } catch (IllegalIndexShardStateException e) { } @@ -692,7 +800,7 @@ public class IndexShardTests extends ESSingleNodeTestCase { long size = shard.getEngine().getTranslog().sizeInBytes(); logger.info("--> current translog size: [{}] num_ops [{}] generation [{}]", shard.getEngine().getTranslog().sizeInBytes(), shard.getEngine().getTranslog().totalOperations(), shard.getEngine().getTranslog().getGeneration()); 
client().admin().indices().prepareUpdateSettings("test").setSettings(settingsBuilder().put(IndexSettings.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING.getKey(), new ByteSizeValue(size, ByteSizeUnit.BYTES)) - .build()).get(); + .build()).get(); client().prepareDelete("test", "test", "2").get(); logger.info("--> translog size after delete: [{}] num_ops [{}] generation [{}]", shard.getEngine().getTranslog().sizeInBytes(), shard.getEngine().getTranslog().totalOperations(), shard.getEngine().getTranslog().getGeneration()); assertBusy(() -> { // this is async @@ -877,7 +985,7 @@ public class IndexShardTests extends ESSingleNodeTestCase { newShard.updateRoutingEntry(routing, false); DiscoveryNode localNode = new DiscoveryNode("foo", DummyTransportAddress.INSTANCE, Version.CURRENT); newShard.markAsRecovering("store", new RecoveryState(newShard.shardId(), routing.primary(), RecoveryState.Type.STORE, localNode, - localNode)); + localNode)); assertTrue(newShard.recoverFromStore(localNode)); assertEquals(0, newShard.recoveryState().getTranslog().recoveredOperations()); assertEquals(0, newShard.recoveryState().getTranslog().totalOperations()); @@ -890,7 +998,7 @@ public class IndexShardTests extends ESSingleNodeTestCase { assertHitCount(response, 0); } - public void testFailIfIndexNotPresentInRecoverFromStore() throws IOException { + public void testFailIfIndexNotPresentInRecoverFromStore() throws Exception { createIndex("test"); ensureGreen(); IndicesService indicesService = getInstanceFromNode(IndicesService.class); @@ -907,7 +1015,7 @@ public class IndexShardTests extends ESSingleNodeTestCase { Store store = shard.store(); store.incRef(); test.removeShard(0, "b/c simon says so"); - Lucene.cleanLuceneIndex(store.directory()); + cleanLuceneIndex(store.directory()); store.decRef(); ShardRoutingHelper.reinit(routing); IndexShard newShard = test.createShard(routing); @@ -940,7 +1048,12 @@ public class IndexShardTests extends ESSingleNodeTestCase { 
newShard.updateRoutingEntry(routing, true); SearchResponse response = client().prepareSearch().get(); assertHitCount(response, 0); - client().prepareIndex("test", "test", "0").setSource("{}").setRefresh(true).get(); + // we can't issue this request through a client because of the inconsistencies we created with the cluster state + // doing it directly instead + IndexRequest request = client().prepareIndex("test", "test", "0").setSource("{}").request(); + request.process(MetaData.builder().put(test.getMetaData(), false).build(), null, false, "test"); + TransportIndexAction.executeIndexRequestOnPrimary(request, newShard, null); + newShard.refresh("test"); assertHitCount(client().prepareSearch().get(), 1); } @@ -999,7 +1112,7 @@ public class IndexShardTests extends ESSingleNodeTestCase { @Override public void restore(SnapshotId snapshotId, Version version, ShardId shardId, ShardId snapshotShardId, RecoveryState recoveryState) { try { - Lucene.cleanLuceneIndex(targetStore.directory()); + cleanLuceneIndex(targetStore.directory()); for (String file : sourceStore.directory().listAll()) { if (file.equals("write.lock") || file.startsWith("extra")) { continue; @@ -1205,12 +1318,12 @@ public class IndexShardTests extends ESSingleNodeTestCase { public void testIndexingBufferDuringInternalRecovery() throws IOException { createIndex("index"); client().admin().indices().preparePutMapping("index").setType("testtype").setSource(jsonBuilder().startObject() - .startObject("testtype") - .startObject("properties") - .startObject("foo") - .field("type", "text") - .endObject() - .endObject().endObject().endObject()).get(); + .startObject("testtype") + .startObject("properties") + .startObject("foo") + .field("type", "text") + .endObject() + .endObject().endObject().endObject()).get(); ensureGreen(); IndicesService indicesService = getInstanceFromNode(IndicesService.class); IndexService test = indicesService.indexService(resolveIndex("index")); @@ -1234,12 +1347,12 @@ public class 
IndexShardTests extends ESSingleNodeTestCase { public void testIndexingBufferDuringPeerRecovery() throws IOException { createIndex("index"); client().admin().indices().preparePutMapping("index").setType("testtype").setSource(jsonBuilder().startObject() - .startObject("testtype") - .startObject("properties") - .startObject("foo") - .field("type", "text") - .endObject() - .endObject().endObject().endObject()).get(); + .startObject("testtype") + .startObject("properties") + .startObject("foo") + .field("type", "text") + .endObject() + .endObject().endObject().endObject()).get(); ensureGreen(); IndicesService indicesService = getInstanceFromNode(IndicesService.class); IndexService test = indicesService.indexService(resolveIndex("index")); diff --git a/test/framework/src/main/java/org/elasticsearch/cluster/routing/TestShardRouting.java b/test/framework/src/main/java/org/elasticsearch/cluster/routing/TestShardRouting.java index b7984416fae..d15773db545 100644 --- a/test/framework/src/main/java/org/elasticsearch/cluster/routing/TestShardRouting.java +++ b/test/framework/src/main/java/org/elasticsearch/cluster/routing/TestShardRouting.java @@ -30,7 +30,7 @@ import org.elasticsearch.test.ESTestCase; public class TestShardRouting { public static ShardRouting newShardRouting(String index, int shardId, String currentNodeId, boolean primary, ShardRoutingState state) { - return newShardRouting(new Index(index, IndexMetaData.INDEX_UUID_NA_VALUE), shardId, currentNodeId,primary, state); + return newShardRouting(new Index(index, IndexMetaData.INDEX_UUID_NA_VALUE), shardId, currentNodeId, primary, state); } public static ShardRouting newShardRouting(Index index, int shardId, String currentNodeId, boolean primary, ShardRoutingState state) {