diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrRetentionLeaseIT.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrRetentionLeaseIT.java index be9f127b1ad..6a43b3148e4 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrRetentionLeaseIT.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrRetentionLeaseIT.java @@ -209,23 +209,17 @@ public class CcrRetentionLeaseIT extends CcrIntegTestCase { for (final ObjectCursor senderNode : followerClusterState.getState().nodes().getNodes().values()) { final MockTransportService senderTransportService = (MockTransportService) getFollowerCluster().getInstance(TransportService.class, senderNode.value.getName()); - final ClusterStateResponse leaderClusterState = leaderClient().admin().cluster().prepareState().clear().setNodes(true).get(); - for (final ObjectCursor receiverNode : leaderClusterState.getState().nodes().getNodes().values()) { - final MockTransportService receiverTransportService = - (MockTransportService) getLeaderCluster().getInstance(TransportService.class, receiverNode.value.getName()); - senderTransportService.addSendBehavior(receiverTransportService, - (connection, requestId, action, request, options) -> { - if (ClearCcrRestoreSessionAction.NAME.equals(action)) { - try { - latch.await(); - } catch (final InterruptedException e) { - fail(e.toString()); - } + senderTransportService.addSendBehavior( + (connection, requestId, action, request, options) -> { + if (ClearCcrRestoreSessionAction.NAME.equals(action)) { + try { + latch.await(); + } catch (final InterruptedException e) { + fail(e.toString()); } - connection.sendRequest(requestId, action, request, options); - }); - } - + } + connection.sendRequest(requestId, action, request, options); + }); } final PlainActionFuture future = PlainActionFuture.newFuture(); @@ -419,50 +413,45 @@ public class CcrRetentionLeaseIT extends CcrIntegTestCase { for (final ObjectCursor senderNode : followerClusterState.getState().nodes().getNodes().values()) { final MockTransportService senderTransportService = (MockTransportService) getFollowerCluster().getInstance(TransportService.class, senderNode.value.getName()); - final ClusterStateResponse leaderClusterState = - leaderClient().admin().cluster().prepareState().clear().setNodes(true).get(); - for (final ObjectCursor receiverNode : leaderClusterState.getState().nodes().getNodes().values()) { - final MockTransportService receiverTransportService = - (MockTransportService) getLeaderCluster().getInstance(TransportService.class, receiverNode.value.getName()); - senderTransportService.addSendBehavior(receiverTransportService, - (connection, requestId, action, request, options) -> { - if (RetentionLeaseActions.Remove.ACTION_NAME.equals(action)) { - final RetentionLeaseActions.RemoveRequest removeRequest = (RetentionLeaseActions.RemoveRequest) request; - if (shardIds.contains(removeRequest.getShardId().id())) { - final String primaryShardNodeId = - getLeaderCluster() - .clusterService() - .state() - .routingTable() - .index(leaderIndex) - .shard(removeRequest.getShardId().id()) - .primaryShard() - .currentNodeId(); - final String primaryShardNodeName = - getLeaderCluster().clusterService().state().nodes().get(primaryShardNodeId).getName(); - final IndexShard primary = - getLeaderCluster() - .getInstance(IndicesService.class, primaryShardNodeName) - .getShardOrNull(removeRequest.getShardId()); - final CountDownLatch latch = new CountDownLatch(1); - primary.removeRetentionLease( - retentionLeaseId, - ActionListener.wrap(r -> latch.countDown(), e -> fail(e.toString()))); - try { - latch.await(); - } catch (final InterruptedException e) { - Thread.currentThread().interrupt(); - fail(e.toString()); - } + senderTransportService.addSendBehavior( + (connection, requestId, action, request, options) -> { + if (RetentionLeaseActions.Remove.ACTION_NAME.equals(action)) { + final RetentionLeaseActions.RemoveRequest removeRequest = (RetentionLeaseActions.RemoveRequest) request; + if (shardIds.contains(removeRequest.getShardId().id())) { + final String primaryShardNodeId = + getLeaderCluster() + .clusterService() + .state() + .routingTable() + .index(leaderIndex) + .shard(removeRequest.getShardId().id()) + .primaryShard() + .currentNodeId(); + final String primaryShardNodeName = + getLeaderCluster().clusterService().state().nodes().get(primaryShardNodeId).getName(); + final IndexShard primary = + getLeaderCluster() + .getInstance(IndicesService.class, primaryShardNodeName) + .getShardOrNull(removeRequest.getShardId()); + final CountDownLatch latch = new CountDownLatch(1); + primary.removeRetentionLease( + retentionLeaseId, + ActionListener.wrap(r -> latch.countDown(), e -> fail(e.toString()))); + try { + latch.await(); + } catch (final InterruptedException e) { + Thread.currentThread().interrupt(); + fail(e.toString()); } } - connection.sendRequest(requestId, action, request, options); - }); - } - + } + connection.sendRequest(requestId, action, request, options); + }); } + + pauseFollow(followerIndex); followerClient().admin().indices().close(new CloseIndexRequest(followerIndex)).actionGet(); assertAcked(followerClient().execute(UnfollowAction.INSTANCE, new UnfollowAction.Request(followerIndex)).actionGet()); @@ -508,25 +497,18 @@ public class CcrRetentionLeaseIT extends CcrIntegTestCase { for (final ObjectCursor senderNode : followerClusterState.getState().nodes().getNodes().values()) { final MockTransportService senderTransportService = (MockTransportService) getFollowerCluster().getInstance(TransportService.class, senderNode.value.getName()); - final ClusterStateResponse leaderClusterState = - leaderClient().admin().cluster().prepareState().clear().setNodes(true).get(); - for (final ObjectCursor receiverNode : leaderClusterState.getState().nodes().getNodes().values()) { - final MockTransportService receiverTransportService = - (MockTransportService) getLeaderCluster().getInstance(TransportService.class, receiverNode.value.getName()); - senderTransportService.addSendBehavior(receiverTransportService, - (connection, requestId, action, request, options) -> { - if (RetentionLeaseActions.Remove.ACTION_NAME.equals(action)) { - final RetentionLeaseActions.RemoveRequest removeRequest = (RetentionLeaseActions.RemoveRequest) request; - if (shardIds.contains(removeRequest.getShardId().id())) { - throw randomBoolean() - ? new ConnectTransportException(receiverNode.value, "connection failed") - : new IndexShardClosedException(removeRequest.getShardId()); - } + senderTransportService.addSendBehavior( + (connection, requestId, action, request, options) -> { + if (RetentionLeaseActions.Remove.ACTION_NAME.equals(action)) { + final RetentionLeaseActions.RemoveRequest removeRequest = (RetentionLeaseActions.RemoveRequest) request; + if (shardIds.contains(removeRequest.getShardId().id())) { + throw randomBoolean() + ? new ConnectTransportException(connection.getNode(), "connection failed") + : new IndexShardClosedException(removeRequest.getShardId()); } - connection.sendRequest(requestId, action, request, options); - }); - } - + } + connection.sendRequest(requestId, action, request, options); + }); } final ElasticsearchException e = expectThrows(