From d9fa4e4adaf354d802523e5cf396cf2bfc7f40b4 Mon Sep 17 00:00:00 2001 From: Yannick Welsch Date: Thu, 17 Jan 2019 13:59:09 +0100 Subject: [PATCH] Fix testRelocateWhileContinuouslyIndexingAndWaitingForRefresh (#37560) This test failed because the refresh at the end of the test is not guaranteed to run before the indexing is completed, and therefore there's no guarantee that the refresh will free all operations. This triggers an assertion failure in the test clean-up, which asserts that there are no more pending operations. --- .../elasticsearch/recovery/RelocationIT.java | 20 +++++++++++++------ 1 file changed, 14 insertions(+), 6 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/recovery/RelocationIT.java b/server/src/test/java/org/elasticsearch/recovery/RelocationIT.java index 45f0fce3b81..fb455f37d76 100644 --- a/server/src/test/java/org/elasticsearch/recovery/RelocationIT.java +++ b/server/src/test/java/org/elasticsearch/recovery/RelocationIT.java @@ -27,6 +27,7 @@ import org.elasticsearch.action.ActionFuture; import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; import org.elasticsearch.action.admin.cluster.reroute.ClusterRerouteResponse; import org.elasticsearch.action.index.IndexRequestBuilder; +import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.support.WriteRequest; import org.elasticsearch.client.Client; @@ -552,7 +553,7 @@ public class RelocationIT extends ESIntegTestCase { assertThat(client().prepareSearch("test").setSize(0).execute().actionGet().getHits().getTotalHits().value, equalTo(20L)); } - public void testRelocateWhileContinuouslyIndexingAndWaitingForRefresh() { + public void testRelocateWhileContinuouslyIndexingAndWaitingForRefresh() throws Exception { logger.info("--> starting [node1] ..."); final String node1 = internalCluster().startNode(); @@ -570,9 +571,11 @@ public class RelocationIT extends ESIntegTestCase { logger.info("--> flush so we have an actual index"); client().admin().indices().prepareFlush().execute().actionGet(); logger.info("--> index more docs so we have something in the translog"); + final List> pendingIndexResponses = new ArrayList<>(); for (int i = 10; i < 20; i++) { - client().prepareIndex("test", "type", Integer.toString(i)).setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL) - .setSource("field", "value" + i).execute(); + pendingIndexResponses.add(client().prepareIndex("test", "type", Integer.toString(i)) + .setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL) + .setSource("field", "value" + i).execute()); } logger.info("--> start another node"); @@ -587,8 +590,9 @@ public class RelocationIT extends ESIntegTestCase { .execute(); logger.info("--> index 100 docs while relocating"); for (int i = 20; i < 120; i++) { - client().prepareIndex("test", "type", Integer.toString(i)).setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL) - .setSource("field", "value" + i).execute(); + pendingIndexResponses.add(client().prepareIndex("test", "type", Integer.toString(i)) + .setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL) + .setSource("field", "value" + i).execute()); } relocationListener.actionGet(); clusterHealthResponse = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID) @@ -596,7 +600,11 @@ public class RelocationIT extends ESIntegTestCase { assertThat(clusterHealthResponse.isTimedOut(), equalTo(false)); logger.info("--> verifying count"); - client().admin().indices().prepareRefresh().execute().actionGet(); + assertBusy(() -> { + client().admin().indices().prepareRefresh().execute().actionGet(); + assertTrue(pendingIndexResponses.stream().allMatch(ActionFuture::isDone)); + }, 1, TimeUnit.MINUTES); + assertThat(client().prepareSearch("test").setSize(0).execute().actionGet().getHits().getTotalHits().value, equalTo(120L)); }