Reroute Allocate to force primary allocation when enabled
Typically, the main reason a reroute allocation command with allow_primary is enabled, is to force create an empty new shard because a shard (and its replicas) were lost. This can't be done today because the shard expects to have a valid index where its allocated, we need to clear its post allocation flag to make sure it is allowed to create a fresh index.
This commit is contained in:
parent
15c8510e65
commit
8a2e5bbe68
|
@ -83,6 +83,7 @@ public class IndexRoutingTable implements Iterable<IndexShardRoutingTable> {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Return the index id
|
* Return the index id
|
||||||
|
*
|
||||||
* @return id of the index
|
* @return id of the index
|
||||||
*/
|
*/
|
||||||
public String index() {
|
public String index() {
|
||||||
|
@ -92,6 +93,7 @@ public class IndexRoutingTable implements Iterable<IndexShardRoutingTable> {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Return the index id
|
* Return the index id
|
||||||
|
*
|
||||||
* @return id of the index
|
* @return id of the index
|
||||||
*/
|
*/
|
||||||
public String getIndex() {
|
public String getIndex() {
|
||||||
|
@ -100,6 +102,7 @@ public class IndexRoutingTable implements Iterable<IndexShardRoutingTable> {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* creates a new {@link IndexRoutingTable} with all shard versions normalized
|
* creates a new {@link IndexRoutingTable} with all shard versions normalized
|
||||||
|
*
|
||||||
* @return new {@link IndexRoutingTable}
|
* @return new {@link IndexRoutingTable}
|
||||||
*/
|
*/
|
||||||
public IndexRoutingTable normalizeVersions() {
|
public IndexRoutingTable normalizeVersions() {
|
||||||
|
@ -152,8 +155,7 @@ public class IndexRoutingTable implements Iterable<IndexShardRoutingTable> {
|
||||||
* {@link IndexRoutingTable} excluding the nodes with the node ids give as
|
* {@link IndexRoutingTable} excluding the nodes with the node ids give as
|
||||||
* the <code>excludedNodes</code> parameter.
|
* the <code>excludedNodes</code> parameter.
|
||||||
*
|
*
|
||||||
* @param excludedNodes
|
* @param excludedNodes id of nodes that will be excluded
|
||||||
* id of nodes that will be excluded
|
|
||||||
* @return number of distinct nodes this index has at least one shard allocated on
|
* @return number of distinct nodes this index has at least one shard allocated on
|
||||||
*/
|
*/
|
||||||
public int numberOfNodesShardsAreAllocatedOn(String... excludedNodes) {
|
public int numberOfNodesShardsAreAllocatedOn(String... excludedNodes) {
|
||||||
|
@ -201,6 +203,7 @@ public class IndexRoutingTable implements Iterable<IndexShardRoutingTable> {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Calculates the number of primary shards in active state in routing table
|
* Calculates the number of primary shards in active state in routing table
|
||||||
|
*
|
||||||
* @return number of active primary shards
|
* @return number of active primary shards
|
||||||
*/
|
*/
|
||||||
public int primaryShardsActive() {
|
public int primaryShardsActive() {
|
||||||
|
@ -237,6 +240,7 @@ public class IndexRoutingTable implements Iterable<IndexShardRoutingTable> {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Returns a {@link List} of shards that match one of the states listed in {@link ShardRoutingState states}
|
* Returns a {@link List} of shards that match one of the states listed in {@link ShardRoutingState states}
|
||||||
|
*
|
||||||
* @param states a set of {@link ShardRoutingState states}
|
* @param states a set of {@link ShardRoutingState states}
|
||||||
* @return a {@link List} of shards that match one of the given {@link ShardRoutingState states}
|
* @return a {@link List} of shards that match one of the given {@link ShardRoutingState states}
|
||||||
*/
|
*/
|
||||||
|
@ -308,9 +312,9 @@ public class IndexRoutingTable implements Iterable<IndexShardRoutingTable> {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Reads an {@link IndexRoutingTable} from an {@link StreamInput}
|
* Reads an {@link IndexRoutingTable} from an {@link StreamInput}
|
||||||
|
*
|
||||||
* @param in {@link StreamInput} to read the {@link IndexRoutingTable} from
|
* @param in {@link StreamInput} to read the {@link IndexRoutingTable} from
|
||||||
* @return {@link IndexRoutingTable} read
|
* @return {@link IndexRoutingTable} read
|
||||||
*
|
|
||||||
* @throws IOException if something happens during read
|
* @throws IOException if something happens during read
|
||||||
*/
|
*/
|
||||||
public static IndexRoutingTable readFrom(StreamInput in) throws IOException {
|
public static IndexRoutingTable readFrom(StreamInput in) throws IOException {
|
||||||
|
@ -327,8 +331,9 @@ public class IndexRoutingTable implements Iterable<IndexShardRoutingTable> {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Writes an {@link IndexRoutingTable} to a {@link StreamOutput}.
|
* Writes an {@link IndexRoutingTable} to a {@link StreamOutput}.
|
||||||
|
*
|
||||||
* @param index {@link IndexRoutingTable} to write
|
* @param index {@link IndexRoutingTable} to write
|
||||||
* @param out {@link StreamOutput} to write to
|
* @param out {@link StreamOutput} to write to
|
||||||
* @throws IOException if something happens during write
|
* @throws IOException if something happens during write
|
||||||
*/
|
*/
|
||||||
public static void writeTo(IndexRoutingTable index, StreamOutput out) throws IOException {
|
public static void writeTo(IndexRoutingTable index, StreamOutput out) throws IOException {
|
||||||
|
@ -421,6 +426,16 @@ public class IndexRoutingTable implements Iterable<IndexShardRoutingTable> {
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Clears the post allocation flag for the specified shard
|
||||||
|
*/
|
||||||
|
public Builder clearPostAllocationFlag(ShardId shardId) {
|
||||||
|
assert this.index.equals(shardId.index().name());
|
||||||
|
IndexShardRoutingTable indexShard = shards.get(shardId.id());
|
||||||
|
shards.put(indexShard.shardId().id(), new IndexShardRoutingTable(indexShard.shardId(), indexShard.shards(), false));
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Adds a new shard routing (makes a copy of it), with reference data used from the index shard routing table
|
* Adds a new shard routing (makes a copy of it), with reference data used from the index shard routing table
|
||||||
* if it needs to be created.
|
* if it needs to be created.
|
||||||
|
|
|
@ -19,12 +19,15 @@
|
||||||
|
|
||||||
package org.elasticsearch.cluster.routing;
|
package org.elasticsearch.cluster.routing;
|
||||||
|
|
||||||
|
import com.google.common.collect.ImmutableSet;
|
||||||
|
import com.google.common.collect.Sets;
|
||||||
import gnu.trove.map.hash.TObjectIntHashMap;
|
import gnu.trove.map.hash.TObjectIntHashMap;
|
||||||
import org.elasticsearch.cluster.ClusterState;
|
import org.elasticsearch.cluster.ClusterState;
|
||||||
import org.elasticsearch.cluster.block.ClusterBlocks;
|
import org.elasticsearch.cluster.block.ClusterBlocks;
|
||||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||||
import org.elasticsearch.cluster.metadata.MetaData;
|
import org.elasticsearch.cluster.metadata.MetaData;
|
||||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||||
|
import org.elasticsearch.index.shard.ShardId;
|
||||||
|
|
||||||
import java.util.*;
|
import java.util.*;
|
||||||
|
|
||||||
|
@ -49,6 +52,8 @@ public class RoutingNodes implements Iterable<RoutingNode> {
|
||||||
|
|
||||||
private final List<MutableShardRouting> ignoredUnassigned = newArrayList();
|
private final List<MutableShardRouting> ignoredUnassigned = newArrayList();
|
||||||
|
|
||||||
|
private Set<ShardId> clearPostAllocationFlag;
|
||||||
|
|
||||||
private final Map<String, TObjectIntHashMap<String>> nodesPerAttributeNames = new HashMap<String, TObjectIntHashMap<String>>();
|
private final Map<String, TObjectIntHashMap<String>> nodesPerAttributeNames = new HashMap<String, TObjectIntHashMap<String>>();
|
||||||
|
|
||||||
public RoutingNodes(ClusterState clusterState) {
|
public RoutingNodes(ClusterState clusterState) {
|
||||||
|
@ -159,6 +164,25 @@ public class RoutingNodes implements Iterable<RoutingNode> {
|
||||||
return nodesToShards();
|
return nodesToShards();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Clears the post allocation flag for the provided shard id. NOTE: this should be used cautiously
|
||||||
|
* since it will lead to data loss of the primary shard is not allocated, as it will allocate
|
||||||
|
* the primary shard on a node and *not* expect it to have an existing valid index there.
|
||||||
|
*/
|
||||||
|
public void addClearPostAllocationFlag(ShardId shardId) {
|
||||||
|
if (clearPostAllocationFlag == null) {
|
||||||
|
clearPostAllocationFlag = Sets.newHashSet();
|
||||||
|
}
|
||||||
|
clearPostAllocationFlag.add(shardId);
|
||||||
|
}
|
||||||
|
|
||||||
|
public Iterable<ShardId> getShardsToClearPostAllocationFlag() {
|
||||||
|
if (clearPostAllocationFlag == null) {
|
||||||
|
return ImmutableSet.of();
|
||||||
|
}
|
||||||
|
return clearPostAllocationFlag;
|
||||||
|
}
|
||||||
|
|
||||||
public RoutingNode node(String nodeId) {
|
public RoutingNode node(String nodeId) {
|
||||||
return nodesToShards.get(nodeId);
|
return nodesToShards.get(nodeId);
|
||||||
}
|
}
|
||||||
|
|
|
@ -26,6 +26,7 @@ import org.elasticsearch.cluster.metadata.MetaData;
|
||||||
import org.elasticsearch.common.io.stream.StreamInput;
|
import org.elasticsearch.common.io.stream.StreamInput;
|
||||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||||
import org.elasticsearch.index.Index;
|
import org.elasticsearch.index.Index;
|
||||||
|
import org.elasticsearch.index.shard.ShardId;
|
||||||
import org.elasticsearch.indices.IndexMissingException;
|
import org.elasticsearch.indices.IndexMissingException;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
@ -39,6 +40,7 @@ import static com.google.common.collect.Maps.newHashMap;
|
||||||
/**
|
/**
|
||||||
* Represents a global cluster-wide routing table for all indices including the
|
* Represents a global cluster-wide routing table for all indices including the
|
||||||
* version of the current routing state.
|
* version of the current routing state.
|
||||||
|
*
|
||||||
* @see IndexRoutingTable
|
* @see IndexRoutingTable
|
||||||
*/
|
*/
|
||||||
public class RoutingTable implements Iterable<IndexRoutingTable> {
|
public class RoutingTable implements Iterable<IndexRoutingTable> {
|
||||||
|
@ -57,6 +59,7 @@ public class RoutingTable implements Iterable<IndexRoutingTable> {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Returns the version of the {@link RoutingTable}.
|
* Returns the version of the {@link RoutingTable}.
|
||||||
|
*
|
||||||
* @return version of the {@link RoutingTable}
|
* @return version of the {@link RoutingTable}
|
||||||
*/
|
*/
|
||||||
public long version() {
|
public long version() {
|
||||||
|
@ -305,6 +308,14 @@ public class RoutingTable implements Iterable<IndexRoutingTable> {
|
||||||
IndexShardRoutingTable refData = routingNodes.routingTable().index(shardRoutingEntry.index()).shard(shardRoutingEntry.id());
|
IndexShardRoutingTable refData = routingNodes.routingTable().index(shardRoutingEntry.index()).shard(shardRoutingEntry.id());
|
||||||
indexBuilder.addShard(refData, shardRoutingEntry);
|
indexBuilder.addShard(refData, shardRoutingEntry);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
for (ShardId shardId : routingNodes.getShardsToClearPostAllocationFlag()) {
|
||||||
|
IndexRoutingTable.Builder indexRoutingBuilder = indexRoutingTableBuilders.get(shardId.index().name());
|
||||||
|
if (indexRoutingBuilder != null) {
|
||||||
|
indexRoutingBuilder.clearPostAllocationFlag(shardId);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
for (IndexRoutingTable.Builder indexBuilder : indexRoutingTableBuilders.values()) {
|
for (IndexRoutingTable.Builder indexBuilder : indexRoutingTableBuilders.values()) {
|
||||||
add(indexBuilder);
|
add(indexBuilder);
|
||||||
}
|
}
|
||||||
|
|
|
@ -118,8 +118,8 @@ public class AllocateAllocationCommand implements AllocationCommand {
|
||||||
/**
|
/**
|
||||||
* Create a new {@link AllocateAllocationCommand}
|
* Create a new {@link AllocateAllocationCommand}
|
||||||
*
|
*
|
||||||
* @param shardId {@link ShardId} of the shrad to assign
|
* @param shardId {@link ShardId} of the shrad to assign
|
||||||
* @param node Node to assign the shard to
|
* @param node Node to assign the shard to
|
||||||
* @param allowPrimary should the node be allow to allocate the shard as primary
|
* @param allowPrimary should the node be allow to allocate the shard as primary
|
||||||
*/
|
*/
|
||||||
public AllocateAllocationCommand(ShardId shardId, String node, boolean allowPrimary) {
|
public AllocateAllocationCommand(ShardId shardId, String node, boolean allowPrimary) {
|
||||||
|
@ -135,6 +135,7 @@ public class AllocateAllocationCommand implements AllocationCommand {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Get the shards id
|
* Get the shards id
|
||||||
|
*
|
||||||
* @return id of the shard
|
* @return id of the shard
|
||||||
*/
|
*/
|
||||||
public ShardId shardId() {
|
public ShardId shardId() {
|
||||||
|
@ -143,6 +144,7 @@ public class AllocateAllocationCommand implements AllocationCommand {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Get the id of the Node
|
* Get the id of the Node
|
||||||
|
*
|
||||||
* @return id of the Node
|
* @return id of the Node
|
||||||
*/
|
*/
|
||||||
public String node() {
|
public String node() {
|
||||||
|
@ -151,6 +153,7 @@ public class AllocateAllocationCommand implements AllocationCommand {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Determine if primary allocation is allowed
|
* Determine if primary allocation is allowed
|
||||||
|
*
|
||||||
* @return <code>true</code> if primary allocation is allowed. Otherwise <code>false</code>
|
* @return <code>true</code> if primary allocation is allowed. Otherwise <code>false</code>
|
||||||
*/
|
*/
|
||||||
public boolean allowPrimary() {
|
public boolean allowPrimary() {
|
||||||
|
@ -191,6 +194,11 @@ public class AllocateAllocationCommand implements AllocationCommand {
|
||||||
}
|
}
|
||||||
it.remove();
|
it.remove();
|
||||||
routingNode.add(shardRouting);
|
routingNode.add(shardRouting);
|
||||||
|
if (shardRouting.primary()) {
|
||||||
|
// we need to clear the post allocation flag, since its an explicit allocation of the primary shard
|
||||||
|
// and we want to force allocate it (and create a new index for it)
|
||||||
|
allocation.routingNodes().addClearPostAllocationFlag(shardRouting.shardId());
|
||||||
|
}
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -20,14 +20,17 @@
|
||||||
package org.elasticsearch.test.integration.cluster.allocation;
|
package org.elasticsearch.test.integration.cluster.allocation;
|
||||||
|
|
||||||
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
|
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
|
||||||
|
import org.elasticsearch.action.admin.cluster.health.ClusterHealthStatus;
|
||||||
import org.elasticsearch.cluster.ClusterState;
|
import org.elasticsearch.cluster.ClusterState;
|
||||||
import org.elasticsearch.cluster.routing.ShardRoutingState;
|
import org.elasticsearch.cluster.routing.ShardRoutingState;
|
||||||
import org.elasticsearch.cluster.routing.allocation.command.AllocateAllocationCommand;
|
import org.elasticsearch.cluster.routing.allocation.command.AllocateAllocationCommand;
|
||||||
import org.elasticsearch.cluster.routing.allocation.command.MoveAllocationCommand;
|
import org.elasticsearch.cluster.routing.allocation.command.MoveAllocationCommand;
|
||||||
import org.elasticsearch.common.Priority;
|
import org.elasticsearch.common.Priority;
|
||||||
|
import org.elasticsearch.common.io.FileSystemUtils;
|
||||||
import org.elasticsearch.common.logging.ESLogger;
|
import org.elasticsearch.common.logging.ESLogger;
|
||||||
import org.elasticsearch.common.logging.Loggers;
|
import org.elasticsearch.common.logging.Loggers;
|
||||||
import org.elasticsearch.common.settings.Settings;
|
import org.elasticsearch.common.settings.Settings;
|
||||||
|
import org.elasticsearch.env.NodeEnvironment;
|
||||||
import org.elasticsearch.gateway.Gateway;
|
import org.elasticsearch.gateway.Gateway;
|
||||||
import org.elasticsearch.index.shard.ShardId;
|
import org.elasticsearch.index.shard.ShardId;
|
||||||
import org.elasticsearch.node.internal.InternalNode;
|
import org.elasticsearch.node.internal.InternalNode;
|
||||||
|
@ -35,6 +38,8 @@ import org.elasticsearch.test.integration.AbstractNodesTests;
|
||||||
import org.testng.annotations.AfterMethod;
|
import org.testng.annotations.AfterMethod;
|
||||||
import org.testng.annotations.Test;
|
import org.testng.annotations.Test;
|
||||||
|
|
||||||
|
import java.io.File;
|
||||||
|
|
||||||
import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder;
|
import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder;
|
||||||
import static org.hamcrest.MatcherAssert.assertThat;
|
import static org.hamcrest.MatcherAssert.assertThat;
|
||||||
import static org.hamcrest.Matchers.equalTo;
|
import static org.hamcrest.Matchers.equalTo;
|
||||||
|
@ -161,5 +166,36 @@ public class ClusterRerouteTests extends AbstractNodesTests {
|
||||||
state = client("node1").admin().cluster().prepareState().execute().actionGet().getState();
|
state = client("node1").admin().cluster().prepareState().execute().actionGet().getState();
|
||||||
assertThat(state.routingNodes().unassigned().size(), equalTo(1));
|
assertThat(state.routingNodes().unassigned().size(), equalTo(1));
|
||||||
assertThat(state.routingNodes().node(state.nodes().resolveNode("node1").id()).shards().get(0).state(), equalTo(ShardRoutingState.STARTED));
|
assertThat(state.routingNodes().node(state.nodes().resolveNode("node1").id()).shards().get(0).state(), equalTo(ShardRoutingState.STARTED));
|
||||||
|
|
||||||
|
client("node1").prepareIndex("test", "type", "1").setSource("field", "value").setRefresh(true).execute().actionGet();
|
||||||
|
|
||||||
|
logger.info("--> closing all nodes");
|
||||||
|
File shardLocation = ((InternalNode) node("node1")).injector().getInstance(NodeEnvironment.class).shardLocations(new ShardId("test", 0))[0];
|
||||||
|
closeAllNodes();
|
||||||
|
|
||||||
|
logger.info("--> deleting the shard data");
|
||||||
|
FileSystemUtils.deleteRecursively(shardLocation);
|
||||||
|
|
||||||
|
logger.info("--> starting the first node back, will not allocate the shard since it has no data, but the index will be there");
|
||||||
|
startNode("node1", commonSettings);
|
||||||
|
// wait a bit for the cluster to realize that the shard is not there...
|
||||||
|
// TODO can we get around this? the cluster is RED, so what do we wait for?
|
||||||
|
Thread.sleep(300);
|
||||||
|
assertThat(client("node1").admin().cluster().prepareHealth().execute().actionGet().getStatus(), equalTo(ClusterHealthStatus.RED));
|
||||||
|
logger.info("--> explicitly allocate primary");
|
||||||
|
state = client("node1").admin().cluster().prepareReroute()
|
||||||
|
.add(new AllocateAllocationCommand(new ShardId("test", 0), "node1", true))
|
||||||
|
.execute().actionGet().getState();
|
||||||
|
assertThat(state.routingNodes().unassigned().size(), equalTo(1));
|
||||||
|
assertThat(state.routingNodes().node(state.nodes().resolveNode("node1").id()).shards().get(0).state(), equalTo(ShardRoutingState.INITIALIZING));
|
||||||
|
|
||||||
|
healthResponse = client("node1").admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForYellowStatus().execute().actionGet();
|
||||||
|
assertThat(healthResponse.isTimedOut(), equalTo(false));
|
||||||
|
|
||||||
|
logger.info("--> get the state, verify shard 1 primary allocated");
|
||||||
|
state = client("node1").admin().cluster().prepareState().execute().actionGet().getState();
|
||||||
|
assertThat(state.routingNodes().unassigned().size(), equalTo(1));
|
||||||
|
assertThat(state.routingNodes().node(state.nodes().resolveNode("node1").id()).shards().get(0).state(), equalTo(ShardRoutingState.STARTED));
|
||||||
|
|
||||||
}
|
}
|
||||||
}
|
}
|
Loading…
Reference in New Issue