Clear send behavior rule in CloseWhileRelocatingShardsIT (#38159)
The current CloseWhileRelocatingShardsIT test adds some "send behavior" rule to a target node's mocked transport service in order to detect when shard relocating are started. These rules are never cleared and prevent the test to complete normally after the rebalance is re-enabled again. This commit changes the test so that rules are cleared and most verifications are done before the rebalance is reenabled again. Closes #38090
This commit is contained in:
parent
ce469cfda5
commit
029e4b6278
|
@ -119,6 +119,8 @@ public class CloseWhileRelocatingShardsIT extends ESIntegTestCase {
|
||||||
|
|
||||||
final String targetNode = internalCluster().startDataOnlyNode();
|
final String targetNode = internalCluster().startDataOnlyNode();
|
||||||
ensureClusterSizeConsistency(); // wait for the master to finish processing join.
|
ensureClusterSizeConsistency(); // wait for the master to finish processing join.
|
||||||
|
final MockTransportService targetTransportService =
|
||||||
|
(MockTransportService) internalCluster().getInstance(TransportService.class, targetNode);
|
||||||
|
|
||||||
final Set<String> acknowledgedCloses = ConcurrentCollections.newConcurrentSet();
|
final Set<String> acknowledgedCloses = ConcurrentCollections.newConcurrentSet();
|
||||||
try {
|
try {
|
||||||
|
@ -146,8 +148,7 @@ public class CloseWhileRelocatingShardsIT extends ESIntegTestCase {
|
||||||
}
|
}
|
||||||
|
|
||||||
final DiscoveryNode sourceNode = clusterService.state().nodes().resolveNode(primary.currentNodeId());
|
final DiscoveryNode sourceNode = clusterService.state().nodes().resolveNode(primary.currentNodeId());
|
||||||
((MockTransportService) internalCluster().getInstance(TransportService.class, targetNode))
|
targetTransportService.addSendBehavior(internalCluster().getInstance(TransportService.class, sourceNode.getName()),
|
||||||
.addSendBehavior(internalCluster().getInstance(TransportService.class, sourceNode.getName()),
|
|
||||||
(connection, requestId, action, request, options) -> {
|
(connection, requestId, action, request, options) -> {
|
||||||
if (PeerRecoverySourceService.Actions.START_RECOVERY.equals(action)) {
|
if (PeerRecoverySourceService.Actions.START_RECOVERY.equals(action)) {
|
||||||
logger.debug("blocking recovery of shard {}", ((StartRecoveryRequest) request).shardId());
|
logger.debug("blocking recovery of shard {}", ((StartRecoveryRequest) request).shardId());
|
||||||
|
@ -210,28 +211,30 @@ public class CloseWhileRelocatingShardsIT extends ESIntegTestCase {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
for (String index : indices) {
|
||||||
|
if (acknowledgedCloses.contains(index)) {
|
||||||
|
assertIndexIsClosed(index);
|
||||||
|
} else {
|
||||||
|
assertIndexIsOpened(index);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
targetTransportService.clearAllRules();
|
||||||
|
|
||||||
|
assertThat("Consider that the test failed if no indices were successfully closed", acknowledgedCloses.size(), greaterThan(0));
|
||||||
|
assertAcked(client().admin().indices().prepareOpen("index-*"));
|
||||||
|
ensureGreen(indices);
|
||||||
|
|
||||||
|
for (String index : acknowledgedCloses) {
|
||||||
|
long docsCount = client().prepareSearch(index).setSize(0).get().getHits().getTotalHits().value;
|
||||||
|
assertEquals("Expected " + docsPerIndex.get(index) + " docs in index " + index + " but got " + docsCount
|
||||||
|
+ " (close acknowledged=" + acknowledgedCloses.contains(index) + ")", (long) docsPerIndex.get(index), docsCount);
|
||||||
|
}
|
||||||
} finally {
|
} finally {
|
||||||
assertAcked(client().admin().cluster().prepareUpdateSettings()
|
assertAcked(client().admin().cluster().prepareUpdateSettings()
|
||||||
.setTransientSettings(Settings.builder()
|
.setTransientSettings(Settings.builder()
|
||||||
.putNull(EnableAllocationDecider.CLUSTER_ROUTING_REBALANCE_ENABLE_SETTING.getKey())));
|
.putNull(EnableAllocationDecider.CLUSTER_ROUTING_REBALANCE_ENABLE_SETTING.getKey())));
|
||||||
}
|
}
|
||||||
|
|
||||||
for (String index : indices) {
|
|
||||||
if (acknowledgedCloses.contains(index)) {
|
|
||||||
assertIndexIsClosed(index);
|
|
||||||
} else {
|
|
||||||
assertIndexIsOpened(index);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
assertThat("Consider that the test failed if no indices were successfully closed", acknowledgedCloses.size(), greaterThan(0));
|
|
||||||
assertAcked(client().admin().indices().prepareOpen("index-*"));
|
|
||||||
ensureGreen(indices);
|
|
||||||
|
|
||||||
for (String index : acknowledgedCloses) {
|
|
||||||
long docsCount = client().prepareSearch(index).setSize(0).get().getHits().getTotalHits().value;
|
|
||||||
assertEquals("Expected " + docsPerIndex.get(index) + " docs in index " + index + " but got " + docsCount
|
|
||||||
+ " (close acknowledged=" + acknowledgedCloses.contains(index) + ")", (long) docsPerIndex.get(index), docsCount);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue