diff --git a/src/test/java/org/elasticsearch/indices/IndicesLifecycleListenerTests.java b/src/test/java/org/elasticsearch/indices/IndicesLifecycleListenerTests.java index c0d71e8b483..77762e5d345 100644 --- a/src/test/java/org/elasticsearch/indices/IndicesLifecycleListenerTests.java +++ b/src/test/java/org/elasticsearch/indices/IndicesLifecycleListenerTests.java @@ -18,8 +18,10 @@ */ package org.elasticsearch.indices; +import com.google.common.base.Predicate; import com.google.common.collect.Maps; import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.Strings; import org.elasticsearch.index.shard.IndexShardState; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.service.IndexShard; @@ -27,8 +29,10 @@ import org.elasticsearch.test.ElasticsearchIntegrationTest; import org.junit.Test; import java.util.List; +import java.util.Map; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.TimeUnit; import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS; import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_SHARDS; @@ -38,8 +42,6 @@ import static org.elasticsearch.index.shard.IndexShardState.*; import static org.elasticsearch.test.ElasticsearchIntegrationTest.ClusterScope; import static org.elasticsearch.test.ElasticsearchIntegrationTest.Scope; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; -import static org.hamcrest.CoreMatchers.equalTo; -import static org.hamcrest.CoreMatchers.notNullValue; @ClusterScope(scope = Scope.TEST, numNodes = 0) public class IndicesLifecycleListenerTests extends ElasticsearchIntegrationTest { @@ -99,15 +101,34 @@ public class IndicesLifecycleListenerTests extends ElasticsearchIntegrationTest assertShardStatesMatch(stateChangeListenerNode2, 6, CLOSED); } - private static void assertShardStatesMatch(IndexShardStateChangeListener stateChangeListener, int numShards, IndexShardState... shardStates) { - assertThat(stateChangeListener.shardStates.size(), equalTo(numShards)); - for (List indexShardStates : stateChangeListener.shardStates.values()) { - assertThat(indexShardStates, notNullValue()); - assertThat(indexShardStates.size(), equalTo(shardStates.length)); - for (int i = 0; i < shardStates.length; i++) { - assertThat(indexShardStates.get(i), equalTo(shardStates[i])); + private static void assertShardStatesMatch(final IndexShardStateChangeListener stateChangeListener, final int numShards, final IndexShardState... shardStates) + throws InterruptedException { + + Predicate waitPredicate = new Predicate() { + @Override + public boolean apply(Object input) { + if (stateChangeListener.shardStates.size() != numShards) { + return false; + } + for (List indexShardStates : stateChangeListener.shardStates.values()) { + if (indexShardStates == null || indexShardStates.size() != shardStates.length) { + return false; + } + for (int i = 0; i < shardStates.length; i++) { + if (indexShardStates.get(i) != shardStates[i]) { + return false; + } + } + } + return true; } + }; + if (!awaitBusy(waitPredicate, 1, TimeUnit.MINUTES)) { + fail("failed to observe expect shard states\n" + + "expected: [" + numShards + "] shards with states: " + Strings.arrayToCommaDelimitedString(shardStates) + "\n" + + "observed:\n" + stateChangeListener); } + stateChangeListener.shardStates.clear(); } @@ -123,5 +144,14 @@ public class IndicesLifecycleListenerTests extends ElasticsearchIntegrationTest shardStates.add(newState); } } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + for (Map.Entry> entry : shardStates.entrySet()) { + sb.append(entry.getKey()).append(" --> ").append(Strings.collectionToCommaDelimitedString(entry.getValue())).append("\n"); + } + return sb.toString(); + } } }