Replace primaryPostAllocated flag and use UnassignedInfo
There is no need to maintain additional state as to if a primary was allocated post api creation on the index routing table, we hold all this information already in the UnassignedInfo class. closes #12374
This commit is contained in:
parent
d3e454780f
commit
cd7096dfc2
|
@ -32,6 +32,7 @@ import org.elasticsearch.cluster.block.ClusterBlocks;
|
|||
import org.elasticsearch.cluster.routing.IndexRoutingTable;
|
||||
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
|
||||
import org.elasticsearch.cluster.routing.RoutingTable;
|
||||
import org.elasticsearch.cluster.routing.ShardRouting;
|
||||
import org.elasticsearch.cluster.routing.allocation.AllocationService;
|
||||
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
|
||||
import org.elasticsearch.common.Priority;
|
||||
|
@ -92,8 +93,10 @@ public class MetaDataIndexStateService extends AbstractComponent {
|
|||
if (indexMetaData.state() != IndexMetaData.State.CLOSE) {
|
||||
IndexRoutingTable indexRoutingTable = currentState.routingTable().index(index);
|
||||
for (IndexShardRoutingTable shard : indexRoutingTable) {
|
||||
if (!shard.primaryAllocatedPostApi()) {
|
||||
throw new IndexPrimaryShardNotAllocatedException(new Index(index));
|
||||
for (ShardRouting shardRouting : shard) {
|
||||
if (shardRouting.primary() == true && shardRouting.allocatedPostIndexCreate() == false) {
|
||||
throw new IndexPrimaryShardNotAllocatedException(new Index(index));
|
||||
}
|
||||
}
|
||||
}
|
||||
indicesToClose.add(index);
|
||||
|
|
|
@ -383,28 +383,28 @@ public class IndexRoutingTable extends AbstractDiffable<IndexRoutingTable> imple
|
|||
* Initializes a new empty index, as if it was created from an API.
|
||||
*/
|
||||
public Builder initializeAsNew(IndexMetaData indexMetaData) {
|
||||
return initializeEmpty(indexMetaData, true, new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, null));
|
||||
return initializeEmpty(indexMetaData, new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, null));
|
||||
}
|
||||
|
||||
/**
|
||||
* Initializes a new empty index, as if it was created from an API.
|
||||
*/
|
||||
public Builder initializeAsRecovery(IndexMetaData indexMetaData) {
|
||||
return initializeEmpty(indexMetaData, false, new UnassignedInfo(UnassignedInfo.Reason.CLUSTER_RECOVERED, null));
|
||||
return initializeEmpty(indexMetaData, new UnassignedInfo(UnassignedInfo.Reason.CLUSTER_RECOVERED, null));
|
||||
}
|
||||
|
||||
/**
|
||||
* Initializes a new index caused by dangling index imported.
|
||||
*/
|
||||
public Builder initializeAsFromDangling(IndexMetaData indexMetaData) {
|
||||
return initializeEmpty(indexMetaData, false, new UnassignedInfo(UnassignedInfo.Reason.DANGLING_INDEX_IMPORTED, null));
|
||||
return initializeEmpty(indexMetaData, new UnassignedInfo(UnassignedInfo.Reason.DANGLING_INDEX_IMPORTED, null));
|
||||
}
|
||||
|
||||
/**
|
||||
* Initializes a new empty index, as as a result of opening a closed index.
|
||||
*/
|
||||
public Builder initializeAsFromCloseToOpen(IndexMetaData indexMetaData) {
|
||||
return initializeEmpty(indexMetaData, false, new UnassignedInfo(UnassignedInfo.Reason.INDEX_REOPENED, null));
|
||||
return initializeEmpty(indexMetaData, new UnassignedInfo(UnassignedInfo.Reason.INDEX_REOPENED, null));
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -429,7 +429,7 @@ public class IndexRoutingTable extends AbstractDiffable<IndexRoutingTable> imple
|
|||
throw new IllegalStateException("trying to initialize an index with fresh shards, but already has shards created");
|
||||
}
|
||||
for (int shardId = 0; shardId < indexMetaData.numberOfShards(); shardId++) {
|
||||
IndexShardRoutingTable.Builder indexShardRoutingBuilder = new IndexShardRoutingTable.Builder(new ShardId(indexMetaData.index(), shardId), asNew ? false : true);
|
||||
IndexShardRoutingTable.Builder indexShardRoutingBuilder = new IndexShardRoutingTable.Builder(new ShardId(indexMetaData.index(), shardId));
|
||||
for (int i = 0; i <= indexMetaData.numberOfReplicas(); i++) {
|
||||
if (asNew && ignoreShards.contains(shardId)) {
|
||||
// This shards wasn't completely snapshotted - restore it as new shard
|
||||
|
@ -446,12 +446,12 @@ public class IndexRoutingTable extends AbstractDiffable<IndexRoutingTable> imple
|
|||
/**
|
||||
* Initializes a new empty index, with an option to control if its from an API or not.
|
||||
*/
|
||||
private Builder initializeEmpty(IndexMetaData indexMetaData, boolean asNew, UnassignedInfo unassignedInfo) {
|
||||
private Builder initializeEmpty(IndexMetaData indexMetaData, UnassignedInfo unassignedInfo) {
|
||||
if (!shards.isEmpty()) {
|
||||
throw new IllegalStateException("trying to initialize an index with fresh shards, but already has shards created");
|
||||
}
|
||||
for (int shardId = 0; shardId < indexMetaData.numberOfShards(); shardId++) {
|
||||
IndexShardRoutingTable.Builder indexShardRoutingBuilder = new IndexShardRoutingTable.Builder(new ShardId(indexMetaData.index(), shardId), asNew ? false : true);
|
||||
IndexShardRoutingTable.Builder indexShardRoutingBuilder = new IndexShardRoutingTable.Builder(new ShardId(indexMetaData.index(), shardId));
|
||||
for (int i = 0; i <= indexMetaData.numberOfReplicas(); i++) {
|
||||
indexShardRoutingBuilder.addShard(ShardRouting.newUnassigned(index, shardId, null, i == 0, unassignedInfo));
|
||||
}
|
||||
|
@ -481,7 +481,7 @@ public class IndexRoutingTable extends AbstractDiffable<IndexRoutingTable> imple
|
|||
return this;
|
||||
}
|
||||
// re-add all the current ones
|
||||
IndexShardRoutingTable.Builder builder = new IndexShardRoutingTable.Builder(indexShard.shardId(), indexShard.primaryAllocatedPostApi());
|
||||
IndexShardRoutingTable.Builder builder = new IndexShardRoutingTable.Builder(indexShard.shardId());
|
||||
for (ShardRouting shardRouting : indexShard) {
|
||||
builder.addShard(new ShardRouting(shardRouting));
|
||||
}
|
||||
|
@ -513,16 +513,6 @@ public class IndexRoutingTable extends AbstractDiffable<IndexRoutingTable> imple
|
|||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Clears the post allocation flag for the specified shard
|
||||
*/
|
||||
public Builder clearPostAllocationFlag(ShardId shardId) {
|
||||
assert this.index.equals(shardId.index().name());
|
||||
IndexShardRoutingTable indexShard = shards.get(shardId.id());
|
||||
shards.put(indexShard.shardId().id(), new IndexShardRoutingTable(indexShard.shardId(), indexShard.shards(), false));
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Adds a new shard routing (makes a copy of it), with reference data used from the index shard routing table
|
||||
* if it needs to be created.
|
||||
|
@ -530,7 +520,7 @@ public class IndexRoutingTable extends AbstractDiffable<IndexRoutingTable> imple
|
|||
public Builder addShard(IndexShardRoutingTable refData, ShardRouting shard) {
|
||||
IndexShardRoutingTable indexShard = shards.get(shard.id());
|
||||
if (indexShard == null) {
|
||||
indexShard = new IndexShardRoutingTable.Builder(refData.shardId(), refData.primaryAllocatedPostApi()).addShard(new ShardRouting(shard)).build();
|
||||
indexShard = new IndexShardRoutingTable.Builder(refData.shardId()).addShard(new ShardRouting(shard)).build();
|
||||
} else {
|
||||
indexShard = new IndexShardRoutingTable.Builder(indexShard).addShard(new ShardRouting(shard)).build();
|
||||
}
|
||||
|
|
|
@ -62,13 +62,10 @@ public class IndexShardRoutingTable implements Iterable<ShardRouting> {
|
|||
*/
|
||||
final ImmutableList<ShardRouting> allInitializingShards;
|
||||
|
||||
final boolean primaryAllocatedPostApi;
|
||||
|
||||
IndexShardRoutingTable(ShardId shardId, List<ShardRouting> shards, boolean primaryAllocatedPostApi) {
|
||||
IndexShardRoutingTable(ShardId shardId, List<ShardRouting> shards) {
|
||||
this.shardId = shardId;
|
||||
this.shuffler = new RotationShardShuffler(ThreadLocalRandom.current().nextInt());
|
||||
this.shards = ImmutableList.copyOf(shards);
|
||||
this.primaryAllocatedPostApi = primaryAllocatedPostApi;
|
||||
|
||||
ShardRouting primary = null;
|
||||
ImmutableList.Builder<ShardRouting> replicas = ImmutableList.builder();
|
||||
|
@ -144,15 +141,7 @@ public class IndexShardRoutingTable implements Iterable<ShardRouting> {
|
|||
shardRoutings.add(new ShardRouting(shards.get(i), highestVersion));
|
||||
}
|
||||
}
|
||||
return new IndexShardRoutingTable(shardId, ImmutableList.copyOf(shardRoutings), primaryAllocatedPostApi);
|
||||
}
|
||||
|
||||
/**
|
||||
* Has this shard group primary shard been allocated post API creation. Will be set to
|
||||
* <code>true</code> if it was created because of recovery action.
|
||||
*/
|
||||
public boolean primaryAllocatedPostApi() {
|
||||
return primaryAllocatedPostApi;
|
||||
return new IndexShardRoutingTable(shardId, ImmutableList.copyOf(shardRoutings));
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -434,7 +423,6 @@ public class IndexShardRoutingTable implements Iterable<ShardRouting> {
|
|||
|
||||
IndexShardRoutingTable that = (IndexShardRoutingTable) o;
|
||||
|
||||
if (primaryAllocatedPostApi != that.primaryAllocatedPostApi) return false;
|
||||
if (!shardId.equals(that.shardId)) return false;
|
||||
if (!shards.equals(that.shards)) return false;
|
||||
|
||||
|
@ -445,7 +433,6 @@ public class IndexShardRoutingTable implements Iterable<ShardRouting> {
|
|||
public int hashCode() {
|
||||
int result = shardId.hashCode();
|
||||
result = 31 * result + shards.hashCode();
|
||||
result = 31 * result + (primaryAllocatedPostApi ? 1 : 0);
|
||||
return result;
|
||||
}
|
||||
|
||||
|
@ -594,21 +581,16 @@ public class IndexShardRoutingTable implements Iterable<ShardRouting> {
|
|||
public static class Builder {
|
||||
|
||||
private ShardId shardId;
|
||||
|
||||
private final List<ShardRouting> shards;
|
||||
|
||||
private boolean primaryAllocatedPostApi;
|
||||
|
||||
public Builder(IndexShardRoutingTable indexShard) {
|
||||
this.shardId = indexShard.shardId;
|
||||
this.shards = newArrayList(indexShard.shards);
|
||||
this.primaryAllocatedPostApi = indexShard.primaryAllocatedPostApi();
|
||||
}
|
||||
|
||||
public Builder(ShardId shardId, boolean primaryAllocatedPostApi) {
|
||||
public Builder(ShardId shardId) {
|
||||
this.shardId = shardId;
|
||||
this.shards = newArrayList();
|
||||
this.primaryAllocatedPostApi = primaryAllocatedPostApi;
|
||||
}
|
||||
|
||||
public Builder addShard(ShardRouting shardEntry) {
|
||||
|
@ -630,15 +612,7 @@ public class IndexShardRoutingTable implements Iterable<ShardRouting> {
|
|||
}
|
||||
|
||||
public IndexShardRoutingTable build() {
|
||||
// we can automatically set allocatedPostApi to true if the primary is active
|
||||
if (!primaryAllocatedPostApi) {
|
||||
for (ShardRouting shardRouting : shards) {
|
||||
if (shardRouting.primary() && shardRouting.active()) {
|
||||
primaryAllocatedPostApi = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
return new IndexShardRoutingTable(shardId, ImmutableList.copyOf(shards), primaryAllocatedPostApi);
|
||||
return new IndexShardRoutingTable(shardId, ImmutableList.copyOf(shards));
|
||||
}
|
||||
|
||||
public static IndexShardRoutingTable readFrom(StreamInput in) throws IOException {
|
||||
|
@ -648,8 +622,7 @@ public class IndexShardRoutingTable implements Iterable<ShardRouting> {
|
|||
|
||||
public static IndexShardRoutingTable readFromThin(StreamInput in, String index) throws IOException {
|
||||
int iShardId = in.readVInt();
|
||||
boolean allocatedPostApi = in.readBoolean();
|
||||
Builder builder = new Builder(new ShardId(index, iShardId), allocatedPostApi);
|
||||
Builder builder = new Builder(new ShardId(index, iShardId));
|
||||
|
||||
int size = in.readVInt();
|
||||
for (int i = 0; i < size; i++) {
|
||||
|
@ -667,7 +640,6 @@ public class IndexShardRoutingTable implements Iterable<ShardRouting> {
|
|||
|
||||
public static void writeToThin(IndexShardRoutingTable indexShard, StreamOutput out) throws IOException {
|
||||
out.writeVInt(indexShard.shardId.id());
|
||||
out.writeBoolean(indexShard.primaryAllocatedPostApi());
|
||||
|
||||
out.writeVInt(indexShard.shards.size());
|
||||
for (ShardRouting entry : indexShard) {
|
||||
|
|
|
@ -64,8 +64,6 @@ public class RoutingNodes implements Iterable<RoutingNode> {
|
|||
|
||||
private int relocatingShards = 0;
|
||||
|
||||
private Set<ShardId> clearPostAllocationFlag;
|
||||
|
||||
private final Map<String, ObjectIntHashMap<String>> nodesPerAttributeNames = new HashMap<>();
|
||||
|
||||
public RoutingNodes(ClusterState clusterState) {
|
||||
|
@ -191,25 +189,6 @@ public class RoutingNodes implements Iterable<RoutingNode> {
|
|||
return new RoutingNodesIterator(nodesToShards.values().iterator());
|
||||
}
|
||||
|
||||
/**
|
||||
* Clears the post allocation flag for the provided shard id. NOTE: this should be used cautiously
|
||||
* since it will lead to data loss of the primary shard is not allocated, as it will allocate
|
||||
* the primary shard on a node and *not* expect it to have an existing valid index there.
|
||||
*/
|
||||
public void addClearPostAllocationFlag(ShardId shardId) {
|
||||
if (clearPostAllocationFlag == null) {
|
||||
clearPostAllocationFlag = Sets.newHashSet();
|
||||
}
|
||||
clearPostAllocationFlag.add(shardId);
|
||||
}
|
||||
|
||||
public Iterable<ShardId> getShardsToClearPostAllocationFlag() {
|
||||
if (clearPostAllocationFlag == null) {
|
||||
return ImmutableSet.of();
|
||||
}
|
||||
return clearPostAllocationFlag;
|
||||
}
|
||||
|
||||
public RoutingNode node(String nodeId) {
|
||||
return nodesToShards.get(nodeId);
|
||||
}
|
||||
|
|
|
@ -373,13 +373,6 @@ public class RoutingTable implements Iterable<IndexRoutingTable>, Diffable<Routi
|
|||
indexBuilder.addShard(refData, shardRoutingEntry);
|
||||
}
|
||||
|
||||
for (ShardId shardId : routingNodes.getShardsToClearPostAllocationFlag()) {
|
||||
IndexRoutingTable.Builder indexRoutingBuilder = indexRoutingTableBuilders.get(shardId.index().name());
|
||||
if (indexRoutingBuilder != null) {
|
||||
indexRoutingBuilder.clearPostAllocationFlag(shardId);
|
||||
}
|
||||
}
|
||||
|
||||
for (IndexRoutingTable.Builder indexBuilder : indexRoutingTableBuilders.values()) {
|
||||
add(indexBuilder);
|
||||
}
|
||||
|
|
|
@ -257,6 +257,21 @@ public final class ShardRouting implements Streamable, ToXContent {
|
|||
return shardIdentifier;
|
||||
}
|
||||
|
||||
public boolean allocatedPostIndexCreate() {
|
||||
if (active()) {
|
||||
return true;
|
||||
}
|
||||
|
||||
// unassigned info is only cleared when a shard moves to started, so
|
||||
// for unassigned and initializing (we checked for active() before),
|
||||
// we can safely assume it is there
|
||||
if (unassignedInfo.getReason() == UnassignedInfo.Reason.INDEX_CREATED) {
|
||||
return false;
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* A shard iterator with just this shard in it.
|
||||
*/
|
||||
|
@ -362,6 +377,11 @@ public final class ShardRouting implements Streamable, ToXContent {
|
|||
writeToThin(out);
|
||||
}
|
||||
|
||||
public void updateUnassignedInfo(UnassignedInfo unassignedInfo) {
|
||||
ensureNotFrozen();
|
||||
assert this.unassignedInfo != null : "can only update unassign info if they are already set";
|
||||
this.unassignedInfo = unassignedInfo;
|
||||
}
|
||||
|
||||
// package private mutators start here
|
||||
|
||||
|
@ -431,6 +451,7 @@ public final class ShardRouting implements Streamable, ToXContent {
|
|||
version++;
|
||||
state = ShardRoutingState.INITIALIZING;
|
||||
allocationId = AllocationId.newInitializing();
|
||||
this.unassignedInfo = new UnassignedInfo(UnassignedInfo.Reason.REINITIALIZED, null);
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -91,7 +91,11 @@ public class UnassignedInfo implements ToXContent, Writeable<UnassignedInfo> {
|
|||
/**
|
||||
* Unassigned as a result of explicit cancel reroute command.
|
||||
*/
|
||||
REROUTE_CANCELLED;
|
||||
REROUTE_CANCELLED,
|
||||
/**
|
||||
* When a shard moves from started back to initializing, for example, during shadow replica
|
||||
*/
|
||||
REINITIALIZED;
|
||||
}
|
||||
|
||||
private final Reason reason;
|
||||
|
|
|
@ -21,9 +21,10 @@ package org.elasticsearch.cluster.routing.allocation.command;
|
|||
|
||||
import org.elasticsearch.ElasticsearchParseException;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.cluster.routing.RoutingNode;
|
||||
import org.elasticsearch.cluster.routing.RoutingNodes;
|
||||
import org.elasticsearch.cluster.routing.ShardRouting;
|
||||
import org.elasticsearch.cluster.routing.RoutingNode;
|
||||
import org.elasticsearch.cluster.routing.UnassignedInfo;
|
||||
import org.elasticsearch.cluster.routing.allocation.RerouteExplanation;
|
||||
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
|
||||
import org.elasticsearch.cluster.routing.allocation.decider.Decision;
|
||||
|
@ -35,7 +36,6 @@ import org.elasticsearch.common.xcontent.XContentParser;
|
|||
import org.elasticsearch.index.shard.ShardId;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Iterator;
|
||||
|
||||
/**
|
||||
* Allocates an unassigned shard to a specific node. Note, primary allocation will "force"
|
||||
|
@ -221,15 +221,17 @@ public class AllocateAllocationCommand implements AllocationCommand {
|
|||
}
|
||||
// go over and remove it from the unassigned
|
||||
for (RoutingNodes.UnassignedShards.UnassignedIterator it = routingNodes.unassigned().iterator(); it.hasNext(); ) {
|
||||
if (it.next() != shardRouting) {
|
||||
ShardRouting unassigned = it.next();
|
||||
if (unassigned != shardRouting) {
|
||||
continue;
|
||||
}
|
||||
it.initialize(routingNode.nodeId());
|
||||
if (shardRouting.primary()) {
|
||||
// we need to clear the post allocation flag, since its an explicit allocation of the primary shard
|
||||
// and we want to force allocate it (and create a new index for it)
|
||||
routingNodes.addClearPostAllocationFlag(shardRouting.shardId());
|
||||
// if we force allocation of a primary, we need to move the unassigned info back to treat it as if
|
||||
// it was index creation
|
||||
if (unassigned.primary() && unassigned.unassignedInfo().getReason() != UnassignedInfo.Reason.INDEX_CREATED) {
|
||||
unassigned.updateUnassignedInfo(new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED,
|
||||
"force allocation from previous reason " + unassigned.unassignedInfo().getReason() + ", " + unassigned.unassignedInfo().getMessage(), unassigned.unassignedInfo().getFailure()));
|
||||
}
|
||||
it.initialize(routingNode.nodeId());
|
||||
break;
|
||||
}
|
||||
return new RerouteExplanation(this, decision);
|
||||
|
|
|
@ -109,7 +109,7 @@ public class DisableAllocationDecider extends AllocationDecider {
|
|||
return allocation.decision(Decision.YES, NAME, "allocation disabling is ignored");
|
||||
}
|
||||
Settings indexSettings = allocation.routingNodes().metaData().index(shardRouting.index()).settings();
|
||||
if (shardRouting.primary() && !allocation.routingNodes().routingTable().index(shardRouting.index()).shard(shardRouting.id()).primaryAllocatedPostApi()) {
|
||||
if (shardRouting.primary() && shardRouting.allocatedPostIndexCreate() == false) {
|
||||
// if its primary, and it hasn't been allocated post API (meaning its a "fresh newly created shard"), only disable allocation
|
||||
// on a special disable allocation flag
|
||||
if (indexSettings.getAsBoolean(INDEX_ROUTING_ALLOCATION_DISABLE_NEW_ALLOCATION, disableNewAllocation)) {
|
||||
|
|
|
@ -311,7 +311,7 @@ public class DiskThresholdDecider extends AllocationDecider {
|
|||
}
|
||||
|
||||
// a flag for whether the primary shard has been previously allocated
|
||||
boolean primaryHasBeenAllocated = allocation.routingTable().index(shardRouting.index()).shard(shardRouting.id()).primaryAllocatedPostApi();
|
||||
boolean primaryHasBeenAllocated = shardRouting.primary() && shardRouting.allocatedPostIndexCreate();
|
||||
|
||||
// checks for exact byte comparisons
|
||||
if (freeBytes < freeBytesThresholdLow.bytes()) {
|
||||
|
|
|
@ -98,7 +98,7 @@ public class EnableAllocationDecider extends AllocationDecider implements NodeSe
|
|||
case NONE:
|
||||
return allocation.decision(Decision.NO, NAME, "no allocations are allowed");
|
||||
case NEW_PRIMARIES:
|
||||
if (shardRouting.primary() && !allocation.routingNodes().routingTable().index(shardRouting.index()).shard(shardRouting.id()).primaryAllocatedPostApi()) {
|
||||
if (shardRouting.primary() && shardRouting.allocatedPostIndexCreate() == false) {
|
||||
return allocation.decision(Decision.YES, NAME, "new primary allocations are allowed");
|
||||
} else {
|
||||
return allocation.decision(Decision.NO, NAME, "non-new primary allocations are forbidden");
|
||||
|
|
|
@ -25,7 +25,6 @@ import org.apache.lucene.util.CollectionUtil;
|
|||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||
import org.elasticsearch.cluster.metadata.MetaData;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
|
||||
import org.elasticsearch.cluster.routing.RoutingNode;
|
||||
import org.elasticsearch.cluster.routing.RoutingNodes;
|
||||
import org.elasticsearch.cluster.routing.ShardRouting;
|
||||
|
@ -62,7 +61,7 @@ public abstract class PrimaryShardAllocator extends AbstractComponent {
|
|||
while (unassignedIterator.hasNext()) {
|
||||
ShardRouting shard = unassignedIterator.next();
|
||||
|
||||
if (needToFindPrimaryCopy(shard, routingNodes.routingTable().index(shard.index()).shard(shard.id())) == false) {
|
||||
if (needToFindPrimaryCopy(shard) == false) {
|
||||
continue;
|
||||
}
|
||||
|
||||
|
@ -113,13 +112,13 @@ public abstract class PrimaryShardAllocator extends AbstractComponent {
|
|||
/**
|
||||
* Does the shard need to find a primary copy?
|
||||
*/
|
||||
boolean needToFindPrimaryCopy(ShardRouting shard, IndexShardRoutingTable indexShardRoutingTable) {
|
||||
boolean needToFindPrimaryCopy(ShardRouting shard) {
|
||||
if (shard.primary() == false) {
|
||||
return false;
|
||||
}
|
||||
|
||||
// this is an API allocation, ignore since we know there is no data...
|
||||
if (indexShardRoutingTable.primaryAllocatedPostApi() == false) {
|
||||
if (shard.allocatedPostIndexCreate() == false) {
|
||||
return false;
|
||||
}
|
||||
|
||||
|
|
|
@ -24,13 +24,11 @@ import com.google.common.base.Preconditions;
|
|||
|
||||
import org.apache.lucene.codecs.PostingsFormat;
|
||||
import org.apache.lucene.index.CheckIndex;
|
||||
import org.apache.lucene.index.CorruptIndexException;
|
||||
import org.apache.lucene.search.QueryCachingPolicy;
|
||||
import org.apache.lucene.search.UsageTrackingQueryCachingPolicy;
|
||||
import org.apache.lucene.store.AlreadyClosedException;
|
||||
import org.apache.lucene.util.IOUtils;
|
||||
import org.apache.lucene.util.ThreadInterruptedException;
|
||||
import org.elasticsearch.ElasticsearchCorruptionException;
|
||||
import org.elasticsearch.ElasticsearchException;
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.action.admin.indices.flush.FlushRequest;
|
||||
|
@ -39,7 +37,6 @@ import org.elasticsearch.action.admin.indices.upgrade.post.UpgradeRequest;
|
|||
import org.elasticsearch.cluster.ClusterService;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
|
||||
import org.elasticsearch.cluster.routing.RestoreSource;
|
||||
import org.elasticsearch.cluster.routing.ShardRouting;
|
||||
import org.elasticsearch.cluster.routing.ShardRoutingState;
|
||||
|
@ -1039,10 +1036,11 @@ public class IndexShard extends AbstractIndexShardComponent {
|
|||
return path;
|
||||
}
|
||||
|
||||
public void recoverFromStore(IndexShardRoutingTable shardRoutingTable, StoreRecoveryService.RecoveryListener recoveryListener) {
|
||||
public void recoverFromStore(ShardRouting shard, StoreRecoveryService.RecoveryListener recoveryListener) {
|
||||
// we are the first primary, recover from the gateway
|
||||
// if its post api allocation, the index should exists
|
||||
final boolean shouldExist = shardRoutingTable.primaryAllocatedPostApi();
|
||||
assert shard.primary() : "recover from store only makes sense if the shard is a primary shard";
|
||||
final boolean shouldExist = shard.allocatedPostIndexCreate();
|
||||
storeRecoveryService.recover(this, shouldExist, recoveryListener);
|
||||
}
|
||||
|
||||
|
|
|
@ -676,8 +676,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
|
|||
handleRecoveryFailure(indexService, shardRouting, true, e);
|
||||
}
|
||||
} else {
|
||||
final IndexShardRoutingTable indexShardRouting = routingTable.index(shardRouting.index()).shard(shardRouting.id());
|
||||
indexService.shard(shardId).recoverFromStore(indexShardRouting, new StoreRecoveryService.RecoveryListener() {
|
||||
indexService.shard(shardId).recoverFromStore(shardRouting, new StoreRecoveryService.RecoveryListener() {
|
||||
@Override
|
||||
public void onRecoveryDone() {
|
||||
shardStateAction.shardStarted(shardRouting, indexMetaData.getIndexUUID(), "after recovery from store");
|
||||
|
|
|
@ -208,7 +208,7 @@ public class ShardReplicationTests extends ElasticsearchTestCase {
|
|||
|
||||
RoutingTable.Builder routing = new RoutingTable.Builder();
|
||||
routing.addAsNew(indexMetaData);
|
||||
IndexShardRoutingTable.Builder indexShardRoutingBuilder = new IndexShardRoutingTable.Builder(shardId, false);
|
||||
IndexShardRoutingTable.Builder indexShardRoutingBuilder = new IndexShardRoutingTable.Builder(shardId);
|
||||
|
||||
String primaryNode = null;
|
||||
String relocatingNode = null;
|
||||
|
|
|
@ -136,7 +136,7 @@ public class ClusterHealthResponsesTests extends ElasticsearchTestCase {
|
|||
}
|
||||
|
||||
private IndexShardRoutingTable genShardRoutingTable(String index, int shardId, int replicas, ShardCounter counter) {
|
||||
IndexShardRoutingTable.Builder builder = new IndexShardRoutingTable.Builder(new ShardId(index, shardId), true);
|
||||
IndexShardRoutingTable.Builder builder = new IndexShardRoutingTable.Builder(new ShardId(index, shardId));
|
||||
ShardRouting shardRouting = genShardRouting(index, shardId, true);
|
||||
counter.update(shardRouting);
|
||||
builder.addShard(shardRouting);
|
||||
|
|
|
@ -218,7 +218,7 @@ public class ClusterStateDiffTests extends ElasticsearchIntegrationTest {
|
|||
int shardCount = randomInt(10);
|
||||
|
||||
for (int i = 0; i < shardCount; i++) {
|
||||
IndexShardRoutingTable.Builder indexShard = new IndexShardRoutingTable.Builder(new ShardId(index, i), randomBoolean());
|
||||
IndexShardRoutingTable.Builder indexShard = new IndexShardRoutingTable.Builder(new ShardId(index, i));
|
||||
int replicaCount = randomIntBetween(1, 10);
|
||||
for (int j = 0; j < replicaCount; j++) {
|
||||
indexShard.addShard(
|
||||
|
|
|
@ -59,7 +59,8 @@ public class UnassignedInfoTests extends ElasticsearchAllocationTestCase {
|
|||
UnassignedInfo.Reason.REPLICA_ADDED,
|
||||
UnassignedInfo.Reason.ALLOCATION_FAILED,
|
||||
UnassignedInfo.Reason.NODE_LEFT,
|
||||
UnassignedInfo.Reason.REROUTE_CANCELLED};
|
||||
UnassignedInfo.Reason.REROUTE_CANCELLED,
|
||||
UnassignedInfo.Reason.REINITIALIZED};
|
||||
for (int i = 0; i < order.length; i++) {
|
||||
assertThat(order[i].ordinal(), equalTo(i));
|
||||
}
|
||||
|
|
|
@ -1,69 +0,0 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.cluster.routing.allocation;
|
||||
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.cluster.ClusterName;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||
import org.elasticsearch.cluster.metadata.MetaData;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNodes;
|
||||
import org.elasticsearch.cluster.routing.RoutingTable;
|
||||
import org.elasticsearch.common.logging.ESLogger;
|
||||
import org.elasticsearch.common.logging.Loggers;
|
||||
import org.elasticsearch.test.ElasticsearchAllocationTestCase;
|
||||
import org.junit.Test;
|
||||
|
||||
import static org.elasticsearch.cluster.routing.ShardRoutingState.INITIALIZING;
|
||||
import static org.elasticsearch.common.settings.Settings.settingsBuilder;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
|
||||
/**
|
||||
*/
|
||||
public class AllocatePostApiFlagTests extends ElasticsearchAllocationTestCase {
|
||||
|
||||
private final ESLogger logger = Loggers.getLogger(AllocatePostApiFlagTests.class);
|
||||
|
||||
@Test
|
||||
public void simpleFlagTests() {
|
||||
AllocationService allocation = createAllocationService(settingsBuilder().put("cluster.routing.allocation.concurrent_recoveries", 10).build());
|
||||
|
||||
logger.info("creating an index with 1 shard, no replica");
|
||||
MetaData metaData = MetaData.builder()
|
||||
.put(IndexMetaData.builder("test").settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(0))
|
||||
.build();
|
||||
RoutingTable routingTable = RoutingTable.builder()
|
||||
.addAsNew(metaData.index("test"))
|
||||
.build();
|
||||
ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT).metaData(metaData).routingTable(routingTable).build();
|
||||
assertThat(clusterState.routingTable().index("test").shard(0).primaryAllocatedPostApi(), equalTo(false));
|
||||
|
||||
logger.info("adding two nodes and performing rerouting");
|
||||
clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder().put(newNode("node1")).put(newNode("node2"))).build();
|
||||
RoutingAllocation.Result rerouteResult = allocation.reroute(clusterState);
|
||||
clusterState = ClusterState.builder(clusterState).routingTable(rerouteResult.routingTable()).build();
|
||||
assertThat(clusterState.routingTable().index("test").shard(0).primaryAllocatedPostApi(), equalTo(false));
|
||||
|
||||
logger.info("start primary shard");
|
||||
rerouteResult = allocation.applyStartedShards(clusterState, clusterState.routingNodes().shardsWithState(INITIALIZING));
|
||||
clusterState = ClusterState.builder(clusterState).routingTable(rerouteResult.routingTable()).build();
|
||||
assertThat(clusterState.routingTable().index("test").shard(0).primaryAllocatedPostApi(), equalTo(true));
|
||||
}
|
||||
}
|
|
@ -96,7 +96,7 @@ public abstract class CatAllocationTestBase extends ElasticsearchAllocationTestC
|
|||
IndexRoutingTable.Builder tableBuilder = new IndexRoutingTable.Builder(idx.name).initializeAsRecovery(idxMeta);
|
||||
Map<Integer, IndexShardRoutingTable> shardIdToRouting = new HashMap<>();
|
||||
for (ShardRouting r : idx.routing) {
|
||||
IndexShardRoutingTable refData = new IndexShardRoutingTable.Builder(new ShardId(idx.name, r.id()), true).addShard(r).build();
|
||||
IndexShardRoutingTable refData = new IndexShardRoutingTable.Builder(new ShardId(idx.name, r.id())).addShard(r).build();
|
||||
if (shardIdToRouting.containsKey(r.getId())) {
|
||||
refData = new IndexShardRoutingTable.Builder(shardIdToRouting.get(r.getId())).addShard(r).build();
|
||||
}
|
||||
|
|
|
@ -54,9 +54,9 @@ public class StartedShardsRoutingTests extends ElasticsearchAllocationTestCase {
|
|||
final ShardRouting startedShard = TestShardRouting.newShardRouting("test", 1, "node2", randomBoolean(), ShardRoutingState.STARTED, 1);
|
||||
final ShardRouting relocatingShard = TestShardRouting.newShardRouting("test", 2, "node1", "node2", randomBoolean(), ShardRoutingState.RELOCATING, 1);
|
||||
stateBuilder.routingTable(RoutingTable.builder().add(IndexRoutingTable.builder("test")
|
||||
.addIndexShard(new IndexShardRoutingTable.Builder(initShard.shardId(), true).addShard(initShard).build())
|
||||
.addIndexShard(new IndexShardRoutingTable.Builder(startedShard.shardId(), true).addShard(startedShard).build())
|
||||
.addIndexShard(new IndexShardRoutingTable.Builder(relocatingShard.shardId(), true).addShard(relocatingShard).build())));
|
||||
.addIndexShard(new IndexShardRoutingTable.Builder(initShard.shardId()).addShard(initShard).build())
|
||||
.addIndexShard(new IndexShardRoutingTable.Builder(startedShard.shardId()).addShard(startedShard).build())
|
||||
.addIndexShard(new IndexShardRoutingTable.Builder(relocatingShard.shardId()).addShard(relocatingShard).build())));
|
||||
|
||||
ClusterState state = stateBuilder.build();
|
||||
|
||||
|
|
|
@ -834,11 +834,11 @@ public class DiskThresholdDeciderTests extends ElasticsearchAllocationTestCase {
|
|||
RoutingNode firstRoutingNode = new RoutingNode("node1", discoveryNode1, Arrays.asList(firstRouting, secondRouting));
|
||||
RoutingTable.Builder builder = RoutingTable.builder().add(
|
||||
IndexRoutingTable.builder("test")
|
||||
.addIndexShard(new IndexShardRoutingTable.Builder(new ShardId("test", 0), false)
|
||||
.addIndexShard(new IndexShardRoutingTable.Builder(new ShardId("test", 0))
|
||||
.addShard(firstRouting)
|
||||
.build()
|
||||
)
|
||||
.addIndexShard(new IndexShardRoutingTable.Builder(new ShardId("test", 1), false)
|
||||
.addIndexShard(new IndexShardRoutingTable.Builder(new ShardId("test", 1))
|
||||
.addShard(secondRouting)
|
||||
.build()
|
||||
)
|
||||
|
@ -854,11 +854,11 @@ public class DiskThresholdDeciderTests extends ElasticsearchAllocationTestCase {
|
|||
firstRoutingNode = new RoutingNode("node1", discoveryNode1, Arrays.asList(firstRouting, secondRouting));
|
||||
builder = RoutingTable.builder().add(
|
||||
IndexRoutingTable.builder("test")
|
||||
.addIndexShard(new IndexShardRoutingTable.Builder(new ShardId("test", 0), false)
|
||||
.addIndexShard(new IndexShardRoutingTable.Builder(new ShardId("test", 0))
|
||||
.addShard(firstRouting)
|
||||
.build()
|
||||
)
|
||||
.addIndexShard(new IndexShardRoutingTable.Builder(new ShardId("test", 1), false)
|
||||
.addIndexShard(new IndexShardRoutingTable.Builder(new ShardId("test", 1))
|
||||
.addShard(secondRouting)
|
||||
.build()
|
||||
)
|
||||
|
|
|
@ -66,7 +66,13 @@ public class PrimaryShardAllocatorTests extends ElasticsearchAllocationTestCase
|
|||
@Test
|
||||
public void testNoProcessReplica() {
|
||||
ShardRouting shard = TestShardRouting.newShardRouting("test", 0, null, null, null, false, ShardRoutingState.UNASSIGNED, 0, new UnassignedInfo(UnassignedInfo.Reason.CLUSTER_RECOVERED, null));
|
||||
assertThat(testAllocator.needToFindPrimaryCopy(shard, null), equalTo(false));
|
||||
assertThat(testAllocator.needToFindPrimaryCopy(shard), equalTo(false));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testNoProcessPrimayNotAllcoatedBefore() {
|
||||
ShardRouting shard = TestShardRouting.newShardRouting("test", 0, null, null, null, true, ShardRoutingState.UNASSIGNED, 0, new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, null));
|
||||
assertThat(testAllocator.needToFindPrimaryCopy(shard), equalTo(false));
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -248,7 +248,7 @@ public class IndicesStoreIntegrationTests extends ElasticsearchIntegrationTest {
|
|||
IndexRoutingTable.Builder indexRoutingTableBuilder = IndexRoutingTable.builder("test");
|
||||
for (int i = 0; i < numShards; i++) {
|
||||
indexRoutingTableBuilder.addIndexShard(
|
||||
new IndexShardRoutingTable.Builder(new ShardId("test", i), false)
|
||||
new IndexShardRoutingTable.Builder(new ShardId("test", i))
|
||||
.addShard(TestShardRouting.newShardRouting("test", i, masterId, true, ShardRoutingState.STARTED, shardVersions[shardIds[i]]))
|
||||
.build()
|
||||
);
|
||||
|
|
|
@ -70,7 +70,7 @@ public class IndicesStoreTests extends ElasticsearchTestCase {
|
|||
|
||||
ClusterState.Builder clusterState = ClusterState.builder(new ClusterName("test"));
|
||||
clusterState.metaData(MetaData.builder().put(IndexMetaData.builder("test").settings(settings(Version.CURRENT)).numberOfShards(numShards).numberOfReplicas(numReplicas)));
|
||||
IndexShardRoutingTable.Builder routingTable = new IndexShardRoutingTable.Builder(new ShardId("test", 1), false);
|
||||
IndexShardRoutingTable.Builder routingTable = new IndexShardRoutingTable.Builder(new ShardId("test", 1));
|
||||
|
||||
assertFalse(indicesStore.shardCanBeDeleted(clusterState.build(), routingTable.build()));
|
||||
}
|
||||
|
@ -82,7 +82,7 @@ public class IndicesStoreTests extends ElasticsearchTestCase {
|
|||
|
||||
ClusterState.Builder clusterState = ClusterState.builder(new ClusterName("test"));
|
||||
clusterState.metaData(MetaData.builder().put(IndexMetaData.builder("test").settings(settings(Version.CURRENT)).numberOfShards(numShards).numberOfReplicas(numReplicas)));
|
||||
IndexShardRoutingTable.Builder routingTable = new IndexShardRoutingTable.Builder(new ShardId("test", 1), false);
|
||||
IndexShardRoutingTable.Builder routingTable = new IndexShardRoutingTable.Builder(new ShardId("test", 1));
|
||||
|
||||
for (int i = 0; i < numShards; i++) {
|
||||
int unStartedShard = randomInt(numReplicas);
|
||||
|
@ -111,7 +111,7 @@ public class IndicesStoreTests extends ElasticsearchTestCase {
|
|||
ClusterState.Builder clusterState = ClusterState.builder(new ClusterName("test"));
|
||||
clusterState.metaData(MetaData.builder().put(IndexMetaData.builder("test").settings(settings(Version.CURRENT)).numberOfShards(numShards).numberOfReplicas(numReplicas)));
|
||||
clusterState.nodes(DiscoveryNodes.builder().localNodeId(localNode.id()).put(localNode).put(new DiscoveryNode("xyz", new LocalTransportAddress("xyz"), Version.CURRENT)));
|
||||
IndexShardRoutingTable.Builder routingTable = new IndexShardRoutingTable.Builder(new ShardId("test", 1), false);
|
||||
IndexShardRoutingTable.Builder routingTable = new IndexShardRoutingTable.Builder(new ShardId("test", 1));
|
||||
int localShardId = randomInt(numShards - 1);
|
||||
for (int i = 0; i < numShards; i++) {
|
||||
String nodeId = i == localShardId ? localNode.getId() : randomBoolean() ? "abc" : "xyz";
|
||||
|
@ -134,7 +134,7 @@ public class IndicesStoreTests extends ElasticsearchTestCase {
|
|||
ClusterState.Builder clusterState = ClusterState.builder(new ClusterName("test"));
|
||||
clusterState.metaData(MetaData.builder().put(IndexMetaData.builder("test").settings(settings(Version.CURRENT)).numberOfShards(numShards).numberOfReplicas(numReplicas)));
|
||||
clusterState.nodes(DiscoveryNodes.builder().localNodeId(localNode.id()).put(localNode));
|
||||
IndexShardRoutingTable.Builder routingTable = new IndexShardRoutingTable.Builder(new ShardId("test", 1), false);
|
||||
IndexShardRoutingTable.Builder routingTable = new IndexShardRoutingTable.Builder(new ShardId("test", 1));
|
||||
for (int i = 0; i < numShards; i++) {
|
||||
String relocatingNodeId = randomBoolean() ? null : "def";
|
||||
routingTable.addShard(TestShardRouting.newShardRouting("test", i, "xyz", relocatingNodeId, true, ShardRoutingState.STARTED, 0));
|
||||
|
@ -157,7 +157,7 @@ public class IndicesStoreTests extends ElasticsearchTestCase {
|
|||
ClusterState.Builder clusterState = ClusterState.builder(new ClusterName("test"));
|
||||
clusterState.metaData(MetaData.builder().put(IndexMetaData.builder("test").settings(settings(Version.CURRENT)).numberOfShards(numShards).numberOfReplicas(numReplicas)));
|
||||
clusterState.nodes(DiscoveryNodes.builder().localNodeId(localNode.id()).put(localNode).put(new DiscoveryNode("xyz", new LocalTransportAddress("xyz"), nodeVersion)));
|
||||
IndexShardRoutingTable.Builder routingTable = new IndexShardRoutingTable.Builder(new ShardId("test", 1), false);
|
||||
IndexShardRoutingTable.Builder routingTable = new IndexShardRoutingTable.Builder(new ShardId("test", 1));
|
||||
for (int i = 0; i < numShards; i++) {
|
||||
routingTable.addShard(TestShardRouting.newShardRouting("test", i, "xyz", null, true, ShardRoutingState.STARTED, 0));
|
||||
for (int j = 0; j < numReplicas; j++) {
|
||||
|
@ -183,7 +183,7 @@ public class IndicesStoreTests extends ElasticsearchTestCase {
|
|||
.put(new DiscoveryNode("xyz", new LocalTransportAddress("xyz"), Version.CURRENT))
|
||||
.put(new DiscoveryNode("def", new LocalTransportAddress("def"), nodeVersion) // <-- only set relocating, since we're testing that in this test
|
||||
));
|
||||
IndexShardRoutingTable.Builder routingTable = new IndexShardRoutingTable.Builder(new ShardId("test", 1), false);
|
||||
IndexShardRoutingTable.Builder routingTable = new IndexShardRoutingTable.Builder(new ShardId("test", 1));
|
||||
for (int i = 0; i < numShards; i++) {
|
||||
routingTable.addShard(TestShardRouting.newShardRouting("test", i, "xyz", "def", true, ShardRoutingState.STARTED, 0));
|
||||
for (int j = 0; j < numReplicas; j++) {
|
||||
|
|
Loading…
Reference in New Issue