diff --git a/server/src/test/java/org/elasticsearch/indices/state/CloseWhileRelocatingShardsIT.java b/server/src/test/java/org/elasticsearch/indices/state/CloseWhileRelocatingShardsIT.java index 555bf243354..99c50a839ab 100644 --- a/server/src/test/java/org/elasticsearch/indices/state/CloseWhileRelocatingShardsIT.java +++ b/server/src/test/java/org/elasticsearch/indices/state/CloseWhileRelocatingShardsIT.java @@ -119,6 +119,8 @@ public class CloseWhileRelocatingShardsIT extends ESIntegTestCase { final String targetNode = internalCluster().startDataOnlyNode(); ensureClusterSizeConsistency(); // wait for the master to finish processing join. + final MockTransportService targetTransportService = + (MockTransportService) internalCluster().getInstance(TransportService.class, targetNode); final Set acknowledgedCloses = ConcurrentCollections.newConcurrentSet(); try { @@ -146,8 +148,7 @@ public class CloseWhileRelocatingShardsIT extends ESIntegTestCase { } final DiscoveryNode sourceNode = clusterService.state().nodes().resolveNode(primary.currentNodeId()); - ((MockTransportService) internalCluster().getInstance(TransportService.class, targetNode)) - .addSendBehavior(internalCluster().getInstance(TransportService.class, sourceNode.getName()), + targetTransportService.addSendBehavior(internalCluster().getInstance(TransportService.class, sourceNode.getName()), (connection, requestId, action, request, options) -> { if (PeerRecoverySourceService.Actions.START_RECOVERY.equals(action)) { logger.debug("blocking recovery of shard {}", ((StartRecoveryRequest) request).shardId()); @@ -210,28 +211,30 @@ public class CloseWhileRelocatingShardsIT extends ESIntegTestCase { } } } + + for (String index : indices) { + if (acknowledgedCloses.contains(index)) { + assertIndexIsClosed(index); + } else { + assertIndexIsOpened(index); + } + } + + targetTransportService.clearAllRules(); + + assertThat("Consider that the test failed if no indices were successfully closed", acknowledgedCloses.size(), greaterThan(0)); + assertAcked(client().admin().indices().prepareOpen("index-*")); + ensureGreen(indices); + + for (String index : acknowledgedCloses) { + long docsCount = client().prepareSearch(index).setSize(0).get().getHits().getTotalHits().value; + assertEquals("Expected " + docsPerIndex.get(index) + " docs in index " + index + " but got " + docsCount + + " (close acknowledged=" + acknowledgedCloses.contains(index) + ")", (long) docsPerIndex.get(index), docsCount); + } } finally { assertAcked(client().admin().cluster().prepareUpdateSettings() .setTransientSettings(Settings.builder() .putNull(EnableAllocationDecider.CLUSTER_ROUTING_REBALANCE_ENABLE_SETTING.getKey()))); } - - for (String index : indices) { - if (acknowledgedCloses.contains(index)) { - assertIndexIsClosed(index); - } else { - assertIndexIsOpened(index); - } - } - - assertThat("Consider that the test failed if no indices were successfully closed", acknowledgedCloses.size(), greaterThan(0)); - assertAcked(client().admin().indices().prepareOpen("index-*")); - ensureGreen(indices); - - for (String index : acknowledgedCloses) { - long docsCount = client().prepareSearch(index).setSize(0).get().getHits().getTotalHits().value; - assertEquals("Expected " + docsPerIndex.get(index) + " docs in index " + index + " but got " + docsCount - + " (close acknowledged=" + acknowledgedCloses.contains(index) + ")", (long) docsPerIndex.get(index), docsCount); - } } }