Do not intercept renew requests from other tests (#48833)

We might have some outstanding renew retention lease requests after a 
shard has unfollowed. If testRetentionLeaseIsAddedIfItDisappearsWhileFollowing
intercepts a renew request from other tests then we will never unlatch 
and the test will time out.

Closes #45192
This commit is contained in:
Nhat Nguyen 2019-11-02 18:44:38 -04:00
parent 0887cbc964
commit 020ff0fef9
1 changed files with 33 additions and 35 deletions

View File

@ -43,7 +43,6 @@ import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.plugins.Plugin; import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.snapshots.RestoreInfo; import org.elasticsearch.snapshots.RestoreInfo;
import org.elasticsearch.snapshots.RestoreService; import org.elasticsearch.snapshots.RestoreService;
import org.elasticsearch.test.junit.annotations.TestIssueLogging;
import org.elasticsearch.test.transport.MockTransportService; import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.transport.ConnectTransportException; import org.elasticsearch.transport.ConnectTransportException;
import org.elasticsearch.transport.RemoteTransportException; import org.elasticsearch.transport.RemoteTransportException;
@ -87,9 +86,6 @@ import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.hasSize;
@TestIssueLogging(
value = "org.elasticsearch.xpack.ccr:trace,org.elasticsearch.indices.recovery:trace,org.elasticsearch.index.seqno:debug",
issueUrl = "https://github.com/elastic/elasticsearch/issues/45192")
public class CcrRetentionLeaseIT extends CcrIntegTestCase { public class CcrRetentionLeaseIT extends CcrIntegTestCase {
public static final class RetentionLeaseRenewIntervalSettingPlugin extends Plugin { public static final class RetentionLeaseRenewIntervalSettingPlugin extends Plugin {
@ -781,40 +777,42 @@ public class CcrRetentionLeaseIT extends CcrIntegTestCase {
(connection, requestId, action, request, options) -> { (connection, requestId, action, request, options) -> {
if (RetentionLeaseActions.Renew.ACTION_NAME.equals(action) if (RetentionLeaseActions.Renew.ACTION_NAME.equals(action)
|| TransportActionProxy.getProxyAction(RetentionLeaseActions.Renew.ACTION_NAME).equals(action)) { || TransportActionProxy.getProxyAction(RetentionLeaseActions.Renew.ACTION_NAME).equals(action)) {
senderTransportService.clearAllRules();
final RetentionLeaseActions.RenewRequest renewRequest = (RetentionLeaseActions.RenewRequest) request; final RetentionLeaseActions.RenewRequest renewRequest = (RetentionLeaseActions.RenewRequest) request;
final String retentionLeaseId = getRetentionLeaseId(followerIndex, leaderIndex); final String retentionLeaseId = getRetentionLeaseId(followerIndex, leaderIndex);
assertThat(retentionLeaseId, equalTo(renewRequest.getId())); if (retentionLeaseId.equals(renewRequest.getId())) {
logger.info("--> intercepting renewal request for retention lease [{}]", retentionLeaseId); logger.info("--> intercepting renewal request for retention lease [{}]", retentionLeaseId);
final String primaryShardNodeId = senderTransportService.clearAllRules();
getLeaderCluster() final String primaryShardNodeId =
.clusterService() getLeaderCluster()
.state() .clusterService()
.routingTable() .state()
.index(leaderIndex) .routingTable()
.shard(renewRequest.getShardId().id()) .index(leaderIndex)
.primaryShard() .shard(renewRequest.getShardId().id())
.currentNodeId(); .primaryShard()
final String primaryShardNodeName = .currentNodeId();
getLeaderCluster().clusterService().state().nodes().get(primaryShardNodeId).getName(); final String primaryShardNodeName =
final IndexShard primary = getLeaderCluster().clusterService().state().nodes().get(primaryShardNodeId).getName();
getLeaderCluster() final IndexShard primary =
.getInstance(IndicesService.class, primaryShardNodeName) getLeaderCluster()
.getShardOrNull(renewRequest.getShardId()); .getInstance(IndicesService.class, primaryShardNodeName)
final CountDownLatch innerLatch = new CountDownLatch(1); .getShardOrNull(renewRequest.getShardId());
// this forces the background renewal from following to face a retention lease not found exception final CountDownLatch innerLatch = new CountDownLatch(1);
logger.info("--> removing retention lease [{}] on the leader", retentionLeaseId); try {
primary.removeRetentionLease(retentionLeaseId, // this forces the background renewal from following to face a retention lease not found exception
ActionListener.wrap(r -> innerLatch.countDown(), e -> fail(e.toString()))); logger.info("--> removing retention lease [{}] on the leader", retentionLeaseId);
logger.info("--> waiting for the removed retention lease [{}] to be synced on the leader", retentionLeaseId); primary.removeRetentionLease(retentionLeaseId,
try { ActionListener.wrap(r -> innerLatch.countDown(), e -> fail(e.toString())));
innerLatch.await(); logger.info("--> waiting for the removed retention lease [{}] to be synced on the leader",
} catch (final InterruptedException e) { retentionLeaseId);
Thread.currentThread().interrupt(); innerLatch.await();
fail(e.toString()); logger.info("--> removed retention lease [{}] on the leader", retentionLeaseId);
} catch (final Exception e) {
throw new AssertionError("failed to remove retention lease [" + retentionLeaseId + "] on the leader");
} finally {
latch.countDown();
}
} }
logger.info("--> removed retention lease [{}] on the leader", retentionLeaseId);
latch.countDown();
} }
connection.sendRequest(requestId, action, request, options); connection.sendRequest(requestId, action, request, options);
}); });