Simplify mocking in CCR retention lease tests
This commit simplifies the use of transport mocking in the CCR retention lease integration tests. Instead of adding a send rule between nodes, we add a default send rule. This greatly simplifies the code here, and speeds the test up a little bit too.
This commit is contained in:
parent
c7516b03b6
commit
feb25c71a0
|
@ -209,23 +209,17 @@ public class CcrRetentionLeaseIT extends CcrIntegTestCase {
|
|||
for (final ObjectCursor<DiscoveryNode> senderNode : followerClusterState.getState().nodes().getNodes().values()) {
|
||||
final MockTransportService senderTransportService =
|
||||
(MockTransportService) getFollowerCluster().getInstance(TransportService.class, senderNode.value.getName());
|
||||
final ClusterStateResponse leaderClusterState = leaderClient().admin().cluster().prepareState().clear().setNodes(true).get();
|
||||
for (final ObjectCursor<DiscoveryNode> receiverNode : leaderClusterState.getState().nodes().getNodes().values()) {
|
||||
final MockTransportService receiverTransportService =
|
||||
(MockTransportService) getLeaderCluster().getInstance(TransportService.class, receiverNode.value.getName());
|
||||
senderTransportService.addSendBehavior(receiverTransportService,
|
||||
(connection, requestId, action, request, options) -> {
|
||||
if (ClearCcrRestoreSessionAction.NAME.equals(action)) {
|
||||
try {
|
||||
latch.await();
|
||||
} catch (final InterruptedException e) {
|
||||
fail(e.toString());
|
||||
}
|
||||
senderTransportService.addSendBehavior(
|
||||
(connection, requestId, action, request, options) -> {
|
||||
if (ClearCcrRestoreSessionAction.NAME.equals(action)) {
|
||||
try {
|
||||
latch.await();
|
||||
} catch (final InterruptedException e) {
|
||||
fail(e.toString());
|
||||
}
|
||||
connection.sendRequest(requestId, action, request, options);
|
||||
});
|
||||
}
|
||||
|
||||
}
|
||||
connection.sendRequest(requestId, action, request, options);
|
||||
});
|
||||
}
|
||||
|
||||
final PlainActionFuture<RestoreInfo> future = PlainActionFuture.newFuture();
|
||||
|
@ -419,50 +413,45 @@ public class CcrRetentionLeaseIT extends CcrIntegTestCase {
|
|||
for (final ObjectCursor<DiscoveryNode> senderNode : followerClusterState.getState().nodes().getNodes().values()) {
|
||||
final MockTransportService senderTransportService =
|
||||
(MockTransportService) getFollowerCluster().getInstance(TransportService.class, senderNode.value.getName());
|
||||
final ClusterStateResponse leaderClusterState =
|
||||
leaderClient().admin().cluster().prepareState().clear().setNodes(true).get();
|
||||
for (final ObjectCursor<DiscoveryNode> receiverNode : leaderClusterState.getState().nodes().getNodes().values()) {
|
||||
final MockTransportService receiverTransportService =
|
||||
(MockTransportService) getLeaderCluster().getInstance(TransportService.class, receiverNode.value.getName());
|
||||
senderTransportService.addSendBehavior(receiverTransportService,
|
||||
(connection, requestId, action, request, options) -> {
|
||||
if (RetentionLeaseActions.Remove.ACTION_NAME.equals(action)) {
|
||||
final RetentionLeaseActions.RemoveRequest removeRequest = (RetentionLeaseActions.RemoveRequest) request;
|
||||
if (shardIds.contains(removeRequest.getShardId().id())) {
|
||||
final String primaryShardNodeId =
|
||||
getLeaderCluster()
|
||||
.clusterService()
|
||||
.state()
|
||||
.routingTable()
|
||||
.index(leaderIndex)
|
||||
.shard(removeRequest.getShardId().id())
|
||||
.primaryShard()
|
||||
.currentNodeId();
|
||||
final String primaryShardNodeName =
|
||||
getLeaderCluster().clusterService().state().nodes().get(primaryShardNodeId).getName();
|
||||
final IndexShard primary =
|
||||
getLeaderCluster()
|
||||
.getInstance(IndicesService.class, primaryShardNodeName)
|
||||
.getShardOrNull(removeRequest.getShardId());
|
||||
final CountDownLatch latch = new CountDownLatch(1);
|
||||
primary.removeRetentionLease(
|
||||
retentionLeaseId,
|
||||
ActionListener.wrap(r -> latch.countDown(), e -> fail(e.toString())));
|
||||
try {
|
||||
latch.await();
|
||||
} catch (final InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
fail(e.toString());
|
||||
}
|
||||
senderTransportService.addSendBehavior(
|
||||
(connection, requestId, action, request, options) -> {
|
||||
if (RetentionLeaseActions.Remove.ACTION_NAME.equals(action)) {
|
||||
final RetentionLeaseActions.RemoveRequest removeRequest = (RetentionLeaseActions.RemoveRequest) request;
|
||||
if (shardIds.contains(removeRequest.getShardId().id())) {
|
||||
final String primaryShardNodeId =
|
||||
getLeaderCluster()
|
||||
.clusterService()
|
||||
.state()
|
||||
.routingTable()
|
||||
.index(leaderIndex)
|
||||
.shard(removeRequest.getShardId().id())
|
||||
.primaryShard()
|
||||
.currentNodeId();
|
||||
final String primaryShardNodeName =
|
||||
getLeaderCluster().clusterService().state().nodes().get(primaryShardNodeId).getName();
|
||||
final IndexShard primary =
|
||||
getLeaderCluster()
|
||||
.getInstance(IndicesService.class, primaryShardNodeName)
|
||||
.getShardOrNull(removeRequest.getShardId());
|
||||
final CountDownLatch latch = new CountDownLatch(1);
|
||||
primary.removeRetentionLease(
|
||||
retentionLeaseId,
|
||||
ActionListener.wrap(r -> latch.countDown(), e -> fail(e.toString())));
|
||||
try {
|
||||
latch.await();
|
||||
} catch (final InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
fail(e.toString());
|
||||
}
|
||||
}
|
||||
connection.sendRequest(requestId, action, request, options);
|
||||
});
|
||||
}
|
||||
|
||||
}
|
||||
connection.sendRequest(requestId, action, request, options);
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
pauseFollow(followerIndex);
|
||||
followerClient().admin().indices().close(new CloseIndexRequest(followerIndex)).actionGet();
|
||||
assertAcked(followerClient().execute(UnfollowAction.INSTANCE, new UnfollowAction.Request(followerIndex)).actionGet());
|
||||
|
@ -508,25 +497,18 @@ public class CcrRetentionLeaseIT extends CcrIntegTestCase {
|
|||
for (final ObjectCursor<DiscoveryNode> senderNode : followerClusterState.getState().nodes().getNodes().values()) {
|
||||
final MockTransportService senderTransportService =
|
||||
(MockTransportService) getFollowerCluster().getInstance(TransportService.class, senderNode.value.getName());
|
||||
final ClusterStateResponse leaderClusterState =
|
||||
leaderClient().admin().cluster().prepareState().clear().setNodes(true).get();
|
||||
for (final ObjectCursor<DiscoveryNode> receiverNode : leaderClusterState.getState().nodes().getNodes().values()) {
|
||||
final MockTransportService receiverTransportService =
|
||||
(MockTransportService) getLeaderCluster().getInstance(TransportService.class, receiverNode.value.getName());
|
||||
senderTransportService.addSendBehavior(receiverTransportService,
|
||||
(connection, requestId, action, request, options) -> {
|
||||
if (RetentionLeaseActions.Remove.ACTION_NAME.equals(action)) {
|
||||
final RetentionLeaseActions.RemoveRequest removeRequest = (RetentionLeaseActions.RemoveRequest) request;
|
||||
if (shardIds.contains(removeRequest.getShardId().id())) {
|
||||
throw randomBoolean()
|
||||
? new ConnectTransportException(receiverNode.value, "connection failed")
|
||||
: new IndexShardClosedException(removeRequest.getShardId());
|
||||
}
|
||||
senderTransportService.addSendBehavior(
|
||||
(connection, requestId, action, request, options) -> {
|
||||
if (RetentionLeaseActions.Remove.ACTION_NAME.equals(action)) {
|
||||
final RetentionLeaseActions.RemoveRequest removeRequest = (RetentionLeaseActions.RemoveRequest) request;
|
||||
if (shardIds.contains(removeRequest.getShardId().id())) {
|
||||
throw randomBoolean()
|
||||
? new ConnectTransportException(connection.getNode(), "connection failed")
|
||||
: new IndexShardClosedException(removeRequest.getShardId());
|
||||
}
|
||||
connection.sendRequest(requestId, action, request, options);
|
||||
});
|
||||
}
|
||||
|
||||
}
|
||||
connection.sendRequest(requestId, action, request, options);
|
||||
});
|
||||
}
|
||||
|
||||
final ElasticsearchException e = expectThrows(
|
||||
|
|
Loading…
Reference in New Issue