Handle inefficiencies while fetching the delayed unassigned shards during cluster health (#588) (#730)

Signed-off-by: Meet Shah <meetshsh@gmail.com>

Co-authored-by: Meet Shah <48720201+meetshah777@users.noreply.github.com>
This commit is contained in:
Abbas Hussain 2021-05-21 02:41:49 +05:30 committed by GitHub
parent 3290932fe4
commit d330ca6559
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 59 additions and 7 deletions

View File

@ -59,6 +59,7 @@ import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.function.Predicate;
/**
* The {@link IndexRoutingTable} represents routing information for a single
@ -278,6 +279,14 @@ public class IndexRoutingTable extends AbstractDiffable<IndexRoutingTable> imple
return shards;
}
/**
 * Counts the shard routings of this index that satisfy the given predicate.
 * The result is the sum of the per-shard-group counts reported by every
 * {@link IndexShardRoutingTable} this table iterates over.
 *
 * @param predicate the condition each {@link ShardRouting} is tested against
 * @return the number of shard routings for which the predicate holds
 */
public int shardsMatchingPredicateCount(Predicate<ShardRouting> predicate) {
    int matching = 0;
    for (IndexShardRoutingTable shardGroup : this) {
        final int matchingInGroup = shardGroup.shardsMatchingPredicateCount(predicate);
        matching += matchingInGroup;
    }
    return matching;
}
/**
* Returns an unordered iterator over all active shards (including replicas).
*/

View File

@ -56,6 +56,7 @@ import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.function.Predicate;
import static java.util.Collections.emptyMap;
@ -672,6 +673,16 @@ public class IndexShardRoutingTable implements Iterable<ShardRouting> {
return shards;
}
/**
 * Counts the shard routings in this table that satisfy the given predicate.
 * Every {@link ShardRouting} this table iterates over is tested exactly once.
 *
 * @param predicate the condition each {@link ShardRouting} is tested against
 * @return the number of shard routings for which the predicate holds
 */
public int shardsMatchingPredicateCount(Predicate<ShardRouting> predicate) {
    int matching = 0;
    for (ShardRouting routing : this) {
        matching += predicate.test(routing) ? 1 : 0;
    }
    return matching;
}
public static class Builder {
private ShardId shardId;

View File

@ -196,6 +196,14 @@ public class RoutingTable implements Iterable<IndexRoutingTable>, Diffable<Routi
return shards;
}
/**
 * Counts the shard routings across all indices in this routing table that
 * satisfy the given predicate. The result is the sum of the counts reported
 * by every {@link IndexRoutingTable} this table iterates over.
 *
 * @param predicate the condition each {@link ShardRouting} is tested against
 * @return the number of shard routings for which the predicate holds
 */
public int shardsMatchingPredicateCount(Predicate<ShardRouting> predicate) {
    int matching = 0;
    for (IndexRoutingTable perIndex : this) {
        final int matchingInIndex = perIndex.shardsMatchingPredicateCount(predicate);
        matching += matchingInIndex;
    }
    return matching;
}
/**
* All the shards (replicas) for all indices in this routing table.
*

View File

@ -58,6 +58,7 @@ import java.util.List;
import java.util.Locale;
import java.util.Objects;
import java.util.Set;
import java.util.function.Predicate;
/**
* Holds additional information as to why the shard is in unassigned state.
@ -413,13 +414,8 @@ public final class UnassignedInfo implements ToXContentFragment, Writeable {
* Returns the number of shards that are unassigned and currently being delayed.
*/
public static int getNumberOfDelayedUnassigned(ClusterState state) {
    // Count in a single pass over the routing table instead of first collecting every
    // UNASSIGNED shard via shardsWithState(...) and then filtering that collection.
    // The state check comes first so unassignedInfo() is only dereferenced for shards
    // that are actually unassigned.
    Predicate<ShardRouting> predicate = s -> s.state() == ShardRoutingState.UNASSIGNED && s.unassignedInfo().isDelayed();
    return state.routingTable().shardsMatchingPredicateCount(predicate);
}
/**

View File

@ -50,6 +50,7 @@ import org.junit.Before;
import java.util.Arrays;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.function.Predicate;
import static org.opensearch.cluster.routing.ShardRoutingState.UNASSIGNED;
import static org.hamcrest.Matchers.containsString;
@ -183,6 +184,33 @@ public class RoutingTableTests extends OpenSearchAllocationTestCase {
assertThat(clusterState.routingTable().shardsWithState(ShardRoutingState.STARTED).size(), is(this.totalNumberOfShards));
}
/**
 * Verifies that {@code RoutingTable#shardsMatchingPredicateCount} reports no delayed
 * unassigned shards after the initial reroute, and two once the node holding shard
 * copies has been removed and dead nodes are disassociated.
 */
public void testShardsMatchingPredicateCount() {
    final MockAllocationService allocationService = createAllocationService(Settings.EMPTY, new DelayedShardsMockGatewayAllocator());

    // Two indices, each configured with one primary shard and one replica.
    final Metadata metadata = Metadata.builder()
        .put(IndexMetadata.builder("test1").settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(1))
        .put(IndexMetadata.builder("test2").settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(1))
        .build();
    final RoutingTable initialRoutingTable = RoutingTable.builder()
        .addAsNew(metadata.index("test1"))
        .addAsNew(metadata.index("test2"))
        .build();

    ClusterState state = ClusterState.builder(org.opensearch.cluster.ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY))
        .metadata(metadata)
        .routingTable(initialRoutingTable)
        .build();
    final DiscoveryNodes.Builder twoNodes = DiscoveryNodes.builder().add(newNode("node1")).add(newNode("node2"));
    state = ClusterState.builder(state).nodes(twoNodes).build();
    state = allocationService.reroute(state, "reroute");

    final Predicate<ShardRouting> delayedUnassigned =
        shard -> shard.state() == ShardRoutingState.UNASSIGNED && shard.unassignedInfo().isDelayed();

    // Nothing is expected to be delayed right after the first reroute.
    assertThat(state.routingTable().shardsMatchingPredicateCount(delayedUnassigned), is(0));

    // First round starts the primaries, second round starts the replicas.
    state = startInitializingShardsAndReroute(allocationService, state);
    state = startInitializingShardsAndReroute(allocationService, state);

    // Drop node2 from the cluster and disassociate its shards.
    final DiscoveryNodes.Builder withoutNode2 = DiscoveryNodes.builder(state.nodes()).remove("node2");
    state = ClusterState.builder(state).nodes(withoutNode2).build();
    state = allocationService.disassociateDeadNodes(state, true, "reroute");

    // Both replicas must now be reported as delayed (i.e. not reallocated).
    assertThat(state.routingTable().shardsMatchingPredicateCount(delayedUnassigned), is(2));
}
public void testActivePrimaryShardsGrouped() {
assertThat(this.emptyRoutingTable.activePrimaryShardsGrouped(new String[0], true).size(), is(0));
assertThat(this.emptyRoutingTable.activePrimaryShardsGrouped(new String[0], false).size(), is(0));