From 110c4d06250bf5f37da424542a5128b609bd319a Mon Sep 17 00:00:00 2001 From: kimchy Date: Sun, 12 Jun 2011 21:16:58 +0300 Subject: [PATCH] add version on a shard replication group level, optimizing both the initial allocation of large number of shards and persistency in case of local gateway --- .../routing/ImmutableShardRouting.java | 23 ++- .../cluster/routing/IndexRoutingTable.java | 23 ++- .../routing/IndexShardRoutingTable.java | 34 +++++ .../cluster/routing/MutableShardRouting.java | 20 ++- .../cluster/routing/RoutingNodes.java | 9 +- .../cluster/routing/RoutingTable.java | 4 + .../cluster/routing/ShardRouting.java | 5 + .../routing/allocation/ShardsAllocation.java | 16 +- .../gateway/local/LocalGateway.java | 144 +++++++++--------- .../local/LocalGatewayNodeAllocation.java | 6 +- .../local/LocalGatewayStartedShards.java | 19 ++- .../allocation/FailedShardsRoutingTests.java | 8 +- .../allocation/ShardVersioningTests.java | 104 +++++++++++++ 13 files changed, 315 insertions(+), 100 deletions(-) create mode 100644 modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/ShardVersioningTests.java diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/ImmutableShardRouting.java b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/ImmutableShardRouting.java index 6a627937667..aab39c43f63 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/ImmutableShardRouting.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/ImmutableShardRouting.java @@ -45,6 +45,8 @@ public class ImmutableShardRouting implements Streamable, Serializable, ShardRou protected ShardRoutingState state; + protected long version; + private transient ShardId shardIdentifier; private final transient ImmutableList asList; @@ -54,23 +56,30 @@ public class ImmutableShardRouting implements Streamable, Serializable, ShardRou } public ImmutableShardRouting(ShardRouting copy) { - this(copy.index(), copy.id(), copy.currentNodeId(), copy.primary(), copy.state()); + this(copy.index(), copy.id(), copy.currentNodeId(), copy.primary(), copy.state(), copy.version()); this.relocatingNodeId = copy.relocatingNodeId(); } + public ImmutableShardRouting(ShardRouting copy, long version) { + this(copy.index(), copy.id(), copy.currentNodeId(), copy.primary(), copy.state(), copy.version()); + this.relocatingNodeId = copy.relocatingNodeId(); + this.version = version; + } + public ImmutableShardRouting(String index, int shardId, String currentNodeId, - String relocatingNodeId, boolean primary, ShardRoutingState state) { - this(index, shardId, currentNodeId, primary, state); + String relocatingNodeId, boolean primary, ShardRoutingState state, long version) { + this(index, shardId, currentNodeId, primary, state, version); this.relocatingNodeId = relocatingNodeId; } - public ImmutableShardRouting(String index, int shardId, String currentNodeId, boolean primary, ShardRoutingState state) { + public ImmutableShardRouting(String index, int shardId, String currentNodeId, boolean primary, ShardRoutingState state, long version) { this.index = index; this.shardId = shardId; this.currentNodeId = currentNodeId; this.primary = primary; this.state = state; this.asList = ImmutableList.of((ShardRouting) this); + this.version = version; } @Override public String index() { @@ -89,6 +98,10 @@ public class ImmutableShardRouting implements Streamable, Serializable, ShardRou return id(); } + @Override public long version() { + return this.version; + } + @Override public boolean unassigned() { return state == ShardRoutingState.UNASSIGNED; } @@ -160,6 +173,7 @@ public class ImmutableShardRouting implements Streamable, Serializable, ShardRou } @Override public void readFromThin(StreamInput in) throws IOException { + version = in.readLong(); if (in.readBoolean()) { currentNodeId = in.readUTF(); } @@ -180,6 +194,7 @@ public class ImmutableShardRouting implements Streamable, Serializable, ShardRou * Does not write index name and shard id */ public void writeToThin(StreamOutput out) throws IOException { + out.writeLong(version); if (currentNodeId != null) { out.writeBoolean(true); out.writeUTF(currentNodeId); diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/IndexRoutingTable.java b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/IndexRoutingTable.java index 743629cdbf1..ddf2f56470d 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/IndexRoutingTable.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/IndexRoutingTable.java @@ -30,7 +30,11 @@ import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.util.concurrent.Immutable; import java.io.IOException; -import java.util.*; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; import static org.elasticsearch.common.collect.Lists.*; @@ -71,6 +75,14 @@ public class IndexRoutingTable implements Iterable { return index(); } + public IndexRoutingTable normalizeVersions() { + IndexRoutingTable.Builder builder = new Builder(this.index); + for (IndexShardRoutingTable shardTable : shards.values()) { + builder.addIndexShard(shardTable.normalizeVersions()); + } + return builder.build(); + } + public void validate(RoutingTableValidation validation, MetaData metaData) { if (!metaData.hasIndex(index())) { validation.addIndexFailure(index(), "Exists in routing does not exists in metadata"); @@ -258,7 +270,7 @@ public class IndexRoutingTable implements Iterable { public Builder initializeEmpty(IndexMetaData indexMetaData, boolean fromApi) { for (int shardId = 0; shardId < indexMetaData.numberOfShards(); shardId++) { for (int i = 0; i <= indexMetaData.numberOfReplicas(); i++) { - addShard(shardId, null, i == 0, ShardRoutingState.UNASSIGNED, fromApi); + addShard(shardId, null, i == 0, ShardRoutingState.UNASSIGNED, 0, fromApi); } } return this; @@ -266,7 +278,8 @@ public class IndexRoutingTable implements Iterable { public Builder addReplica() { for (int shardId : shards.keySet()) { - addShard(shardId, null, false, ShardRoutingState.UNASSIGNED, false); + // version 0, will get updated when reroute will happen + addShard(shardId, null, false, ShardRoutingState.UNASSIGNED, 0, false); } return this; } @@ -315,8 +328,8 @@ public class IndexRoutingTable implements Iterable { return internalAddShard(new ImmutableShardRouting(shard), fromApi); } - public Builder addShard(int shardId, String nodeId, boolean primary, ShardRoutingState state, boolean fromApi) { - ImmutableShardRouting shard = new ImmutableShardRouting(index, shardId, nodeId, primary, state); + private Builder addShard(int shardId, String nodeId, boolean primary, ShardRoutingState state, long version, boolean fromApi) { + ImmutableShardRouting shard = new ImmutableShardRouting(index, shardId, nodeId, primary, state, version); return internalAddShard(shard, fromApi); } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/IndexShardRoutingTable.java b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/IndexShardRoutingTable.java index 76e54a725ea..b1bc22d2287 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/IndexShardRoutingTable.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/IndexShardRoutingTable.java @@ -54,6 +54,40 @@ public class IndexShardRoutingTable implements Iterable { this.counter = new AtomicInteger(ThreadLocalRandom.current().nextInt(shards.size())); } + /** + * Normalizes all shard routings to the same version. + */ + public IndexShardRoutingTable normalizeVersions() { + if (shards.isEmpty()) { + return this; + } + if (shards.size() == 1) { + return this; + } + long highestVersion = shards.get(0).version(); + boolean requiresNormalization = false; + for (int i = 1; i < shards.size(); i++) { + if (shards.get(i).version() != highestVersion) { + requiresNormalization = true; + } + if (shards.get(i).version() > highestVersion) { + highestVersion = shards.get(i).version(); + } + } + if (!requiresNormalization) { + return this; + } + List shardRoutings = new ArrayList(shards.size()); + for (int i = 0; i < shards.size(); i++) { + if (shards.get(i).version() == highestVersion) { + shardRoutings.add(shards.get(i)); + } else { + shardRoutings.add(new ImmutableShardRouting(shards.get(i), highestVersion)); + } + } + return new IndexShardRoutingTable(shardId, ImmutableList.copyOf(shardRoutings), allocatedPostApi); + } + /** * Has this shard group primary shard been allocated post API creation. Will be set to * true if it was created because of recovery action. diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/MutableShardRouting.java b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/MutableShardRouting.java index a3bff2d2f8c..da1ba08fbaa 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/MutableShardRouting.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/MutableShardRouting.java @@ -28,16 +28,22 @@ public class MutableShardRouting extends ImmutableShardRouting { super(copy); } - public MutableShardRouting(String index, int shardId, String currentNodeId, boolean primary, ShardRoutingState state) { - super(index, shardId, currentNodeId, primary, state); + public MutableShardRouting(ShardRouting copy, long version) { + super(copy); + this.version = version; + } + + public MutableShardRouting(String index, int shardId, String currentNodeId, boolean primary, ShardRoutingState state, long version) { + super(index, shardId, currentNodeId, primary, state, version); } public MutableShardRouting(String index, int shardId, String currentNodeId, - String relocatingNodeId, boolean primary, ShardRoutingState state) { - super(index, shardId, currentNodeId, relocatingNodeId, primary, state); + String relocatingNodeId, boolean primary, ShardRoutingState state, long version) { + super(index, shardId, currentNodeId, relocatingNodeId, primary, state, version); } public void assignToNode(String nodeId) { + version++; if (currentNodeId == null) { assert state == ShardRoutingState.UNASSIGNED; @@ -53,12 +59,14 @@ public class MutableShardRouting extends ImmutableShardRouting { } public void relocate(String relocatingNodeId) { + version++; assert state == ShardRoutingState.STARTED; state = ShardRoutingState.RELOCATING; this.relocatingNodeId = relocatingNodeId; } public void cancelRelocation() { + version++; assert state == ShardRoutingState.RELOCATING; assert assignedToNode(); assert relocatingNodeId != null; @@ -68,6 +76,7 @@ public class MutableShardRouting extends ImmutableShardRouting { } public void deassignNode() { + version++; assert state != ShardRoutingState.UNASSIGNED; state = ShardRoutingState.UNASSIGNED; @@ -76,12 +85,14 @@ public class MutableShardRouting extends ImmutableShardRouting { } public void moveToStarted() { + version++; assert state == ShardRoutingState.INITIALIZING || state == ShardRoutingState.RELOCATING; relocatingNodeId = null; state = ShardRoutingState.STARTED; } public void moveToPrimary() { + version++; if (primary) { throw new IllegalShardRoutingStateException(this, "Already primary, can't move to primary"); } @@ -89,6 +100,7 @@ public class MutableShardRouting extends ImmutableShardRouting { } public void moveFromPrimary() { + version++; if (!primary) { throw new IllegalShardRoutingStateException(this, "Already primary, can't move to replica"); } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/RoutingNodes.java b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/RoutingNodes.java index 5c84673400c..99749cfa9e6 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/RoutingNodes.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/RoutingNodes.java @@ -24,7 +24,12 @@ import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.common.util.concurrent.NotThreadSafe; -import java.util.*; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.Iterator; +import java.util.List; +import java.util.Map; import static org.elasticsearch.common.collect.Lists.*; import static org.elasticsearch.common.collect.Maps.*; @@ -71,7 +76,7 @@ public class RoutingNodes implements Iterable { // add the counterpart shard with relocatingNodeId reflecting the source from which // it's relocating from. entries.add(new MutableShardRouting(shard.index(), shard.id(), shard.relocatingNodeId(), - shard.currentNodeId(), shard.primary(), ShardRoutingState.INITIALIZING)); + shard.currentNodeId(), shard.primary(), ShardRoutingState.INITIALIZING, shard.version())); } } else { unassigned.add(new MutableShardRouting(shard)); diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/RoutingTable.java b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/RoutingTable.java index 2f9a723e1dd..873522a98b7 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/RoutingTable.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/RoutingTable.java @@ -317,6 +317,10 @@ public class RoutingTable implements Iterable { } public RoutingTable build() { + // normalize the versions right before we build it... + for (IndexRoutingTable indexRoutingTable : indicesRouting.values()) { + indicesRouting.put(indexRoutingTable.index(), indexRoutingTable.normalizeVersions()); + } return new RoutingTable(version, indicesRouting); } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/ShardRouting.java b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/ShardRouting.java index 3efb3306e57..2ba47e46d43 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/ShardRouting.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/ShardRouting.java @@ -59,6 +59,11 @@ public interface ShardRouting extends Streamable, Serializable { */ int getId(); + /** + * The routing version associated with the shard. + */ + long version(); + /** * The shard state. */ diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/ShardsAllocation.java b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/ShardsAllocation.java index ab88aa2ebd1..966819a3fcd 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/ShardsAllocation.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/allocation/ShardsAllocation.java @@ -21,7 +21,13 @@ package org.elasticsearch.cluster.routing.allocation; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.node.DiscoveryNode; -import org.elasticsearch.cluster.routing.*; +import org.elasticsearch.cluster.routing.IndexRoutingTable; +import org.elasticsearch.cluster.routing.MutableShardRouting; +import org.elasticsearch.cluster.routing.RoutingNode; +import org.elasticsearch.cluster.routing.RoutingNodes; +import org.elasticsearch.cluster.routing.RoutingTable; +import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.cluster.routing.ShardRoutingState; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.ImmutableSettings; @@ -194,7 +200,7 @@ public class ShardsAllocation extends AbstractComponent { changed = true; lowRoutingNode.add(new MutableShardRouting(startedShard.index(), startedShard.id(), lowRoutingNode.nodeId(), startedShard.currentNodeId(), - startedShard.primary(), INITIALIZING)); + startedShard.primary(), INITIALIZING, startedShard.version() + 1)); startedShard.relocate(lowRoutingNode.nodeId()); relocated = true; @@ -276,7 +282,7 @@ public class ShardsAllocation extends AbstractComponent { } // allocate all the unassigned shards above the average per node. - for (Iterator it = routingNodes.unassigned().iterator(); it.hasNext();) { + for (Iterator it = routingNodes.unassigned().iterator(); it.hasNext(); ) { MutableShardRouting shard = it.next(); // go over the nodes and try and allocate the remaining ones for (RoutingNode routingNode : routingNodes.sortedNodesLeastToHigh()) { @@ -314,7 +320,7 @@ public class ShardsAllocation extends AbstractComponent { } Set nodeIdsToRemove = newHashSet(); for (RoutingNode routingNode : routingNodes) { - for (Iterator shardsIterator = routingNode.shards().iterator(); shardsIterator.hasNext();) { + for (Iterator shardsIterator = routingNode.shards().iterator(); shardsIterator.hasNext(); ) { MutableShardRouting shardRoutingEntry = shardsIterator.next(); if (shardRoutingEntry.assignedToNode()) { // we store the relocation state here since when we call de-assign node @@ -476,7 +482,7 @@ public class ShardsAllocation extends AbstractComponent { // add the failed shard to the unassigned shards allocation.routingNodes().unassigned().add(new MutableShardRouting(failedShard.index(), failedShard.id(), - null, failedShard.primary(), ShardRoutingState.UNASSIGNED)); + null, failedShard.primary(), ShardRoutingState.UNASSIGNED, failedShard.version() + 1)); return true; } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/local/LocalGateway.java b/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/local/LocalGateway.java index 24664e01fe5..4f73a720434 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/local/LocalGateway.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/local/LocalGateway.java @@ -242,80 +242,86 @@ public class LocalGateway extends AbstractLifecycleComponent implements } if (event.state().nodes().localNode().dataNode() && event.routingTableChanged()) { - executor.execute(new Runnable() { - @Override public void run() { - LocalGatewayStartedShards.Builder builder = LocalGatewayStartedShards.builder(); - if (currentStartedShards != null) { - builder.state(currentStartedShards); - } - builder.version(event.state().version()); - // remove from the current state all the shards that are primary and started somewhere, we won't need them anymore - // and if they are still here, we will add them in the next phase + LocalGatewayStartedShards.Builder builder = LocalGatewayStartedShards.builder(); + if (currentStartedShards != null) { + builder.state(currentStartedShards); + } + builder.version(event.state().version()); - // Also note, this works well when closing an index, since a closed index will have no routing shards entries - // so they won't get removed (we want to keep the fact that those shards are allocated on this node if needed) - for (IndexRoutingTable indexRoutingTable : event.state().routingTable()) { - for (IndexShardRoutingTable indexShardRoutingTable : indexRoutingTable) { - if (indexShardRoutingTable.countWithState(ShardRoutingState.STARTED) == indexShardRoutingTable.size()) { - builder.remove(indexShardRoutingTable.shardId()); - } - } - } - // remove deleted indices from the started shards - for (ShardId shardId : builder.build().shards().keySet()) { - if (!event.state().metaData().hasIndex(shardId.index().name())) { - builder.remove(shardId); - } - } - // now, add all the ones that are active and on this node - RoutingNode routingNode = event.state().readOnlyRoutingNodes().node(event.state().nodes().localNodeId()); - if (routingNode != null) { - // out node is not in play yet... - for (MutableShardRouting shardRouting : routingNode) { - if (shardRouting.active()) { - builder.put(shardRouting.shardId(), event.state().version()); - } - } - } + boolean changed = false; - try { - File stateFile = new File(location, "shards-" + event.state().version()); - OutputStream fos = new FileOutputStream(stateFile); - if (compress) { - fos = new LZFOutputStream(fos); - } + // remove from the current state all the shards that are primary and started somewhere, we won't need them anymore + // and if they are still here, we will add them in the next phase - LocalGatewayStartedShards stateToWrite = builder.build(); - XContentBuilder xContentBuilder = XContentFactory.contentBuilder(XContentType.JSON, fos); - if (prettyPrint) { - xContentBuilder.prettyPrint(); - } - xContentBuilder.startObject(); - LocalGatewayStartedShards.Builder.toXContent(stateToWrite, xContentBuilder, ToXContent.EMPTY_PARAMS); - xContentBuilder.endObject(); - xContentBuilder.close(); - - fos.close(); - - FileSystemUtils.syncFile(stateFile); - - currentStartedShards = stateToWrite; - } catch (IOException e) { - logger.warn("failed to write updated state", e); - return; - } - - // delete all the other files - File[] files = location.listFiles(new FilenameFilter() { - @Override public boolean accept(File dir, String name) { - return name.startsWith("shards-") && !name.equals("shards-" + event.state().version()); - } - }); - for (File file : files) { - file.delete(); + // Also note, this works well when closing an index, since a closed index will have no routing shards entries + // so they won't get removed (we want to keep the fact that those shards are allocated on this node if needed) + for (IndexRoutingTable indexRoutingTable : event.state().routingTable()) { + for (IndexShardRoutingTable indexShardRoutingTable : indexRoutingTable) { + if (indexShardRoutingTable.countWithState(ShardRoutingState.STARTED) == indexShardRoutingTable.size()) { + changed |= builder.remove(indexShardRoutingTable.shardId()); } } - }); + } + // remove deleted indices from the started shards + for (ShardId shardId : builder.build().shards().keySet()) { + if (!event.state().metaData().hasIndex(shardId.index().name())) { + changed |= builder.remove(shardId); + } + } + // now, add all the ones that are active and on this node + RoutingNode routingNode = event.state().readOnlyRoutingNodes().node(event.state().nodes().localNodeId()); + if (routingNode != null) { + // out node is not in play yet... + for (MutableShardRouting shardRouting : routingNode) { + if (shardRouting.active()) { + changed |= builder.put(shardRouting.shardId(), shardRouting.version()); + } + } + } + + // only write if something changed... + if (changed) { + final LocalGatewayStartedShards stateToWrite = builder.build(); + executor.execute(new Runnable() { + @Override public void run() { + try { + File stateFile = new File(location, "shards-" + event.state().version()); + OutputStream fos = new FileOutputStream(stateFile); + if (compress) { + fos = new LZFOutputStream(fos); + } + + XContentBuilder xContentBuilder = XContentFactory.contentBuilder(XContentType.JSON, fos); + if (prettyPrint) { + xContentBuilder.prettyPrint(); + } + xContentBuilder.startObject(); + LocalGatewayStartedShards.Builder.toXContent(stateToWrite, xContentBuilder, ToXContent.EMPTY_PARAMS); + xContentBuilder.endObject(); + xContentBuilder.close(); + + fos.close(); + + FileSystemUtils.syncFile(stateFile); + + currentStartedShards = stateToWrite; + } catch (IOException e) { + logger.warn("failed to write updated state", e); + return; + } + + // delete all the other files + File[] files = location.listFiles(new FilenameFilter() { + @Override public boolean accept(File dir, String name) { + return name.startsWith("shards-") && !name.equals("shards-" + event.state().version()); + } + }); + for (File file : files) { + file.delete(); + } + } + }); + } } } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/local/LocalGatewayNodeAllocation.java b/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/local/LocalGatewayNodeAllocation.java index 976b15b6e83..a47b90f893f 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/local/LocalGatewayNodeAllocation.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/local/LocalGatewayNodeAllocation.java @@ -183,7 +183,8 @@ public class LocalGatewayNodeAllocation extends NodeAllocation { } // we found a match changed = true; - node.add(shard); + // make sure we create one with the version from the recovered state + node.add(new MutableShardRouting(shard, highestVersion)); unassignedIterator.remove(); // found a node, so no throttling, no "no", and break out of the loop @@ -202,7 +203,8 @@ public class LocalGatewayNodeAllocation extends NodeAllocation { } // we found a match changed = true; - node.add(shard); + // make sure we create one with the version from the recovered state + node.add(new MutableShardRouting(shard, highestVersion)); unassignedIterator.remove(); } } else { diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/local/LocalGatewayStartedShards.java b/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/local/LocalGatewayStartedShards.java index 4ee4106732e..edafcba6d15 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/local/LocalGatewayStartedShards.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/gateway/local/LocalGatewayStartedShards.java @@ -92,14 +92,23 @@ public class LocalGatewayStartedShards { return this; } - public Builder remove(ShardId shardId) { - this.shards.remove(shardId); - return this; + /** + * Returns true if something really changed. + */ + public boolean remove(ShardId shardId) { + return shards.remove(shardId) != null; } - public Builder put(ShardId shardId, long version) { + /** + * Returns true if something really changed. + */ + public boolean put(ShardId shardId, long version) { + Long lVersion = shards.get(shardId); + if (lVersion != null && lVersion == version) { + return false; + } this.shards.put(shardId, version); - return this; + return true; } public LocalGatewayStartedShards build() { diff --git a/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/FailedShardsRoutingTests.java b/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/FailedShardsRoutingTests.java index e7cbd154982..b04045a9d7d 100644 --- a/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/FailedShardsRoutingTests.java +++ b/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/FailedShardsRoutingTests.java @@ -165,7 +165,7 @@ public class FailedShardsRoutingTests { logger.info("fail the first shard, will have no place to be rerouted to (single node), so stays unassigned"); prevRoutingTable = routingTable; - routingTable = strategy.applyFailedShard(clusterState, new ImmutableShardRouting("test", 0, "node1", true, INITIALIZING)).routingTable(); + routingTable = strategy.applyFailedShard(clusterState, new ImmutableShardRouting("test", 0, "node1", true, INITIALIZING, 0)).routingTable(); clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); RoutingNodes routingNodes = clusterState.routingNodes(); @@ -181,7 +181,7 @@ public class FailedShardsRoutingTests { } logger.info("fail the shard again, see that nothing happens"); - assertThat(strategy.applyFailedShard(clusterState, new ImmutableShardRouting("test", 0, "node1", true, INITIALIZING)).changed(), equalTo(false)); + assertThat(strategy.applyFailedShard(clusterState, new ImmutableShardRouting("test", 0, "node1", true, INITIALIZING, 0)).changed(), equalTo(false)); } @Test public void firstAllocationFailureTwoNodes() { @@ -221,7 +221,7 @@ public class FailedShardsRoutingTests { logger.info("fail the first shard, will start INITIALIZING on the second node"); prevRoutingTable = routingTable; - routingTable = strategy.applyFailedShard(clusterState, new ImmutableShardRouting("test", 0, "node1", true, INITIALIZING)).routingTable(); + routingTable = strategy.applyFailedShard(clusterState, new ImmutableShardRouting("test", 0, "node1", true, INITIALIZING, 0)).routingTable(); clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); RoutingNodes routingNodes = clusterState.routingNodes(); @@ -237,7 +237,7 @@ public class FailedShardsRoutingTests { } logger.info("fail the shard again, see that nothing happens"); - assertThat(strategy.applyFailedShard(clusterState, new ImmutableShardRouting("test", 0, "node1", true, INITIALIZING)).changed(), equalTo(false)); + assertThat(strategy.applyFailedShard(clusterState, new ImmutableShardRouting("test", 0, "node1", true, INITIALIZING, 0)).changed(), equalTo(false)); } @Test public void rebalanceFailure() { diff --git a/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/ShardVersioningTests.java b/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/ShardVersioningTests.java new file mode 100644 index 00000000000..7b288290491 --- /dev/null +++ b/modules/elasticsearch/src/test/java/org/elasticsearch/cluster/routing/allocation/ShardVersioningTests.java @@ -0,0 +1,104 @@ +/* + * Licensed to Elastic Search and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Elastic Search licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.cluster.routing.allocation; + +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.cluster.routing.RoutingNodes; +import org.elasticsearch.cluster.routing.RoutingTable; +import org.elasticsearch.common.logging.ESLogger; +import org.elasticsearch.common.logging.Loggers; +import org.testng.annotations.Test; + +import static org.elasticsearch.cluster.ClusterState.*; +import static org.elasticsearch.cluster.metadata.IndexMetaData.*; +import static org.elasticsearch.cluster.metadata.MetaData.*; +import static org.elasticsearch.cluster.node.DiscoveryNodes.*; +import static org.elasticsearch.cluster.routing.RoutingBuilders.*; +import static org.elasticsearch.cluster.routing.ShardRoutingState.*; +import static org.elasticsearch.cluster.routing.allocation.RoutingAllocationTests.*; +import static org.elasticsearch.common.settings.ImmutableSettings.*; +import static org.hamcrest.MatcherAssert.*; +import static org.hamcrest.Matchers.*; + +@Test +public class ShardVersioningTests { + + private final ESLogger logger = Loggers.getLogger(ShardVersioningTests.class); + + @Test public void simple() { + ShardsAllocation strategy = new ShardsAllocation(settingsBuilder().put("cluster.routing.allocation.allow_rebalance", ClusterRebalanceNodeAllocation.ClusterRebalanceType.ALWAYS.toString()).build()); + + MetaData metaData = newMetaDataBuilder() + .put(newIndexMetaDataBuilder("test1").numberOfShards(1).numberOfReplicas(1)) + .put(newIndexMetaDataBuilder("test2").numberOfShards(1).numberOfReplicas(1)) + .build(); + + RoutingTable routingTable = routingTable() + .add(indexRoutingTable("test1").initializeEmpty(metaData.index("test1"))) + .add(indexRoutingTable("test2").initializeEmpty(metaData.index("test2"))) + .build(); + + ClusterState clusterState = newClusterStateBuilder().metaData(metaData).routingTable(routingTable).build(); + + logger.info("start two nodes"); + clusterState = newClusterStateBuilder().state(clusterState).nodes(newNodesBuilder().put(newNode("node1")).put(newNode("node2"))).build(); + RoutingTable prevRoutingTable = routingTable; + routingTable = strategy.reroute(clusterState).routingTable(); + clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); + + for (int i = 0; i < routingTable.index("test1").shards().size(); i++) { + assertThat(routingTable.index("test1").shard(i).shards().size(), equalTo(2)); + assertThat(routingTable.index("test1").shard(i).primaryShard().state(), equalTo(INITIALIZING)); + assertThat(routingTable.index("test1").shard(i).primaryShard().version(), equalTo(1l)); + assertThat(routingTable.index("test1").shard(i).replicaShards().get(0).state(), equalTo(UNASSIGNED)); + } + + for (int i = 0; i < routingTable.index("test2").shards().size(); i++) { + assertThat(routingTable.index("test2").shard(i).shards().size(), equalTo(2)); + assertThat(routingTable.index("test2").shard(i).primaryShard().state(), equalTo(INITIALIZING)); + assertThat(routingTable.index("test2").shard(i).primaryShard().version(), equalTo(1l)); + assertThat(routingTable.index("test2").shard(i).replicaShards().get(0).state(), equalTo(UNASSIGNED)); + } + + logger.info("start all the primary shards for test1, replicas will start initializing"); + RoutingNodes routingNodes = clusterState.routingNodes(); + prevRoutingTable = routingTable; + routingTable = strategy.applyStartedShards(clusterState, routingNodes.shardsWithState("test1", INITIALIZING)).routingTable(); + clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); + routingNodes = clusterState.routingNodes(); + + for (int i = 0; i < routingTable.index("test1").shards().size(); i++) { + assertThat(routingTable.index("test1").shard(i).shards().size(), equalTo(2)); + assertThat(routingTable.index("test1").shard(i).primaryShard().state(), equalTo(STARTED)); + assertThat(routingTable.index("test1").shard(i).primaryShard().version(), equalTo(2l)); + assertThat(routingTable.index("test1").shard(i).replicaShards().get(0).state(), equalTo(INITIALIZING)); + assertThat(routingTable.index("test1").shard(i).replicaShards().get(0).version(), equalTo(2l)); + } + + for (int i = 0; i < routingTable.index("test2").shards().size(); i++) { + assertThat(routingTable.index("test2").shard(i).shards().size(), equalTo(2)); + assertThat(routingTable.index("test2").shard(i).primaryShard().state(), equalTo(INITIALIZING)); + assertThat(routingTable.index("test2").shard(i).primaryShard().version(), equalTo(1l)); + assertThat(routingTable.index("test2").shard(i).replicaShards().get(0).state(), equalTo(UNASSIGNED)); + assertThat(routingTable.index("test2").shard(i).replicaShards().get(0).version(), equalTo(1l)); + } + } +} \ No newline at end of file