[TEST] Move the waiting for pending tasks to helper methods and let the percolator and update mapping test use these helper methods.

This commit is contained in:
Martijn van Groningen 2014-06-20 23:44:33 +02:00
parent 11251bca92
commit 812972ab0e
3 changed files with 54 additions and 28 deletions

View File

@ -20,10 +20,8 @@
package org.elasticsearch.indices.mapping;
import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
import com.google.common.base.Predicate;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.elasticsearch.action.admin.cluster.tasks.PendingClusterTasksResponse;
import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse;
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingResponse;
import org.elasticsearch.action.admin.indices.refresh.RefreshResponse;
@ -99,15 +97,7 @@ public class UpdateMappingTests extends ElasticsearchIntegrationTest {
String fieldName = "field_" + type + "_" + rec;
fieldName = "\"" + fieldName + "\""; // quote it, so we make sure we catch the exact one
if (!typeToSource.containsKey(type) || !typeToSource.get(type).contains(fieldName)) {
client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).execute().actionGet();
awaitBusy(new Predicate<Object>() {
@Override
public boolean apply(Object input) {
PendingClusterTasksResponse pendingTasks = client().admin().cluster().preparePendingClusterTasks().get();
return pendingTasks.pendingTasks().isEmpty();
}
});
client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).execute().actionGet();
waitNoPendingTasksOnMaster();
// its going to break, before we do, make sure that the cluster state hasn't changed on us...
ClusterState state2 = client().admin().cluster().prepareState().get().getState();
if (state.version() != state2.version()) {

View File

@ -1767,21 +1767,7 @@ public class PercolatorTests extends ElasticsearchIntegrationTest {
assertThat(response.getMatches(), arrayWithSize(0));
// wait until the mapping change has propagated
client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).get();
boolean applied = awaitBusy(new Predicate<Object>() {
@Override
public boolean apply(Object input) {
for (Client client : clients()) {
PendingClusterTasksResponse pendingTasks = client.admin().cluster().preparePendingClusterTasks().get();
if (!pendingTasks.pendingTasks().isEmpty()) {
return false;
}
}
return true;
}
});
assertThat(applied, is(true));
client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).get();
waitNoPendingTasksOnAll();
GetMappingsResponse mappingsResponse = client().admin().indices().prepareGetMappings("test").get();
assertThat(mappingsResponse.getMappings().get("test"), notNullValue());

View File

@ -35,6 +35,7 @@ import org.elasticsearch.action.ShardOperationFailedException;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthStatus;
import org.elasticsearch.action.admin.cluster.tasks.PendingClusterTasksResponse;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequestBuilder;
import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsResponse;
import org.elasticsearch.action.admin.indices.flush.FlushResponse;
@ -102,8 +103,7 @@ import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilde
import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
import static org.elasticsearch.test.InternalTestCluster.clusterName;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.*;
import static org.hamcrest.Matchers.emptyIterable;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.*;
/**
* {@link ElasticsearchIntegrationTest} is an abstract base class to run integration
@ -702,6 +702,56 @@ public abstract class ElasticsearchIntegrationTest extends ElasticsearchTestCase
return builder;
}
/**
* Waits until all nodes have no pending tasks.
*/
public void waitNoPendingTasksOnAll() throws InterruptedException {
client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).get();
final PendingClusterTasksResponse[] reference = new PendingClusterTasksResponse[1];
boolean applied = awaitBusy(new Predicate<Object>() {
@Override
public boolean apply(Object input) {
reference[0] = null;
for (Client client : clients()) {
PendingClusterTasksResponse pendingTasks = client.admin().cluster().preparePendingClusterTasks().setLocal(true).get();
if (!pendingTasks.pendingTasks().isEmpty()) {
reference[0] = pendingTasks;
return false;
}
}
return true;
}
});
if (!applied) {
fail(reference[0].prettyPrint());
}
client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).get();
}
/**
* Waits until the elected master node has no pending tasks.
*/
public void waitNoPendingTasksOnMaster() throws InterruptedException {
client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).get();
final PendingClusterTasksResponse[] reference = new PendingClusterTasksResponse[1];
boolean applied = awaitBusy(new Predicate<Object>() {
@Override
public boolean apply(Object input) {
reference[0] = null;
PendingClusterTasksResponse pendingTasks = client().admin().cluster().preparePendingClusterTasks().get();
if (!pendingTasks.pendingTasks().isEmpty()) {
reference[0] = pendingTasks;
return false;
}
return true;
}
});
if (!applied) {
fail(reference[0].prettyPrint());
}
client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).get();
}
/**
* Restricts the given index to be allocated on <code>n</code> nodes using the allocation deciders.
* Yet if the shards can't be allocated on any other node shards for this index will remain allocated on