diff --git a/src/test/java/org/elasticsearch/indices/mapping/UpdateMappingTests.java b/src/test/java/org/elasticsearch/indices/mapping/UpdateMappingTests.java index f7485374359..9a07acaaa90 100644 --- a/src/test/java/org/elasticsearch/indices/mapping/UpdateMappingTests.java +++ b/src/test/java/org/elasticsearch/indices/mapping/UpdateMappingTests.java @@ -20,10 +20,8 @@ package org.elasticsearch.indices.mapping; import com.carrotsearch.hppc.cursors.ObjectObjectCursor; -import com.google.common.base.Predicate; import com.google.common.collect.Lists; import com.google.common.collect.Maps; -import org.elasticsearch.action.admin.cluster.tasks.PendingClusterTasksResponse; import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse; import org.elasticsearch.action.admin.indices.mapping.put.PutMappingResponse; import org.elasticsearch.action.admin.indices.refresh.RefreshResponse; @@ -99,15 +97,7 @@ public class UpdateMappingTests extends ElasticsearchIntegrationTest { String fieldName = "field_" + type + "_" + rec; fieldName = "\"" + fieldName + "\""; // quote it, so we make sure we catch the exact one if (!typeToSource.containsKey(type) || !typeToSource.get(type).contains(fieldName)) { - client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).execute().actionGet(); - awaitBusy(new Predicate() { - @Override - public boolean apply(Object input) { - PendingClusterTasksResponse pendingTasks = client().admin().cluster().preparePendingClusterTasks().get(); - return pendingTasks.pendingTasks().isEmpty(); - } - }); - client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).execute().actionGet(); + waitNoPendingTasksOnMaster(); // its going to break, before we do, make sure that the cluster state hasn't changed on us... ClusterState state2 = client().admin().cluster().prepareState().get().getState(); if (state.version() != state2.version()) { diff --git a/src/test/java/org/elasticsearch/percolator/PercolatorTests.java b/src/test/java/org/elasticsearch/percolator/PercolatorTests.java index d3f22d80d6e..bf82cdd9dd8 100644 --- a/src/test/java/org/elasticsearch/percolator/PercolatorTests.java +++ b/src/test/java/org/elasticsearch/percolator/PercolatorTests.java @@ -1767,21 +1767,7 @@ public class PercolatorTests extends ElasticsearchIntegrationTest { assertThat(response.getMatches(), arrayWithSize(0)); // wait until the mapping change has propagated - client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).get(); - boolean applied = awaitBusy(new Predicate() { - @Override - public boolean apply(Object input) { - for (Client client : clients()) { - PendingClusterTasksResponse pendingTasks = client.admin().cluster().preparePendingClusterTasks().get(); - if (!pendingTasks.pendingTasks().isEmpty()) { - return false; - } - } - return true; - } - }); - assertThat(applied, is(true)); - client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).get(); + waitNoPendingTasksOnAll(); GetMappingsResponse mappingsResponse = client().admin().indices().prepareGetMappings("test").get(); assertThat(mappingsResponse.getMappings().get("test"), notNullValue()); diff --git a/src/test/java/org/elasticsearch/test/ElasticsearchIntegrationTest.java b/src/test/java/org/elasticsearch/test/ElasticsearchIntegrationTest.java index 78241edeb14..bf6ca5599b2 100644 --- a/src/test/java/org/elasticsearch/test/ElasticsearchIntegrationTest.java +++ b/src/test/java/org/elasticsearch/test/ElasticsearchIntegrationTest.java @@ -35,6 +35,7 @@ import org.elasticsearch.action.ShardOperationFailedException; import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest; import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; import org.elasticsearch.action.admin.cluster.health.ClusterHealthStatus; +import org.elasticsearch.action.admin.cluster.tasks.PendingClusterTasksResponse; import org.elasticsearch.action.admin.indices.create.CreateIndexRequestBuilder; import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsResponse; import org.elasticsearch.action.admin.indices.flush.FlushResponse; @@ -102,8 +103,7 @@ import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilde import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery; import static org.elasticsearch.test.InternalTestCluster.clusterName; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.*; -import static org.hamcrest.Matchers.emptyIterable; -import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.*; /** * {@link ElasticsearchIntegrationTest} is an abstract base class to run integration @@ -702,6 +702,56 @@ public abstract class ElasticsearchIntegrationTest extends ElasticsearchTestCase return builder; } + /** + * Waits until all nodes have no pending tasks. + */ + public void waitNoPendingTasksOnAll() throws InterruptedException { + client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).get(); + final PendingClusterTasksResponse[] reference = new PendingClusterTasksResponse[1]; + boolean applied = awaitBusy(new Predicate() { + @Override + public boolean apply(Object input) { + reference[0] = null; + for (Client client : clients()) { + PendingClusterTasksResponse pendingTasks = client.admin().cluster().preparePendingClusterTasks().setLocal(true).get(); + if (!pendingTasks.pendingTasks().isEmpty()) { + reference[0] = pendingTasks; + return false; + } + } + return true; + } + }); + if (!applied) { + fail(reference[0].prettyPrint()); + } + client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).get(); + } + + /** + * Waits until the elected master node has no pending tasks. + */ + public void waitNoPendingTasksOnMaster() throws InterruptedException { + client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).get(); + final PendingClusterTasksResponse[] reference = new PendingClusterTasksResponse[1]; + boolean applied = awaitBusy(new Predicate() { + @Override + public boolean apply(Object input) { + reference[0] = null; + PendingClusterTasksResponse pendingTasks = client().admin().cluster().preparePendingClusterTasks().get(); + if (!pendingTasks.pendingTasks().isEmpty()) { + reference[0] = pendingTasks; + return false; + } + return true; + } + }); + if (!applied) { + fail(reference[0].prettyPrint()); + } + client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).get(); + } + /** * Restricts the given index to be allocated on n nodes using the allocation deciders. * Yet if the shards can't be allocated on any other node shards for this index will remain allocated on