diff --git a/src/main/java/org/elasticsearch/cluster/routing/MutableShardRouting.java b/src/main/java/org/elasticsearch/cluster/routing/MutableShardRouting.java index ae858be7399..d5ed083f13f 100644 --- a/src/main/java/org/elasticsearch/cluster/routing/MutableShardRouting.java +++ b/src/main/java/org/elasticsearch/cluster/routing/MutableShardRouting.java @@ -101,16 +101,12 @@ public class MutableShardRouting extends ImmutableShardRouting { } /** - * Set the shards state to UNASSIGNED. - * //TODO document the state + * Moves the shard from started to initializing and bumps the version */ - void deassignNode() { + void reinitializeShard() { + assert state == ShardRoutingState.STARTED; version++; - assert state != ShardRoutingState.UNASSIGNED; - - state = ShardRoutingState.UNASSIGNED; - this.currentNodeId = null; - this.relocatingNodeId = null; + state = ShardRoutingState.INITIALIZING; } /** diff --git a/src/main/java/org/elasticsearch/cluster/routing/RoutingNodes.java b/src/main/java/org/elasticsearch/cluster/routing/RoutingNodes.java index c97dff0c140..e2088929f3c 100644 --- a/src/main/java/org/elasticsearch/cluster/routing/RoutingNodes.java +++ b/src/main/java/org/elasticsearch/cluster/routing/RoutingNodes.java @@ -520,6 +520,16 @@ public class RoutingNodes implements Iterable { return nodesToShards.values().toArray(new RoutingNode[nodesToShards.size()]); } + public void reinitShadowPrimary(MutableShardRouting candidate) { + if (candidate.relocating()) { + cancelRelocation(candidate); + } + candidate.reinitializeShard(); + inactivePrimaryCount++; + inactiveShardCount++; + + } + public final static class UnassignedShards implements Iterable { private final List unassigned; diff --git a/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java b/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java index c6ce355a183..58f3ae58f50 100644 --- a/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java +++ b/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java @@ -21,11 +21,13 @@ package org.elasticsearch.cluster.routing.allocation; import com.carrotsearch.hppc.cursors.ObjectCursor; import com.google.common.collect.ImmutableList; +import com.google.common.collect.Iterators; import com.google.common.collect.Lists; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchIllegalStateException; import org.elasticsearch.cluster.ClusterInfoService; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.routing.*; import org.elasticsearch.cluster.routing.allocation.allocator.ShardsAllocators; @@ -285,6 +287,7 @@ public class AllocationService extends AbstractComponent { shardsToFail.add(routing); } } + } } for (ShardRouting shardToFail : shardsToFail) { @@ -293,10 +296,12 @@ public class AllocationService extends AbstractComponent { // now, go over and elect a new primary if possible, not, from this code block on, if one is elected, // routingNodes.hasUnassignedPrimaries() will potentially be false + for (MutableShardRouting shardEntry : routingNodes.unassigned()) { if (shardEntry.primary()) { MutableShardRouting candidate = allocation.routingNodes().activeReplica(shardEntry); if (candidate != null) { + IndexMetaData index = allocation.metaData().index(candidate.index()); routingNodes.swapPrimaryFlag(shardEntry, candidate); if (candidate.relocatingNodeId() != null) { changed = true; @@ -311,6 +316,10 @@ public class AllocationService extends AbstractComponent { } } } + if (IndexMetaData.isIndexUsingShadowReplicas(index.settings())) { + routingNodes.reinitShadowPrimary(candidate); + changed = true; + } } } } diff --git a/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/src/main/java/org/elasticsearch/index/shard/IndexShard.java index a9ffdffa24f..9a774653c5f 100644 --- a/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -1187,13 +1187,25 @@ public class IndexShard extends AbstractIndexShardComponent { } } - protected void createNewEngine() { + private void createNewEngine() { synchronized (mutex) { if (state == IndexShardState.CLOSED) { throw new EngineClosedException(shardId); } assert this.currentEngineReference.get() == null; - this.currentEngineReference.set(engineFactory.newReadWriteEngine(config)); + this.currentEngineReference.set(newEngine()); } } + + protected Engine newEngine() { + return engineFactory.newReadWriteEngine(config); + } + + + /** + * Returns true iff this shard allows primary promotion, otherwise false + */ + public boolean allowsPrimaryPromotion() { + return true; + } } diff --git a/src/main/java/org/elasticsearch/index/shard/ShadowIndexShard.java b/src/main/java/org/elasticsearch/index/shard/ShadowIndexShard.java index 45f5ed3ac49..a126235ae85 100644 --- a/src/main/java/org/elasticsearch/index/shard/ShadowIndexShard.java +++ b/src/main/java/org/elasticsearch/index/shard/ShadowIndexShard.java @@ -18,6 +18,7 @@ */ package org.elasticsearch.index.shard; +import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchIllegalStateException; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.common.Nullable; @@ -31,6 +32,7 @@ import org.elasticsearch.index.cache.filter.ShardFilterCache; import org.elasticsearch.index.cache.query.ShardQueryCache; import org.elasticsearch.index.codec.CodecService; import org.elasticsearch.index.deletionpolicy.SnapshotDeletionPolicy; +import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.engine.EngineClosedException; import org.elasticsearch.index.engine.EngineFactory; import org.elasticsearch.index.fielddata.IndexFieldDataService; @@ -62,7 +64,7 @@ import org.elasticsearch.threadpool.ThreadPool; * promoted to a primary causes the shard to fail, kicking off a re-allocation * of the primary shard. */ -public class ShadowIndexShard extends IndexShard { +public final class ShadowIndexShard extends IndexShard { private final Object mutex = new Object(); @@ -98,28 +100,19 @@ public class ShadowIndexShard extends IndexShard { */ @Override public IndexShard routingEntry(ShardRouting newRouting) { - ShardRouting shardRouting = this.routingEntry(); - super.routingEntry(newRouting); - // check for a shadow replica that now needs to be transformed into - // a normal primary today we simply fail it to force reallocation - if (shardRouting != null && shardRouting.primary() == false && // currently a replica - newRouting.primary() == true) {// becoming a primary - failShard("can't promote shadow replica to primary", - new ElasticsearchIllegalStateException("can't promote shadow replica to primary")); + if (newRouting.primary() == true) {// becoming a primary + throw new ElasticsearchIllegalStateException("can't promote shard to primary"); } - return this; + return super.routingEntry(newRouting); } @Override - protected void createNewEngine() { - synchronized (mutex) { - if (state == IndexShardState.CLOSED) { - throw new EngineClosedException(shardId); - } - assert this.currentEngineReference.get() == null; - assert this.shardRouting.primary() == false; - // Use the read-only engine for shadow replicas - this.currentEngineReference.set(engineFactory.newReadOnlyEngine(config)); - } + protected Engine newEngine() { + assert this.shardRouting.primary() == false; + return engineFactory.newReadOnlyEngine(config); + } + + public boolean allowsPrimaryPromotion() { + return false; } } diff --git a/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java b/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java index ddabe6a49bd..e44ce6e4328 100644 --- a/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java +++ b/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java @@ -47,7 +47,6 @@ import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; -import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.index.IndexShardAlreadyExistsException; import org.elasticsearch.index.IndexShardMissingException; import org.elasticsearch.index.aliases.IndexAlias; @@ -70,7 +69,6 @@ import org.elasticsearch.threadpool.ThreadPool; import java.util.*; import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.atomic.AtomicLong; import static com.google.common.collect.Maps.newHashMap; import static org.elasticsearch.ExceptionsHelper.detailedMessage; @@ -95,7 +93,6 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent failedShards = ConcurrentCollections.newConcurrentMap(); - private final NodeEnvironment nodeEnvironment; static class FailedShard { public final long version; @@ -111,15 +108,13 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent