diff --git a/src/test/java/org/elasticsearch/test/integration/search/basic/SearchWhileCreatingIndexTests.java b/src/test/java/org/elasticsearch/test/integration/search/basic/SearchWhileCreatingIndexTests.java index 02b102a6f30..343d061b4b8 100644 --- a/src/test/java/org/elasticsearch/test/integration/search/basic/SearchWhileCreatingIndexTests.java +++ b/src/test/java/org/elasticsearch/test/integration/search/basic/SearchWhileCreatingIndexTests.java @@ -19,74 +19,75 @@ package org.elasticsearch.test.integration.search.basic; -import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; -import org.elasticsearch.action.admin.cluster.tasks.PendingClusterTasksResponse; +import org.elasticsearch.action.admin.cluster.health.ClusterHealthStatus; +import org.elasticsearch.action.admin.indices.create.CreateIndexResponse; import org.elasticsearch.action.admin.indices.refresh.RefreshResponse; import org.elasticsearch.action.search.SearchResponse; -import org.elasticsearch.cluster.service.PendingClusterTask; import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.test.integration.AbstractSharedClusterTest; import org.junit.Test; -import java.util.Arrays; - import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThanOrEqualTo; +/** + * This test basically verifies that search with a single shard active (cause we indexed to it) and other + * shards possibly not active at all (cause they haven't allocated) will still work. + */ public class SearchWhileCreatingIndexTests extends AbstractSharedClusterTest { protected int numberOfNodes() { return 1; } - /** - * This test basically verifies that search with a single shard active (cause we indexed to it) and other - * shards possibly not active at all (cause they haven't allocated) will still work. - */ @Test - public void searchWhileCreatingIndex() throws Throwable { - Thread backgroundThread; - final Throwable[] threadException = new Throwable[1]; - backgroundThread = new Thread(new Runnable() { - @Override - public void run() { - try { - for (int i = 0; i < 20; i++) { - logger.info("Running iteration {}", i); - prepareCreate("test").setSettings(settingsBuilder().put("index.number_of_shards", 10)); - client().prepareIndex("test", "type1", "id:" + i).setSource("field", "test").execute().actionGet(); - RefreshResponse refreshResponse = client().admin().indices().prepareRefresh("test").execute().actionGet(); - assertThat(refreshResponse.getSuccessfulShards(), greaterThanOrEqualTo(1)); // at least one shard should be successful when refreshing - SearchResponse searchResponse = client().prepareSearch("test").setQuery(QueryBuilders.termQuery("field", "test")).execute().actionGet(); - assertThat("found unexpected number of hits, shard_failures (we expected to potentially non active ones!):" + Arrays.toString(searchResponse.getShardFailures()) + " id: " + i, searchResponse.getHits().totalHits(), equalTo(1l)); - wipeIndex("test"); - } - } catch (Throwable t) { - threadException[0] = t; - } - } - }); - backgroundThread.setDaemon(true); - backgroundThread.start(); - backgroundThread.join(30 * 60 * 1000); - if (threadException[0] != null) { - throw threadException[0]; - } - if (backgroundThread.isAlive()) { - logger.error("Background thread hanged. Dumping cluster info"); - ClusterStateResponse clusterStateResponse = client().admin().cluster().prepareState().get(); - logger.info("ClusterState: {}", clusterStateResponse.getState()); - PendingClusterTasksResponse pendingTasks = client().admin().cluster().preparePendingClusterTasks().get(); - logger.info("Pending tasks:"); - for (PendingClusterTask task : pendingTasks) { - logger.info("Task: priority: {} source: {} Time in queue (ms): {}", task.getPriority(), task.getSource(), task.timeInQueueInMillis()); - } - fail("Background thread didn't finish within 30 minutes"); - } - logger.info("done"); + public void testIndexCausesIndexCreation() throws Exception { + searchWhileCreatingIndex(-1, 1); // 1 replica in our default... + } + @Test + public void testNoReplicas() throws Exception { + searchWhileCreatingIndex(10, 0); + } + @Test + public void testOneReplica() throws Exception { + searchWhileCreatingIndex(10, 1); + } + + @Test + public void testTwoReplicas() throws Exception { + searchWhileCreatingIndex(10, 2); + } + + private void searchWhileCreatingIndex(int numberOfShards, int numberOfReplicas) throws Exception { + for (int i = 0; i < 20; i++) { + logger.info("running iteration {}", i); + if (numberOfShards > 0) { + CreateIndexResponse createIndexResponse = prepareCreate("test") + .setSettings(settingsBuilder().put("index.number_of_shards", numberOfShards).put("index.number_of_replicas", numberOfReplicas)).get(); + assertThat(createIndexResponse.isAcknowledged(), equalTo(true)); + } + client().prepareIndex("test", "type1", randomAsciiOfLength(5)).setSource("field", "test").execute().actionGet(); + RefreshResponse refreshResponse = client().admin().indices().prepareRefresh("test").execute().actionGet(); + assertThat(refreshResponse.getSuccessfulShards(), greaterThanOrEqualTo(1)); // at least one shard should be successful when refreshing + + // we want to make sure that while recovery happens, and a replica gets recovered, its properly refreshed + ClusterHealthStatus status = ClusterHealthStatus.RED; + while (status != ClusterHealthStatus.GREEN) { + // first, verify that search on the primary search works + SearchResponse searchResponse = client().prepareSearch("test").setPreference("_primary").setQuery(QueryBuilders.termQuery("field", "test")).execute().actionGet(); + assertHitCount(searchResponse, 1); + // now, let it go to primary or replica, though in a randomized re-creatable manner + searchResponse = client().prepareSearch("test").setPreference(randomAsciiOfLength(5)).setQuery(QueryBuilders.termQuery("field", "test")).execute().actionGet(); + assertHitCount(searchResponse, 1); + status = client().admin().cluster().prepareHealth("test").get().getStatus(); + cluster().ensureAtLeastNumNodes(numberOfReplicas + 1); + } + wipeIndex("test"); + } } } \ No newline at end of file