diff --git a/src/main/java/org/elasticsearch/cluster/routing/IndexRoutingTable.java b/src/main/java/org/elasticsearch/cluster/routing/IndexRoutingTable.java
index 267ff648163..0d32f676bc9 100644
--- a/src/main/java/org/elasticsearch/cluster/routing/IndexRoutingTable.java
+++ b/src/main/java/org/elasticsearch/cluster/routing/IndexRoutingTable.java
@@ -44,7 +44,7 @@ import static com.google.common.collect.Lists.newArrayList;
* words, each instance of a shard is considered a replica while only one
* replica per shard is a primary replica. The primary replica
* can be seen as the "leader" of the shard acting as the primary entry point
- * for operations on a specific shard.
+ * for operations on a specific shard.
*
* Note: The term replica is not directly
* reflected in the routing table or in releated classes, replicas are
@@ -83,15 +83,17 @@ public class IndexRoutingTable implements Iterable {
/**
* Return the index id
+ *
* @return id of the index
*/
public String index() {
return this.index;
}
-
+
/**
* Return the index id
+ *
* @return id of the index
*/
public String getIndex() {
@@ -100,6 +102,7 @@ public class IndexRoutingTable implements Iterable {
/**
* creates a new {@link IndexRoutingTable} with all shard versions normalized
+ *
* @return new {@link IndexRoutingTable}
*/
public IndexRoutingTable normalizeVersions() {
@@ -151,9 +154,8 @@ public class IndexRoutingTable implements Iterable {
* Calculates the number of nodes that hold one or more shards of this index
* {@link IndexRoutingTable} excluding the nodes with the node ids give as
* the excludedNodes
parameter.
- *
- * @param excludedNodes
- * id of nodes that will be excluded
+ *
+ * @param excludedNodes id of nodes that will be excluded
* @return number of distinct nodes this index has at least one shard allocated on
*/
public int numberOfNodesShardsAreAllocatedOn(String... excludedNodes) {
@@ -200,7 +202,8 @@ public class IndexRoutingTable implements Iterable {
}
/**
- * Calculates the number of primary shards in active state in routing table
+ * Calculates the number of primary shards in active state in routing table
+ *
* @return number of active primary shards
*/
public int primaryShardsActive() {
@@ -237,6 +240,7 @@ public class IndexRoutingTable implements Iterable {
/**
* Returns a {@link List} of shards that match one of the states listed in {@link ShardRoutingState states}
+ *
* @param states a set of {@link ShardRoutingState states}
* @return a {@link List} of shards that match one of the given {@link ShardRoutingState states}
*/
@@ -308,9 +312,9 @@ public class IndexRoutingTable implements Iterable {
/**
* Reads an {@link IndexRoutingTable} from an {@link StreamInput}
+ *
* @param in {@link StreamInput} to read the {@link IndexRoutingTable} from
* @return {@link IndexRoutingTable} read
- *
* @throws IOException if something happens during read
*/
public static IndexRoutingTable readFrom(StreamInput in) throws IOException {
@@ -327,9 +331,10 @@ public class IndexRoutingTable implements Iterable {
/**
* Writes an {@link IndexRoutingTable} to a {@link StreamOutput}.
+ *
* @param index {@link IndexRoutingTable} to write
- * @param out {@link StreamOutput} to write to
- * @throws IOException if something happens during write
+ * @param out {@link StreamOutput} to write to
+ * @throws IOException if something happens during write
*/
public static void writeTo(IndexRoutingTable index, StreamOutput out) throws IOException {
out.writeString(index.index());
@@ -421,6 +426,16 @@ public class IndexRoutingTable implements Iterable {
return this;
}
+ /**
+ * Clears the post allocation flag for the specified shard
+ */
+ public Builder clearPostAllocationFlag(ShardId shardId) {
+ assert this.index.equals(shardId.index().name());
+ IndexShardRoutingTable indexShard = shards.get(shardId.id());
+ shards.put(indexShard.shardId().id(), new IndexShardRoutingTable(indexShard.shardId(), indexShard.shards(), false));
+ return this;
+ }
+
/**
* Adds a new shard routing (makes a copy of it), with reference data used from the index shard routing table
* if it needs to be created.
diff --git a/src/main/java/org/elasticsearch/cluster/routing/RoutingNodes.java b/src/main/java/org/elasticsearch/cluster/routing/RoutingNodes.java
index 799a87da54c..60f858b8f5d 100644
--- a/src/main/java/org/elasticsearch/cluster/routing/RoutingNodes.java
+++ b/src/main/java/org/elasticsearch/cluster/routing/RoutingNodes.java
@@ -19,12 +19,15 @@
package org.elasticsearch.cluster.routing;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Sets;
import gnu.trove.map.hash.TObjectIntHashMap;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
+import org.elasticsearch.index.shard.ShardId;
import java.util.*;
@@ -49,6 +52,8 @@ public class RoutingNodes implements Iterable {
private final List ignoredUnassigned = newArrayList();
+ private Set clearPostAllocationFlag;
+
private final Map> nodesPerAttributeNames = new HashMap>();
public RoutingNodes(ClusterState clusterState) {
@@ -159,6 +164,25 @@ public class RoutingNodes implements Iterable {
return nodesToShards();
}
+ /**
+ * Clears the post allocation flag for the provided shard id. NOTE: this should be used cautiously
+ * since it will lead to data loss of the primary shard is not allocated, as it will allocate
+ * the primary shard on a node and *not* expect it to have an existing valid index there.
+ */
+ public void addClearPostAllocationFlag(ShardId shardId) {
+ if (clearPostAllocationFlag == null) {
+ clearPostAllocationFlag = Sets.newHashSet();
+ }
+ clearPostAllocationFlag.add(shardId);
+ }
+
+ public Iterable getShardsToClearPostAllocationFlag() {
+ if (clearPostAllocationFlag == null) {
+ return ImmutableSet.of();
+ }
+ return clearPostAllocationFlag;
+ }
+
public RoutingNode node(String nodeId) {
return nodesToShards.get(nodeId);
}
diff --git a/src/main/java/org/elasticsearch/cluster/routing/RoutingTable.java b/src/main/java/org/elasticsearch/cluster/routing/RoutingTable.java
index 7e8504cd76b..8e35db1bae7 100644
--- a/src/main/java/org/elasticsearch/cluster/routing/RoutingTable.java
+++ b/src/main/java/org/elasticsearch/cluster/routing/RoutingTable.java
@@ -26,6 +26,7 @@ import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.index.Index;
+import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.IndexMissingException;
import java.io.IOException;
@@ -38,7 +39,8 @@ import static com.google.common.collect.Maps.newHashMap;
/**
* Represents a global cluster-wide routing table for all indices including the
- * version of the current routing state.
+ * version of the current routing state.
+ *
* @see IndexRoutingTable
*/
public class RoutingTable implements Iterable {
@@ -57,6 +59,7 @@ public class RoutingTable implements Iterable {
/**
* Returns the version of the {@link RoutingTable}.
+ *
* @return version of the {@link RoutingTable}
*/
public long version() {
@@ -305,6 +308,14 @@ public class RoutingTable implements Iterable {
IndexShardRoutingTable refData = routingNodes.routingTable().index(shardRoutingEntry.index()).shard(shardRoutingEntry.id());
indexBuilder.addShard(refData, shardRoutingEntry);
}
+
+ for (ShardId shardId : routingNodes.getShardsToClearPostAllocationFlag()) {
+ IndexRoutingTable.Builder indexRoutingBuilder = indexRoutingTableBuilders.get(shardId.index().name());
+ if (indexRoutingBuilder != null) {
+ indexRoutingBuilder.clearPostAllocationFlag(shardId);
+ }
+ }
+
for (IndexRoutingTable.Builder indexBuilder : indexRoutingTableBuilders.values()) {
add(indexBuilder);
}
diff --git a/src/main/java/org/elasticsearch/cluster/routing/allocation/command/AllocateAllocationCommand.java b/src/main/java/org/elasticsearch/cluster/routing/allocation/command/AllocateAllocationCommand.java
index 38c1f8f5062..ce615cec71c 100644
--- a/src/main/java/org/elasticsearch/cluster/routing/allocation/command/AllocateAllocationCommand.java
+++ b/src/main/java/org/elasticsearch/cluster/routing/allocation/command/AllocateAllocationCommand.java
@@ -117,9 +117,9 @@ public class AllocateAllocationCommand implements AllocationCommand {
/**
* Create a new {@link AllocateAllocationCommand}
- *
- * @param shardId {@link ShardId} of the shrad to assign
- * @param node Node to assign the shard to
+ *
+ * @param shardId {@link ShardId} of the shrad to assign
+ * @param node Node to assign the shard to
* @param allowPrimary should the node be allow to allocate the shard as primary
*/
public AllocateAllocationCommand(ShardId shardId, String node, boolean allowPrimary) {
@@ -134,7 +134,8 @@ public class AllocateAllocationCommand implements AllocationCommand {
}
/**
- * Get the shards id
+ * Get the shards id
+ *
* @return id of the shard
*/
public ShardId shardId() {
@@ -143,6 +144,7 @@ public class AllocateAllocationCommand implements AllocationCommand {
/**
* Get the id of the Node
+ *
* @return id of the Node
*/
public String node() {
@@ -150,8 +152,9 @@ public class AllocateAllocationCommand implements AllocationCommand {
}
/**
- * Determine if primary allocation is allowed
- * @return true
if primary allocation is allowed. Otherwise false
+ * Determine if primary allocation is allowed
+ *
+ * @return true
if primary allocation is allowed. Otherwise false
*/
public boolean allowPrimary() {
return this.allowPrimary;
@@ -191,6 +194,11 @@ public class AllocateAllocationCommand implements AllocationCommand {
}
it.remove();
routingNode.add(shardRouting);
+ if (shardRouting.primary()) {
+ // we need to clear the post allocation flag, since its an explicit allocation of the primary shard
+ // and we want to force allocate it (and create a new index for it)
+ allocation.routingNodes().addClearPostAllocationFlag(shardRouting.shardId());
+ }
break;
}
}
diff --git a/src/test/java/org/elasticsearch/test/integration/cluster/allocation/ClusterRerouteTests.java b/src/test/java/org/elasticsearch/test/integration/cluster/allocation/ClusterRerouteTests.java
index af70a86212b..a002054c116 100644
--- a/src/test/java/org/elasticsearch/test/integration/cluster/allocation/ClusterRerouteTests.java
+++ b/src/test/java/org/elasticsearch/test/integration/cluster/allocation/ClusterRerouteTests.java
@@ -20,14 +20,17 @@
package org.elasticsearch.test.integration.cluster.allocation;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
+import org.elasticsearch.action.admin.cluster.health.ClusterHealthStatus;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.cluster.routing.allocation.command.AllocateAllocationCommand;
import org.elasticsearch.cluster.routing.allocation.command.MoveAllocationCommand;
import org.elasticsearch.common.Priority;
+import org.elasticsearch.common.io.FileSystemUtils;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.gateway.Gateway;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.node.internal.InternalNode;
@@ -35,6 +38,8 @@ import org.elasticsearch.test.integration.AbstractNodesTests;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.Test;
+import java.io.File;
+
import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
@@ -161,5 +166,36 @@ public class ClusterRerouteTests extends AbstractNodesTests {
state = client("node1").admin().cluster().prepareState().execute().actionGet().getState();
assertThat(state.routingNodes().unassigned().size(), equalTo(1));
assertThat(state.routingNodes().node(state.nodes().resolveNode("node1").id()).shards().get(0).state(), equalTo(ShardRoutingState.STARTED));
+
+ client("node1").prepareIndex("test", "type", "1").setSource("field", "value").setRefresh(true).execute().actionGet();
+
+ logger.info("--> closing all nodes");
+ File shardLocation = ((InternalNode) node("node1")).injector().getInstance(NodeEnvironment.class).shardLocations(new ShardId("test", 0))[0];
+ closeAllNodes();
+
+ logger.info("--> deleting the shard data");
+ FileSystemUtils.deleteRecursively(shardLocation);
+
+ logger.info("--> starting the first node back, will not allocate the shard since it has no data, but the index will be there");
+ startNode("node1", commonSettings);
+ // wait a bit for the cluster to realize that the shard is not there...
+ // TODO can we get around this? the cluster is RED, so what do we wait for?
+ Thread.sleep(300);
+ assertThat(client("node1").admin().cluster().prepareHealth().execute().actionGet().getStatus(), equalTo(ClusterHealthStatus.RED));
+ logger.info("--> explicitly allocate primary");
+ state = client("node1").admin().cluster().prepareReroute()
+ .add(new AllocateAllocationCommand(new ShardId("test", 0), "node1", true))
+ .execute().actionGet().getState();
+ assertThat(state.routingNodes().unassigned().size(), equalTo(1));
+ assertThat(state.routingNodes().node(state.nodes().resolveNode("node1").id()).shards().get(0).state(), equalTo(ShardRoutingState.INITIALIZING));
+
+ healthResponse = client("node1").admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForYellowStatus().execute().actionGet();
+ assertThat(healthResponse.isTimedOut(), equalTo(false));
+
+ logger.info("--> get the state, verify shard 1 primary allocated");
+ state = client("node1").admin().cluster().prepareState().execute().actionGet().getState();
+ assertThat(state.routingNodes().unassigned().size(), equalTo(1));
+ assertThat(state.routingNodes().node(state.nodes().resolveNode("node1").id()).shards().get(0).state(), equalTo(ShardRoutingState.STARTED));
+
}
}
\ No newline at end of file