From 557fcf915e987ac2afda8b7b79abbe00de2a2b25 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Mon, 28 Jan 2019 15:25:56 -0500 Subject: [PATCH] Wait for mapping in testReadRequestsReturnLatestMappingVersion (#37886) If the index request is executed before the mapping update is applied on the IndexShard, the index request will perform a dynamic mapping update. This mapping update will be timeout (i.e, ProcessClusterEventTimeoutException) because the latch is not open. This leads to the failure of the index request and the test. This commit makes sure the mapping is ready before we execute the index request. Closes #37807 --- .../xpack/ccr/FollowerFailOverIT.java | 49 ++++++++++++------- 1 file changed, 30 insertions(+), 19 deletions(-) diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/FollowerFailOverIT.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/FollowerFailOverIT.java index b49e7c9ced7..73cc94b4703 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/FollowerFailOverIT.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/FollowerFailOverIT.java @@ -22,6 +22,7 @@ import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.common.xcontent.support.XContentMapValues; import org.elasticsearch.index.IndexSettings; +import org.elasticsearch.index.mapper.DocumentMapper; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.indices.IndicesService; @@ -230,8 +231,7 @@ public class FollowerFailOverIT extends CcrIntegTestCase { pauseFollow("follower-index"); } - @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/37807") - public void testReadRequestsReturnsLatestMappingVersion() throws Exception { + public void testReadRequestsReturnLatestMappingVersion() throws Exception { InternalTestCluster leaderCluster = getLeaderCluster(); Settings nodeAttributes = Settings.builder().put("node.attr.box", "large").build(); String dataNode = leaderCluster.startDataOnlyNode(nodeAttributes); @@ -244,6 +244,9 @@ public class FollowerFailOverIT extends CcrIntegTestCase { .put("index.routing.allocation.require.box", "large")) .get() ); + getFollowerCluster().startDataOnlyNode(nodeAttributes); + followerClient().execute(PutFollowAction.INSTANCE, putFollow("leader-index", "follower-index")).get(); + ensureFollowerGreen("follower-index"); ClusterService clusterService = leaderCluster.clusterService(dataNode); ShardId shardId = clusterService.state().routingTable().index("leader-index").shard(0).shardId(); IndicesService indicesService = leaderCluster.getInstance(IndicesService.class, dataNode); @@ -265,22 +268,30 @@ public class FollowerFailOverIT extends CcrIntegTestCase { }); leaderCluster.client().admin().indices().preparePutMapping().setType("doc") .setSource("balance", "type=long").setTimeout(TimeValue.ZERO).get(); - IndexResponse indexResp = leaderCluster.client(dataNode).prepareIndex("leader-index", "doc", "1") - .setSource("{\"balance\": 100}", XContentType.JSON).setTimeout(TimeValue.ZERO).get(); - assertThat(indexResp.getResult(), equalTo(DocWriteResponse.Result.CREATED)); - assertThat(indexShard.getGlobalCheckpoint(), equalTo(0L)); - getFollowerCluster().startDataOnlyNode(nodeAttributes); - followerClient().execute(PutFollowAction.INSTANCE, putFollow("leader-index", "follower-index")).get(); - ensureFollowerGreen("follower-index"); - // Make sure at least one read-request which requires mapping sync is completed. - assertBusy(() -> { - CcrClient ccrClient = new CcrClient(followerClient()); - FollowStatsAction.StatsResponses responses = ccrClient.followStats(new FollowStatsAction.StatsRequest()).actionGet(); - long bytesRead = responses.getStatsResponses().stream().mapToLong(r -> r.status().bytesRead()).sum(); - assertThat(bytesRead, Matchers.greaterThan(0L)); - }, 60, TimeUnit.SECONDS); - latch.countDown(); - assertIndexFullyReplicatedToFollower("leader-index", "follower-index"); - pauseFollow("follower-index"); + try { + // Make sure the mapping is ready on the shard before we execute the index request; otherwise the index request + // will perform a dynamic mapping update which however will be blocked because the latch is remained closed. + assertBusy(() -> { + DocumentMapper mapper = indexShard.mapperService().documentMapper("doc"); + assertNotNull(mapper); + assertNotNull(mapper.mappers().getMapper("balance")); + }); + IndexResponse indexResp = leaderCluster.client().prepareIndex("leader-index", "doc", "1") + .setSource("{\"balance\": 100}", XContentType.JSON).setTimeout(TimeValue.ZERO).get(); + assertThat(indexResp.getResult(), equalTo(DocWriteResponse.Result.CREATED)); + assertThat(indexShard.getGlobalCheckpoint(), equalTo(0L)); + // Make sure at least one read-request which requires mapping sync is completed. + assertBusy(() -> { + CcrClient ccrClient = new CcrClient(followerClient()); + FollowStatsAction.StatsResponses responses = ccrClient.followStats(new FollowStatsAction.StatsRequest()).actionGet(); + long bytesRead = responses.getStatsResponses().stream().mapToLong(r -> r.status().bytesRead()).sum(); + assertThat(bytesRead, Matchers.greaterThan(0L)); + }, 60, TimeUnit.SECONDS); + latch.countDown(); + assertIndexFullyReplicatedToFollower("leader-index", "follower-index"); + } finally { + latch.countDown(); // no effect if latch was counted down - this makes sure teardown can make progress. + pauseFollow("follower-index"); + } } }