Stabilizing org.opensearch.cluster.routing.MovePrimaryFirstTests.test… (#2048)

* Stabilizing org.opensearch.cluster.routing.MovePrimaryFirstTests.testClusterGreenAfterPartialRelocation

Signed-off-by: Ankit Jain <jain.ankitk@gmail.com>

* Removing unused import

Signed-off-by: Ankit Jain <jain.ankitk@gmail.com>

* Making code more readable

Signed-off-by: Ankit Jain <jain.ankitk@gmail.com>
This commit is contained in:
Ankit Jain 2022-02-08 08:40:07 +05:30 committed by GitHub
parent b9420d8f70
commit 343b82fe24
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 23 additions and 26 deletions

View File

@ -49,6 +49,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
/**
@ -84,7 +85,7 @@ public class RoutingNode implements Iterable<ShardRouting> {
return this.shardTuple.v2().get(shardId);
}
public ShardRouting add(ShardRouting shardRouting) {
public ShardRouting put(ShardRouting shardRouting) {
return put(shardRouting.shardId(), shardRouting);
}
@ -114,22 +115,10 @@ public class RoutingNode implements Iterable<ShardRouting> {
@Override
public Iterator<ShardRouting> iterator() {
final Iterator<ShardRouting> primaryIterator = Collections.unmodifiableCollection(this.shardTuple.v1().values()).iterator();
final Iterator<ShardRouting> replicaIterator = Collections.unmodifiableCollection(this.shardTuple.v2().values()).iterator();
return new Iterator<ShardRouting>() {
@Override
public boolean hasNext() {
return primaryIterator.hasNext() || replicaIterator.hasNext();
}
@Override
public ShardRouting next() {
if (primaryIterator.hasNext()) {
return primaryIterator.next();
}
return replicaIterator.next();
}
};
return Stream.concat(
Collections.unmodifiableCollection(this.shardTuple.v1().values()).stream(),
Collections.unmodifiableCollection(this.shardTuple.v2().values()).stream()
).iterator();
}
}
@ -217,7 +206,7 @@ public class RoutingNode implements Iterable<ShardRouting> {
*/
void add(ShardRouting shard) {
assert invariant();
if (shards.add(shard) != null) {
if (shards.put(shard) != null) {
throw new IllegalStateException(
"Trying to add a shard "
+ shard.shardId()

View File

@ -12,12 +12,14 @@ import com.carrotsearch.randomizedtesting.annotations.ThreadLeakScope;
import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
import org.opensearch.cluster.ClusterStateListener;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.test.InternalTestCluster;
import org.opensearch.test.OpenSearchIntegTestCase;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.stream.Stream;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
@ -84,21 +86,27 @@ public class MovePrimaryFirstTests extends OpenSearchIntegTestCase {
final ClusterStateListener listener = event -> {
if (event.routingTableChanged()) {
final RoutingNodes routingNodes = event.state().getRoutingNodes();
int startedz2n1 = 0;
int startedz2n2 = 0;
int startedCount = 0;
List<ShardRouting> initz2n1 = new ArrayList<>(), initz2n2 = new ArrayList<>();
for (Iterator<RoutingNode> it = routingNodes.iterator(); it.hasNext();) {
RoutingNode routingNode = it.next();
final String nodeName = routingNode.node().getName();
if (nodeName.equals(z2n1)) {
startedz2n1 = routingNode.numberOfShardsWithState(ShardRoutingState.STARTED);
startedCount += routingNode.numberOfShardsWithState(ShardRoutingState.STARTED);
initz2n1 = routingNode.shardsWithState(ShardRoutingState.INITIALIZING);
} else if (nodeName.equals(z2n2)) {
startedz2n2 = routingNode.numberOfShardsWithState(ShardRoutingState.STARTED);
startedCount += routingNode.numberOfShardsWithState(ShardRoutingState.STARTED);
initz2n2 = routingNode.shardsWithState(ShardRoutingState.INITIALIZING);
}
}
if (startedz2n1 >= primaryShardCount / 2 && startedz2n2 >= primaryShardCount / 2) {
if (!Stream.concat(initz2n1.stream(), initz2n2.stream()).anyMatch(s -> s.primary())) {
// All primaries are relocated before 60% of total shards are started on new nodes
final int totalShardCount = primaryShardCount * 2;
if (primaryShardCount <= startedCount && startedCount <= 3 * totalShardCount / 5) {
primaryMoveLatch.countDown();
}
}
}
};
internalCluster().clusterService().addListener(listener);
@ -113,6 +121,6 @@ public class MovePrimaryFirstTests extends OpenSearchIntegTestCase {
internalCluster().stopRandomNode(InternalTestCluster.nameFilter(z1n1));
internalCluster().stopRandomNode(InternalTestCluster.nameFilter(z1n2));
} catch (Exception e) {}
ensureGreen(TimeValue.timeValueSeconds(60));
ensureGreen();
}
}