diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/OperationRouting.java b/server/src/main/java/org/elasticsearch/cluster/routing/OperationRouting.java index dff5771d050..08dc3d1d709 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/OperationRouting.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/OperationRouting.java @@ -19,8 +19,6 @@ package org.elasticsearch.cluster.routing; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; import org.elasticsearch.Version; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexMetaData; @@ -33,7 +31,6 @@ import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.index.shard.ShardId; -import org.elasticsearch.index.shard.ShardNotFoundException; import org.elasticsearch.node.ResponseCollectorService; import java.util.ArrayList; @@ -47,8 +44,6 @@ import java.util.stream.Collectors; public class OperationRouting { - private static final Logger logger = LogManager.getLogger(OperationRouting.class); - public static final Setting USE_ADAPTIVE_REPLICA_SELECTION_SETTING = Setting.boolSetting("cluster.routing.use_adaptive_replica_selection", true, Setting.Property.Dynamic, Setting.Property.NodeScope); @@ -129,7 +124,7 @@ public class OperationRouting { for (String r : effectiveRouting) { final int routingPartitionSize = indexMetaData.getRoutingPartitionSize(); for (int partitionOffset = 0; partitionOffset < routingPartitionSize; partitionOffset++) { - set.add(shardRoutingTable(indexRouting, calculateScaledShardId(indexMetaData, r, partitionOffset))); + set.add(RoutingTable.shardRoutingTable(indexRouting, calculateScaledShardId(indexMetaData, r, partitionOffset))); } } } else { @@ -146,15 +141,7 @@ public class OperationRouting { @Nullable ResponseCollectorService collectorService, @Nullable Map nodeCounts) { if (preference == null || preference.isEmpty()) { - if (awarenessAttributes.isEmpty()) { - if (useAdaptiveReplicaSelection) { - return indexShard.activeInitializingShardsRankedIt(collectorService, nodeCounts); - } else { - return indexShard.activeInitializingShardsRandomIt(); - } - } else { - return indexShard.preferAttributesActiveInitializingShardsIt(awarenessAttributes, nodes); - } + return shardRoutings(indexShard, nodes, collectorService, nodeCounts); } if (preference.charAt(0) == '_') { Preference preferenceType = Preference.parse(preference); @@ -181,15 +168,7 @@ public class OperationRouting { } // no more preference if (index == -1 || index == preference.length() - 1) { - if (awarenessAttributes.isEmpty()) { - if (useAdaptiveReplicaSelection) { - return indexShard.activeInitializingShardsRankedIt(collectorService, nodeCounts); - } else { - return indexShard.activeInitializingShardsRandomIt(); - } - } else { - return indexShard.preferAttributesActiveInitializingShardsIt(awarenessAttributes, nodes); - } + return shardRoutings(indexShard, nodes, collectorService, nodeCounts); } else { // update the preference and continue preference = preference.substring(index + 1); @@ -232,12 +211,17 @@ public class OperationRouting { } } - private IndexShardRoutingTable shardRoutingTable(IndexRoutingTable indexRouting, int shardId) { - IndexShardRoutingTable indexShard = indexRouting.shard(shardId); - if (indexShard == null) { - throw new ShardNotFoundException(new ShardId(indexRouting.getIndex(), shardId)); + private ShardIterator shardRoutings(IndexShardRoutingTable indexShard, DiscoveryNodes nodes, + @Nullable ResponseCollectorService collectorService, @Nullable Map nodeCounts) { + if (awarenessAttributes.isEmpty()) { + if (useAdaptiveReplicaSelection) { + return indexShard.activeInitializingShardsRankedIt(collectorService, nodeCounts); + } else { + return indexShard.activeInitializingShardsRandomIt(); + } + } else { + return indexShard.preferAttributesActiveInitializingShardsIt(awarenessAttributes, nodes); } - return indexShard; } protected IndexRoutingTable indexRoutingTable(ClusterState clusterState, String index) { diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/RoutingChangesObserver.java b/server/src/main/java/org/elasticsearch/cluster/routing/RoutingChangesObserver.java index 883b4c22f7f..0c3c7197261 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/RoutingChangesObserver.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/RoutingChangesObserver.java @@ -59,11 +59,6 @@ public interface RoutingChangesObserver { */ void relocationSourceRemoved(ShardRouting removedReplicaRelocationSource); - /** - * Called on started primary shard after it has been promoted from replica to primary and is reinitialized due to shadow replicas. - */ - void startedPrimaryReinitialized(ShardRouting startedPrimaryShard, ShardRouting initializedShard); - /** * Called when started replica is promoted to primary. */ @@ -117,11 +112,6 @@ public interface RoutingChangesObserver { } - @Override - public void startedPrimaryReinitialized(ShardRouting startedPrimaryShard, ShardRouting initializedShard) { - - } - @Override public void replicaPromoted(ShardRouting replicaShard) { @@ -190,13 +180,6 @@ public interface RoutingChangesObserver { } } - @Override - public void startedPrimaryReinitialized(ShardRouting startedPrimaryShard, ShardRouting initializedShard) { - for (RoutingChangesObserver routingChangesObserver : routingChangesObservers) { - routingChangesObserver.startedPrimaryReinitialized(startedPrimaryShard, initializedShard); - } - } - @Override public void replicaPromoted(ShardRouting replicaShard) { for (RoutingChangesObserver routingChangesObserver : routingChangesObservers) { diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/RoutingException.java b/server/src/main/java/org/elasticsearch/cluster/routing/RoutingException.java index 748fe3d9c4e..5cf0693f8b2 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/RoutingException.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/RoutingException.java @@ -29,10 +29,6 @@ import java.io.IOException; */ public class RoutingException extends ElasticsearchException { - public RoutingException(String message) { - super(message); - } - public RoutingException(String message, Throwable cause) { super(message, cause); } @@ -40,4 +36,4 @@ public class RoutingException extends ElasticsearchException { public RoutingException(StreamInput in) throws IOException{ super(in); } -} \ No newline at end of file +} diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/RoutingNodes.java b/server/src/main/java/org/elasticsearch/cluster/routing/RoutingNodes.java index af617dccf67..4750476805d 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/RoutingNodes.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/RoutingNodes.java @@ -577,13 +577,7 @@ public class RoutingNodes implements Iterable { if (failedShard.relocatingNodeId() == null) { if (failedShard.primary()) { // promote active replica to primary if active replica exists (only the case for shadow replicas) - ShardRouting activeReplica = activeReplicaWithHighestVersion(failedShard.shardId()); - if (activeReplica == null) { - moveToUnassigned(failedShard, unassignedInfo); - } else { - movePrimaryToUnassignedAndDemoteToReplica(failedShard, unassignedInfo); - promoteReplicaToPrimary(activeReplica, routingChangesObserver); - } + unassignPrimaryAndPromoteActiveReplicaIfExists(failedShard, unassignedInfo, routingChangesObserver); } else { // initializing shard that is not relocation target, just move to unassigned moveToUnassigned(failedShard, unassignedInfo); @@ -601,32 +595,36 @@ public class RoutingNodes implements Iterable { cancelRelocation(sourceShard); remove(failedShard); } - routingChangesObserver.shardFailed(failedShard, unassignedInfo); } else { assert failedShard.active(); if (failedShard.primary()) { // promote active replica to primary if active replica exists - ShardRouting activeReplica = activeReplicaWithHighestVersion(failedShard.shardId()); - if (activeReplica == null) { - moveToUnassigned(failedShard, unassignedInfo); - } else { - movePrimaryToUnassignedAndDemoteToReplica(failedShard, unassignedInfo); - promoteReplicaToPrimary(activeReplica, routingChangesObserver); - } + unassignPrimaryAndPromoteActiveReplicaIfExists(failedShard, unassignedInfo, routingChangesObserver); } else { - assert failedShard.primary() == false; if (failedShard.relocating()) { remove(failedShard); } else { moveToUnassigned(failedShard, unassignedInfo); } } - routingChangesObserver.shardFailed(failedShard, unassignedInfo); } + routingChangesObserver.shardFailed(failedShard, unassignedInfo); assert node(failedShard.currentNodeId()).getByShardId(failedShard.shardId()) == null : "failedShard " + failedShard + " was matched but wasn't removed"; } + private void unassignPrimaryAndPromoteActiveReplicaIfExists(ShardRouting failedShard, UnassignedInfo unassignedInfo, + RoutingChangesObserver routingChangesObserver) { + assert failedShard.primary(); + ShardRouting activeReplica = activeReplicaWithHighestVersion(failedShard.shardId()); + if (activeReplica == null) { + moveToUnassigned(failedShard, unassignedInfo); + } else { + movePrimaryToUnassignedAndDemoteToReplica(failedShard, unassignedInfo); + promoteReplicaToPrimary(activeReplica, routingChangesObserver); + } + } + private void promoteReplicaToPrimary(ShardRouting activeReplica, RoutingChangesObserver routingChangesObserver) { // if the activeReplica was relocating before this call to failShard, its relocation was cancelled earlier when we // failed initializing replica shards (and moved replica relocation source back to started) @@ -1168,10 +1166,6 @@ public class RoutingNodes implements Iterable { private int incoming = 0; private int outgoing = 0; - int getTotal() { - return incoming + outgoing; - } - void addOutgoing(int howMany) { assert outgoing + howMany >= 0 : outgoing + howMany+ " must be >= 0"; outgoing += howMany; diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/RoutingTable.java b/server/src/main/java/org/elasticsearch/cluster/routing/RoutingTable.java index 0d5ee132ffa..a6f7d58ce85 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/RoutingTable.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/RoutingTable.java @@ -45,7 +45,6 @@ import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; -import java.util.Optional; import java.util.function.Predicate; /** @@ -63,11 +62,27 @@ public class RoutingTable implements Iterable, Diffable indicesRouting; - RoutingTable(long version, ImmutableOpenMap indicesRouting) { + private RoutingTable(long version, ImmutableOpenMap indicesRouting) { this.version = version; this.indicesRouting = indicesRouting; } + /** + * Get's the {@link IndexShardRoutingTable} for the given shard id from the given {@link IndexRoutingTable} + * or throws a {@link ShardNotFoundException} if no shard by the given id is found in the IndexRoutingTable. + * + * @param indexRouting IndexRoutingTable + * @param shardId ShardId + * @return IndexShardRoutingTable + */ + public static IndexShardRoutingTable shardRoutingTable(IndexRoutingTable indexRouting, int shardId) { + IndexShardRoutingTable indexShard = indexRouting.shard(shardId); + if (indexShard == null) { + throw new ShardNotFoundException(new ShardId(indexRouting.getIndex(), shardId)); + } + return indexShard; + } + /** * Returns the version of the {@link RoutingTable}. * @@ -118,11 +133,7 @@ public class RoutingTable implements Iterable, Diffable, Diffable Optional.ofNullable(irt.shard(shardId.getId()))) - .orElse(null); - } - @Nullable public ShardRouting getByAllocationId(ShardId shardId, String allocationId) { - IndexShardRoutingTable shardRoutingTable = shardRoutingTableOrNull(shardId); - if (shardRoutingTable == null) { + final IndexRoutingTable indexRoutingTable = index(shardId.getIndexName()); + if (indexRoutingTable == null) { return null; } - return shardRoutingTable.getByAllocationId(allocationId); + final IndexShardRoutingTable shardRoutingTable = indexRoutingTable.shard(shardId.getId()); + return shardRoutingTable == null ? null : shardRoutingTable.getByAllocationId(allocationId); } - public boolean validate(MetaData metaData) { for (IndexRoutingTable indexRoutingTable : this) { if (indexRoutingTable.validate(metaData) == false) { @@ -213,42 +217,29 @@ public class RoutingTable implements Iterable, Diffableextra shard iterator will be added for relocating shards. The extra - * iterator contains a single ShardRouting pointing at the relocating target */ - public GroupShardsIterator allActiveShardsGrouped(String[] indices, boolean includeEmpty, boolean includeRelocationTargets) { - return allSatisfyingPredicateShardsGrouped(indices, includeEmpty, includeRelocationTargets, ACTIVE_PREDICATE); - } - - public GroupShardsIterator allAssignedShardsGrouped(String[] indices, boolean includeEmpty) { - return allAssignedShardsGrouped(indices, includeEmpty, false); + public GroupShardsIterator allActiveShardsGrouped(String[] indices, boolean includeEmpty) { + return allSatisfyingPredicateShardsGrouped(indices, includeEmpty, ACTIVE_PREDICATE); } /** * Return GroupShardsIterator where each assigned shard routing has it's own shard iterator. * - * @param includeEmpty if true, a shard iterator will be added for non-assigned shards as well - * @param includeRelocationTargets if true, an extra shard iterator will be added for relocating shards. The extra - * iterator contains a single ShardRouting pointing at the relocating target + * @param includeEmpty if true, a shard iterator will be added for non-assigned shards as well */ - public GroupShardsIterator allAssignedShardsGrouped(String[] indices, boolean includeEmpty, - boolean includeRelocationTargets) { - return allSatisfyingPredicateShardsGrouped(indices, includeEmpty, includeRelocationTargets, ASSIGNED_PREDICATE); + public GroupShardsIterator allAssignedShardsGrouped(String[] indices, boolean includeEmpty) { + return allSatisfyingPredicateShardsGrouped(indices, includeEmpty, ASSIGNED_PREDICATE); } private static Predicate ACTIVE_PREDICATE = ShardRouting::active; private static Predicate ASSIGNED_PREDICATE = ShardRouting::assignedToNode; private GroupShardsIterator allSatisfyingPredicateShardsGrouped(String[] indices, boolean includeEmpty, - boolean includeRelocationTargets, Predicate predicate) { + Predicate predicate) { // use list here since we need to maintain identity across shards ArrayList set = new ArrayList<>(); for (String index : indices) { @@ -261,10 +252,6 @@ public class RoutingTable implements Iterable, Diffable, Diffable shardRoutingEntries = Iterables.concat(routingNodes.unassigned(), routingNodes.unassigned().ignored()); for (ShardRouting shardRoutingEntry : shardRoutingEntries) { - Index index = shardRoutingEntry.index(); - IndexRoutingTable.Builder indexBuilder = indexRoutingTableBuilders.get(index.getName()); - if (indexBuilder == null) { - indexBuilder = new IndexRoutingTable.Builder(index); - indexRoutingTableBuilders.put(index.getName(), indexBuilder); - } - indexBuilder.addShard(shardRoutingEntry); + addShard(indexRoutingTableBuilders, shardRoutingEntry); } for (IndexRoutingTable.Builder indexBuilder : indexRoutingTableBuilders.values()) { @@ -461,6 +435,17 @@ public class RoutingTable implements Iterable, Diffable indexRoutingTableBuilders, + final ShardRouting shardRoutingEntry) { + Index index = shardRoutingEntry.index(); + IndexRoutingTable.Builder indexBuilder = indexRoutingTableBuilders.get(index.getName()); + if (indexBuilder == null) { + indexBuilder = new IndexRoutingTable.Builder(index); + indexRoutingTableBuilders.put(index.getName(), indexBuilder); + } + indexBuilder.addShard(shardRoutingEntry); + } + /** * Update the number of replicas for the specified indices. * diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/UnassignedInfo.java b/server/src/main/java/org/elasticsearch/cluster/routing/UnassignedInfo.java index f8afbeb4493..3cad02367fd 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/UnassignedInfo.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/UnassignedInfo.java @@ -161,11 +161,6 @@ public final class UnassignedInfo implements ToXContentFragment, Writeable { this.id = id; } - // package private for testing - byte getId() { - return id; - } - @Override public void writeTo(StreamOutput out) throws IOException { out.writeByte(id); @@ -281,10 +276,6 @@ public final class UnassignedInfo implements ToXContentFragment, Writeable { lastAllocationStatus.writeTo(out); } - public UnassignedInfo readFrom(StreamInput in) throws IOException { - return new UnassignedInfo(in); - } - /** * Returns the number of previously failed allocations of this shard. */ diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/RoutingAllocation.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/RoutingAllocation.java index e0be712a230..059b13c5a1b 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/RoutingAllocation.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/RoutingAllocation.java @@ -185,12 +185,7 @@ public class RoutingAllocation { if (ignoredShardToNodes == null) { ignoredShardToNodes = new HashMap<>(); } - Set nodes = ignoredShardToNodes.get(shardId); - if (nodes == null) { - nodes = new HashSet<>(); - ignoredShardToNodes.put(shardId, nodes); - } - nodes.add(nodeId); + ignoredShardToNodes.computeIfAbsent(shardId, k -> new HashSet<>()).add(nodeId); } /** diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/RoutingNodesChangedObserver.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/RoutingNodesChangedObserver.java index 3e465e42b44..56308f71539 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/RoutingNodesChangedObserver.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/RoutingNodesChangedObserver.java @@ -83,13 +83,6 @@ public class RoutingNodesChangedObserver implements RoutingChangesObserver { setChanged(); } - @Override - public void startedPrimaryReinitialized(ShardRouting startedPrimaryShard, ShardRouting initializedShard) { - assert startedPrimaryShard.primary() && startedPrimaryShard.started() : "expected started primary shard " + startedPrimaryShard; - assert initializedShard.primary() && initializedShard.initializing(): "expected initializing primary shard " + initializedShard; - setChanged(); - } - @Override public void replicaPromoted(ShardRouting replicaShard) { assert replicaShard.started() && replicaShard.primary() == false : "expected started replica shard " + replicaShard; diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/ShardAllocationDecision.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/ShardAllocationDecision.java index 390e4510f0f..d577c657a79 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/ShardAllocationDecision.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/ShardAllocationDecision.java @@ -22,7 +22,6 @@ package org.elasticsearch.cluster.routing.allocation; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Writeable; -import org.elasticsearch.common.xcontent.ToXContent.Params; import org.elasticsearch.common.xcontent.ToXContentFragment; import org.elasticsearch.common.xcontent.XContentBuilder; diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java index ad5db788a80..6af6e6696e0 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java @@ -759,7 +759,7 @@ public class BalancedShardsAllocator implements ShardsAllocator { final PriorityComparator secondaryComparator = PriorityComparator.getAllocationComparator(allocation); final Comparator comparator = (o1, o2) -> { if (o1.primary() ^ o2.primary()) { - return o1.primary() ? -1 : o2.primary() ? 1 : 0; + return o1.primary() ? -1 : 1; } final int indexCmp; if ((indexCmp = o1.getIndexName().compareTo(o2.getIndexName())) == 0) { @@ -929,7 +929,7 @@ public class BalancedShardsAllocator implements ShardsAllocator { updateMinNode = ((((nodeHigh > repId && minNodeHigh > repId) || (nodeHigh < repId && minNodeHigh < repId)) && (nodeHigh < minNodeHigh)) - || (nodeHigh > minNodeHigh && nodeHigh > repId && minNodeHigh < repId)); + || (nodeHigh > repId && minNodeHigh < repId)); } else { updateMinNode = currentDecision.type() == Type.YES; } diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/command/AllocationCommand.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/command/AllocationCommand.java index ed5df30c54b..dd6ab760e9f 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/command/AllocationCommand.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/command/AllocationCommand.java @@ -24,9 +24,7 @@ import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; import org.elasticsearch.common.io.stream.NamedWriteable; import org.elasticsearch.common.network.NetworkModule; import org.elasticsearch.common.xcontent.ToXContentObject; -import org.elasticsearch.common.xcontent.XContentParser; -import java.io.IOException; import java.util.Optional; /** @@ -35,15 +33,6 @@ import java.util.Optional; * Commands are registered in {@link NetworkModule}. */ public interface AllocationCommand extends NamedWriteable, ToXContentObject { - interface Parser { - /** - * Reads an {@link AllocationCommand} of type T from a {@link XContentParser}. - * @param parser {@link XContentParser} to use - * @return {@link AllocationCommand} read - * @throws IOException if something happens during reading - */ - T fromXContent(XContentParser parser) throws IOException; - } /** * Get the name of the command