Wait for mapping in testReadRequestsReturnLatestMappingVersion (#37886)

If the index request is executed before the mapping update is applied on
the IndexShard, the index request will perform a dynamic mapping update.
That mapping update will time out (i.e., with a ProcessClusterEventTimeoutException)
because the latch is not open, which makes both the index request and the
test fail. This commit makes sure the mapping is ready on the shard
before we execute the index request.

Closes #37807
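
The core of the fix is to wait until the new field is present in the shard's mapping before
issuing the zero-timeout index request. A minimal sketch of that step, reusing the indexShard
and leaderCluster handles from the test below (assertBusy/assertNotNull are the standard
ESTestCase helpers), not a standalone program:

    // Poll until the put-mapping (which was sent with a zero timeout) has been applied on the leader shard.
    assertBusy(() -> {
        DocumentMapper mapper = indexShard.mapperService().documentMapper("doc");
        assertNotNull(mapper);
        assertNotNull(mapper.mappers().getMapper("balance"));
    });
    // The "balance" field is now mapped, so indexing with TimeValue.ZERO needs no dynamic
    // mapping update and cannot be blocked behind the closed latch.
    IndexResponse indexResp = leaderCluster.client().prepareIndex("leader-index", "doc", "1")
        .setSource("{\"balance\": 100}", XContentType.JSON).setTimeout(TimeValue.ZERO).get();
    assertThat(indexResp.getResult(), equalTo(DocWriteResponse.Result.CREATED));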
Nhat Nguyen 2019-01-28 15:25:56 -05:00 committed by GitHub
parent 6325e55dbf
commit 557fcf915e
1 changed file with 30 additions and 19 deletions

@@ -22,6 +22,7 @@ import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.common.xcontent.XContentType;
 import org.elasticsearch.common.xcontent.support.XContentMapValues;
 import org.elasticsearch.index.IndexSettings;
+import org.elasticsearch.index.mapper.DocumentMapper;
 import org.elasticsearch.index.shard.IndexShard;
 import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.indices.IndicesService;
@@ -230,8 +231,7 @@ public class FollowerFailOverIT extends CcrIntegTestCase {
         pauseFollow("follower-index");
     }

-    @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/37807")
-    public void testReadRequestsReturnsLatestMappingVersion() throws Exception {
+    public void testReadRequestsReturnLatestMappingVersion() throws Exception {
         InternalTestCluster leaderCluster = getLeaderCluster();
         Settings nodeAttributes = Settings.builder().put("node.attr.box", "large").build();
         String dataNode = leaderCluster.startDataOnlyNode(nodeAttributes);
@@ -244,6 +244,9 @@ public class FollowerFailOverIT extends CcrIntegTestCase {
                 .put("index.routing.allocation.require.box", "large"))
             .get()
         );
+        getFollowerCluster().startDataOnlyNode(nodeAttributes);
+        followerClient().execute(PutFollowAction.INSTANCE, putFollow("leader-index", "follower-index")).get();
+        ensureFollowerGreen("follower-index");
         ClusterService clusterService = leaderCluster.clusterService(dataNode);
         ShardId shardId = clusterService.state().routingTable().index("leader-index").shard(0).shardId();
         IndicesService indicesService = leaderCluster.getInstance(IndicesService.class, dataNode);
@@ -265,22 +268,30 @@ public class FollowerFailOverIT extends CcrIntegTestCase {
         });
         leaderCluster.client().admin().indices().preparePutMapping().setType("doc")
             .setSource("balance", "type=long").setTimeout(TimeValue.ZERO).get();
-        IndexResponse indexResp = leaderCluster.client(dataNode).prepareIndex("leader-index", "doc", "1")
-            .setSource("{\"balance\": 100}", XContentType.JSON).setTimeout(TimeValue.ZERO).get();
-        assertThat(indexResp.getResult(), equalTo(DocWriteResponse.Result.CREATED));
-        assertThat(indexShard.getGlobalCheckpoint(), equalTo(0L));
-        getFollowerCluster().startDataOnlyNode(nodeAttributes);
-        followerClient().execute(PutFollowAction.INSTANCE, putFollow("leader-index", "follower-index")).get();
-        ensureFollowerGreen("follower-index");
-        // Make sure at least one read-request which requires mapping sync is completed.
-        assertBusy(() -> {
-            CcrClient ccrClient = new CcrClient(followerClient());
-            FollowStatsAction.StatsResponses responses = ccrClient.followStats(new FollowStatsAction.StatsRequest()).actionGet();
-            long bytesRead = responses.getStatsResponses().stream().mapToLong(r -> r.status().bytesRead()).sum();
-            assertThat(bytesRead, Matchers.greaterThan(0L));
-        }, 60, TimeUnit.SECONDS);
-        latch.countDown();
-        assertIndexFullyReplicatedToFollower("leader-index", "follower-index");
-        pauseFollow("follower-index");
+        try {
+            // Make sure the mapping is ready on the shard before we execute the index request; otherwise the index request
+            // will perform a dynamic mapping update, which will be blocked because the latch remains closed.
+            assertBusy(() -> {
+                DocumentMapper mapper = indexShard.mapperService().documentMapper("doc");
+                assertNotNull(mapper);
+                assertNotNull(mapper.mappers().getMapper("balance"));
+            });
+            IndexResponse indexResp = leaderCluster.client().prepareIndex("leader-index", "doc", "1")
+                .setSource("{\"balance\": 100}", XContentType.JSON).setTimeout(TimeValue.ZERO).get();
+            assertThat(indexResp.getResult(), equalTo(DocWriteResponse.Result.CREATED));
+            assertThat(indexShard.getGlobalCheckpoint(), equalTo(0L));
+            // Make sure at least one read-request which requires mapping sync is completed.
+            assertBusy(() -> {
+                CcrClient ccrClient = new CcrClient(followerClient());
+                FollowStatsAction.StatsResponses responses = ccrClient.followStats(new FollowStatsAction.StatsRequest()).actionGet();
+                long bytesRead = responses.getStatsResponses().stream().mapToLong(r -> r.status().bytesRead()).sum();
+                assertThat(bytesRead, Matchers.greaterThan(0L));
+            }, 60, TimeUnit.SECONDS);
+            latch.countDown();
+            assertIndexFullyReplicatedToFollower("leader-index", "follower-index");
+        } finally {
+            latch.countDown(); // no effect if latch was counted down - this makes sure teardown can make progress.
+            pauseFollow("follower-index");
+        }
     }
 }