Fix testRelocateWhileContinuouslyIndexingAndWaitingForRefresh (#37560)
This test failed because the refresh at the end of the test is not guaranteed to run before the indexing is completed, and therefore there's no guarantee that the refresh will free all operations. This triggers an assertion failure in the test clean-up, which asserts that there are no more pending operations.
This commit is contained in:
parent
99b09845da
commit
d9fa4e4ada
|
@ -27,6 +27,7 @@ import org.elasticsearch.action.ActionFuture;
|
|||
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
|
||||
import org.elasticsearch.action.admin.cluster.reroute.ClusterRerouteResponse;
|
||||
import org.elasticsearch.action.index.IndexRequestBuilder;
|
||||
import org.elasticsearch.action.index.IndexResponse;
|
||||
import org.elasticsearch.action.search.SearchResponse;
|
||||
import org.elasticsearch.action.support.WriteRequest;
|
||||
import org.elasticsearch.client.Client;
|
||||
|
@ -552,7 +553,7 @@ public class RelocationIT extends ESIntegTestCase {
|
|||
assertThat(client().prepareSearch("test").setSize(0).execute().actionGet().getHits().getTotalHits().value, equalTo(20L));
|
||||
}
|
||||
|
||||
public void testRelocateWhileContinuouslyIndexingAndWaitingForRefresh() {
|
||||
public void testRelocateWhileContinuouslyIndexingAndWaitingForRefresh() throws Exception {
|
||||
logger.info("--> starting [node1] ...");
|
||||
final String node1 = internalCluster().startNode();
|
||||
|
||||
|
@ -570,9 +571,11 @@ public class RelocationIT extends ESIntegTestCase {
|
|||
logger.info("--> flush so we have an actual index");
|
||||
client().admin().indices().prepareFlush().execute().actionGet();
|
||||
logger.info("--> index more docs so we have something in the translog");
|
||||
final List<ActionFuture<IndexResponse>> pendingIndexResponses = new ArrayList<>();
|
||||
for (int i = 10; i < 20; i++) {
|
||||
client().prepareIndex("test", "type", Integer.toString(i)).setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL)
|
||||
.setSource("field", "value" + i).execute();
|
||||
pendingIndexResponses.add(client().prepareIndex("test", "type", Integer.toString(i))
|
||||
.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL)
|
||||
.setSource("field", "value" + i).execute());
|
||||
}
|
||||
|
||||
logger.info("--> start another node");
|
||||
|
@ -587,8 +590,9 @@ public class RelocationIT extends ESIntegTestCase {
|
|||
.execute();
|
||||
logger.info("--> index 100 docs while relocating");
|
||||
for (int i = 20; i < 120; i++) {
|
||||
client().prepareIndex("test", "type", Integer.toString(i)).setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL)
|
||||
.setSource("field", "value" + i).execute();
|
||||
pendingIndexResponses.add(client().prepareIndex("test", "type", Integer.toString(i))
|
||||
.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL)
|
||||
.setSource("field", "value" + i).execute());
|
||||
}
|
||||
relocationListener.actionGet();
|
||||
clusterHealthResponse = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID)
|
||||
|
@ -596,7 +600,11 @@ public class RelocationIT extends ESIntegTestCase {
|
|||
assertThat(clusterHealthResponse.isTimedOut(), equalTo(false));
|
||||
|
||||
logger.info("--> verifying count");
|
||||
client().admin().indices().prepareRefresh().execute().actionGet();
|
||||
assertBusy(() -> {
|
||||
client().admin().indices().prepareRefresh().execute().actionGet();
|
||||
assertTrue(pendingIndexResponses.stream().allMatch(ActionFuture::isDone));
|
||||
}, 1, TimeUnit.MINUTES);
|
||||
|
||||
assertThat(client().prepareSearch("test").setSize(0).execute().actionGet().getHits().getTotalHits().value, equalTo(120L));
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue