Reset mock transport service in CcrRetentionLeaseIT (#42600)

testRetentionLeaseIsAddedIfItDisappearsWhileFollowing does not reset the
mock transport service after test. Surviving transport interceptors from
that test can sneaky remove retention leases and make other tests fail.

Closes #39331
Closes #39509
Closes #41428
Closes #41679
Closes #41737
Closes #41756
This commit is contained in:
Nhat Nguyen 2019-05-27 21:44:36 -04:00
parent ab832c4f17
commit 2077f9ffbc

View File

@ -6,7 +6,6 @@
package org.elasticsearch.xpack.ccr; package org.elasticsearch.xpack.ccr;
import com.carrotsearch.hppc.cursors.ObjectCursor;
import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest; import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
@ -44,7 +43,6 @@ import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.plugins.Plugin; import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.snapshots.RestoreInfo; import org.elasticsearch.snapshots.RestoreInfo;
import org.elasticsearch.snapshots.RestoreService; import org.elasticsearch.snapshots.RestoreService;
import org.elasticsearch.test.junit.annotations.TestLogging;
import org.elasticsearch.test.transport.MockTransportService; import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.transport.ConnectTransportException; import org.elasticsearch.transport.ConnectTransportException;
import org.elasticsearch.transport.RemoteTransportException; import org.elasticsearch.transport.RemoteTransportException;
@ -88,7 +86,6 @@ import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.hasSize;
@TestLogging(value = "org.elasticsearch.xpack.ccr:trace,org.elasticsearch.indices.recovery:trace,org.elasticsearch.index.seqno:debug")
public class CcrRetentionLeaseIT extends CcrIntegTestCase { public class CcrRetentionLeaseIT extends CcrIntegTestCase {
public static final class RetentionLeaseRenewIntervalSettingPlugin extends Plugin { public static final class RetentionLeaseRenewIntervalSettingPlugin extends Plugin {
@ -224,9 +221,9 @@ public class CcrRetentionLeaseIT extends CcrIntegTestCase {
// block the recovery from completing; this ensures the background sync is still running // block the recovery from completing; this ensures the background sync is still running
final ClusterStateResponse followerClusterState = followerClient().admin().cluster().prepareState().clear().setNodes(true).get(); final ClusterStateResponse followerClusterState = followerClient().admin().cluster().prepareState().clear().setNodes(true).get();
for (final ObjectCursor<DiscoveryNode> senderNode : followerClusterState.getState().nodes().getNodes().values()) { for (final DiscoveryNode senderNode : followerClusterState.getState().nodes()) {
final MockTransportService senderTransportService = final MockTransportService senderTransportService =
(MockTransportService) getFollowerCluster().getInstance(TransportService.class, senderNode.value.getName()); (MockTransportService) getFollowerCluster().getInstance(TransportService.class, senderNode.getName());
senderTransportService.addSendBehavior( senderTransportService.addSendBehavior(
(connection, requestId, action, request, options) -> { (connection, requestId, action, request, options) -> {
if (ClearCcrRestoreSessionAction.NAME.equals(action) if (ClearCcrRestoreSessionAction.NAME.equals(action)
@ -248,9 +245,9 @@ public class CcrRetentionLeaseIT extends CcrIntegTestCase {
assertRetentionLeaseRenewal(numberOfShards, numberOfReplicas, followerIndex, leaderIndex); assertRetentionLeaseRenewal(numberOfShards, numberOfReplicas, followerIndex, leaderIndex);
latch.countDown(); latch.countDown();
} finally { } finally {
for (final ObjectCursor<DiscoveryNode> senderNode : followerClusterState.getState().nodes().getDataNodes().values()) { for (final DiscoveryNode senderNode : followerClusterState.getState().nodes()) {
final MockTransportService senderTransportService = final MockTransportService senderTransportService =
(MockTransportService) getFollowerCluster().getInstance(TransportService.class, senderNode.value.getName()); (MockTransportService) getFollowerCluster().getInstance(TransportService.class, senderNode.getName());
senderTransportService.clearAllRules(); senderTransportService.clearAllRules();
} }
} }
@ -405,9 +402,9 @@ public class CcrRetentionLeaseIT extends CcrIntegTestCase {
final ClusterStateResponse followerClusterState = followerClient().admin().cluster().prepareState().clear().setNodes(true).get(); final ClusterStateResponse followerClusterState = followerClient().admin().cluster().prepareState().clear().setNodes(true).get();
try { try {
for (final ObjectCursor<DiscoveryNode> senderNode : followerClusterState.getState().nodes().getNodes().values()) { for (final DiscoveryNode senderNode : followerClusterState.getState().nodes()) {
final MockTransportService senderTransportService = final MockTransportService senderTransportService =
(MockTransportService) getFollowerCluster().getInstance(TransportService.class, senderNode.value.getName()); (MockTransportService) getFollowerCluster().getInstance(TransportService.class, senderNode.getName());
senderTransportService.addSendBehavior( senderTransportService.addSendBehavior(
(connection, requestId, action, request, options) -> { (connection, requestId, action, request, options) -> {
if (RetentionLeaseActions.Remove.ACTION_NAME.equals(action) if (RetentionLeaseActions.Remove.ACTION_NAME.equals(action)
@ -456,9 +453,9 @@ public class CcrRetentionLeaseIT extends CcrIntegTestCase {
assertThat(Strings.toString(shardStats), shardStats.getRetentionLeaseStats().retentionLeases().leases(), empty()); assertThat(Strings.toString(shardStats), shardStats.getRetentionLeaseStats().retentionLeases().leases(), empty());
} }
} finally { } finally {
for (final ObjectCursor<DiscoveryNode> senderNode : followerClusterState.getState().nodes().getDataNodes().values()) { for (final DiscoveryNode senderNode : followerClusterState.getState().nodes()) {
final MockTransportService senderTransportService = final MockTransportService senderTransportService =
(MockTransportService) getFollowerCluster().getInstance(TransportService.class, senderNode.value.getName()); (MockTransportService) getFollowerCluster().getInstance(TransportService.class, senderNode.getName());
senderTransportService.clearAllRules(); senderTransportService.clearAllRules();
} }
} }
@ -488,9 +485,9 @@ public class CcrRetentionLeaseIT extends CcrIntegTestCase {
final ClusterStateResponse followerClusterState = followerClient().admin().cluster().prepareState().clear().setNodes(true).get(); final ClusterStateResponse followerClusterState = followerClient().admin().cluster().prepareState().clear().setNodes(true).get();
try { try {
for (final ObjectCursor<DiscoveryNode> senderNode : followerClusterState.getState().nodes().getNodes().values()) { for (final DiscoveryNode senderNode : followerClusterState.getState().nodes()) {
final MockTransportService senderTransportService = final MockTransportService senderTransportService =
(MockTransportService) getFollowerCluster().getInstance(TransportService.class, senderNode.value.getName()); (MockTransportService) getFollowerCluster().getInstance(TransportService.class, senderNode.getName());
senderTransportService.addSendBehavior( senderTransportService.addSendBehavior(
(connection, requestId, action, request, options) -> { (connection, requestId, action, request, options) -> {
if (RetentionLeaseActions.Remove.ACTION_NAME.equals(action) if (RetentionLeaseActions.Remove.ACTION_NAME.equals(action)
@ -526,9 +523,9 @@ public class CcrRetentionLeaseIT extends CcrIntegTestCase {
getLeaderCluster().getClusterName(), getLeaderCluster().getClusterName(),
new Index(leaderIndex, leaderUUID)))); new Index(leaderIndex, leaderUUID))));
} finally { } finally {
for (final ObjectCursor<DiscoveryNode> senderNode : followerClusterState.getState().nodes().getDataNodes().values()) { for (final DiscoveryNode senderNode : followerClusterState.getState().nodes()) {
final MockTransportService senderTransportService = final MockTransportService senderTransportService =
(MockTransportService) getFollowerCluster().getInstance(TransportService.class, senderNode.value.getName()); (MockTransportService) getFollowerCluster().getInstance(TransportService.class, senderNode.getName());
senderTransportService.clearAllRules(); senderTransportService.clearAllRules();
} }
} }
@ -766,35 +763,36 @@ public class CcrRetentionLeaseIT extends CcrIntegTestCase {
final CountDownLatch latch = new CountDownLatch(1); final CountDownLatch latch = new CountDownLatch(1);
final ClusterStateResponse followerClusterState = followerClient().admin().cluster().prepareState().clear().setNodes(true).get(); final ClusterStateResponse followerClusterState = followerClient().admin().cluster().prepareState().clear().setNodes(true).get();
for (final ObjectCursor<DiscoveryNode> senderNode : followerClusterState.getState().nodes().getNodes().values()) { try {
final MockTransportService senderTransportService = for (final DiscoveryNode senderNode : followerClusterState.getState().nodes()) {
(MockTransportService) getFollowerCluster().getInstance(TransportService.class, senderNode.value.getName()); final MockTransportService senderTransportService =
senderTransportService.addSendBehavior( (MockTransportService) getFollowerCluster().getInstance(TransportService.class, senderNode.getName());
senderTransportService.addSendBehavior(
(connection, requestId, action, request, options) -> { (connection, requestId, action, request, options) -> {
if (RetentionLeaseActions.Renew.ACTION_NAME.equals(action) if (RetentionLeaseActions.Renew.ACTION_NAME.equals(action)
|| TransportActionProxy.getProxyAction(RetentionLeaseActions.Renew.ACTION_NAME).equals(action)) { || TransportActionProxy.getProxyAction(RetentionLeaseActions.Renew.ACTION_NAME).equals(action)) {
senderTransportService.clearAllRules(); senderTransportService.clearAllRules();
final RetentionLeaseActions.RenewRequest renewRequest = (RetentionLeaseActions.RenewRequest) request; final RetentionLeaseActions.RenewRequest renewRequest = (RetentionLeaseActions.RenewRequest) request;
final String primaryShardNodeId = final String primaryShardNodeId =
getLeaderCluster() getLeaderCluster()
.clusterService() .clusterService()
.state() .state()
.routingTable() .routingTable()
.index(leaderIndex) .index(leaderIndex)
.shard(renewRequest.getShardId().id()) .shard(renewRequest.getShardId().id())
.primaryShard() .primaryShard()
.currentNodeId(); .currentNodeId();
final String primaryShardNodeName = final String primaryShardNodeName =
getLeaderCluster().clusterService().state().nodes().get(primaryShardNodeId).getName(); getLeaderCluster().clusterService().state().nodes().get(primaryShardNodeId).getName();
final IndexShard primary = final IndexShard primary =
getLeaderCluster() getLeaderCluster()
.getInstance(IndicesService.class, primaryShardNodeName) .getInstance(IndicesService.class, primaryShardNodeName)
.getShardOrNull(renewRequest.getShardId()); .getShardOrNull(renewRequest.getShardId());
final CountDownLatch innerLatch = new CountDownLatch(1); final CountDownLatch innerLatch = new CountDownLatch(1);
// this forces the background renewal from following to face a retention lease not found exception // this forces the background renewal from following to face a retention lease not found exception
primary.removeRetentionLease( primary.removeRetentionLease(
getRetentionLeaseId(followerIndex, leaderIndex), getRetentionLeaseId(followerIndex, leaderIndex),
ActionListener.wrap(r -> innerLatch.countDown(), e -> fail(e.toString()))); ActionListener.wrap(r -> innerLatch.countDown(), e -> fail(e.toString())));
try { try {
innerLatch.await(); innerLatch.await();
@ -807,11 +805,18 @@ public class CcrRetentionLeaseIT extends CcrIntegTestCase {
} }
connection.sendRequest(requestId, action, request, options); connection.sendRequest(requestId, action, request, options);
}); });
}
latch.await();
assertRetentionLeaseRenewal(numberOfShards, numberOfReplicas, followerIndex, leaderIndex);
} finally {
for (final DiscoveryNode senderNode : followerClusterState.getState().nodes()) {
final MockTransportService senderTransportService =
(MockTransportService) getFollowerCluster().getInstance(TransportService.class, senderNode.getName());
senderTransportService.clearAllRules();
}
} }
latch.await();
assertRetentionLeaseRenewal(numberOfShards, numberOfReplicas, followerIndex, leaderIndex);
} }
/** /**
@ -858,9 +863,9 @@ public class CcrRetentionLeaseIT extends CcrIntegTestCase {
final ClusterStateResponse followerClusterState = followerClient().admin().cluster().prepareState().clear().setNodes(true).get(); final ClusterStateResponse followerClusterState = followerClient().admin().cluster().prepareState().clear().setNodes(true).get();
try { try {
for (final ObjectCursor<DiscoveryNode> senderNode : followerClusterState.getState().nodes().getNodes().values()) { for (final DiscoveryNode senderNode : followerClusterState.getState().nodes()) {
final MockTransportService senderTransportService = final MockTransportService senderTransportService =
(MockTransportService) getFollowerCluster().getInstance(TransportService.class, senderNode.value.getName()); (MockTransportService) getFollowerCluster().getInstance(TransportService.class, senderNode.getName());
senderTransportService.addSendBehavior( senderTransportService.addSendBehavior(
(connection, requestId, action, request, options) -> { (connection, requestId, action, request, options) -> {
if (RetentionLeaseActions.Renew.ACTION_NAME.equals(action) if (RetentionLeaseActions.Renew.ACTION_NAME.equals(action)
@ -914,9 +919,9 @@ public class CcrRetentionLeaseIT extends CcrIntegTestCase {
assertThat(Strings.toString(shardStats), shardStats.getRetentionLeaseStats().retentionLeases().leases(), empty()); assertThat(Strings.toString(shardStats), shardStats.getRetentionLeaseStats().retentionLeases().leases(), empty());
} }
} finally { } finally {
for (final ObjectCursor<DiscoveryNode> senderNode : followerClusterState.getState().nodes().getDataNodes().values()) { for (final DiscoveryNode senderNode : followerClusterState.getState().nodes()) {
final MockTransportService senderTransportService = final MockTransportService senderTransportService =
(MockTransportService) getFollowerCluster().getInstance(TransportService.class, senderNode.value.getName()); (MockTransportService) getFollowerCluster().getInstance(TransportService.class, senderNode.getName());
senderTransportService.clearAllRules(); senderTransportService.clearAllRules();
} }
} }