improve the search-while-creating-index test
- improve the test to be more re-creatable
- add tests for various replica counts, to check whether failures are caused by searching on replicas that might not have been refreshed yet
- cover both explicit index creation and index creation triggered by an index operation
- have an initial search go to _primary, to check whether the failure happens when searching on a replica because it missed a refresh (a rough sketch of this idea follows below)
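For orientation, here is a minimal sketch (not part of the commit) of the "_primary first, then a fixed preference string" idea from the last two points above. The class name, the helper method, and the preferenceSeed parameter are made up for illustration; the Client is assumed to come from the same 0.90-era Java API that the test itself uses, and the index name "test" and field/value mirror the test in the diff.

import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.index.query.QueryBuilders;

// Rough sketch (not part of the commit): run the same query twice, first pinned
// to the primary shard copy, then against whichever copy a fixed preference
// string selects, so failures are reproducible across runs.
public class PrimaryThenReplicaSearchSketch {

    // 'client' and 'preferenceSeed' are assumed to be supplied by the caller.
    static void searchPrimaryThenAnyCopy(Client client, String preferenceSeed) {
        // 1. Pin the search to the primary shard copy. If this already fails,
        //    the bug is not a replica that missed a refresh.
        SearchResponse onPrimary = client.prepareSearch("test")
                .setPreference("_primary")
                .setQuery(QueryBuilders.termQuery("field", "test"))
                .execute().actionGet();

        // 2. Repeat the search, letting it hit primary or replica, but with a
        //    caller-provided preference string so the shard copy that is picked
        //    stays the same from run to run (re-creatable failures).
        SearchResponse onAnyCopy = client.prepareSearch("test")
                .setPreference(preferenceSeed)
                .setQuery(QueryBuilders.termQuery("field", "test"))
                .execute().actionGet();

        // A hit-count mismatch between the two responses points at a replica
        // that has not been refreshed yet.
        System.out.println(onPrimary.getHits().totalHits() + " vs " + onAnyCopy.getHits().totalHits());
    }
}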
parent 9f2e615ed9
commit b329943632
@@ -19,74 +19,75 @@
 package org.elasticsearch.test.integration.search.basic;
 
-import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
-import org.elasticsearch.action.admin.cluster.tasks.PendingClusterTasksResponse;
+import org.elasticsearch.action.admin.cluster.health.ClusterHealthStatus;
+import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
 import org.elasticsearch.action.admin.indices.refresh.RefreshResponse;
 import org.elasticsearch.action.search.SearchResponse;
-import org.elasticsearch.cluster.service.PendingClusterTask;
 import org.elasticsearch.index.query.QueryBuilders;
 import org.elasticsearch.test.integration.AbstractSharedClusterTest;
 import org.junit.Test;
 
-import java.util.Arrays;
-
 import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder;
+import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.greaterThanOrEqualTo;
 
+/**
+ * This test basically verifies that search with a single shard active (cause we indexed to it) and other
+ * shards possibly not active at all (cause they haven't allocated) will still work.
+ */
 public class SearchWhileCreatingIndexTests extends AbstractSharedClusterTest {
 
     protected int numberOfNodes() {
         return 1;
     }
 
-    /**
-     * This test basically verifies that search with a single shard active (cause we indexed to it) and other
-     * shards possibly not active at all (cause they haven't allocated) will still work.
-     */
     @Test
-    public void searchWhileCreatingIndex() throws Throwable {
-        Thread backgroundThread;
-        final Throwable[] threadException = new Throwable[1];
-        backgroundThread = new Thread(new Runnable() {
-            @Override
-            public void run() {
-                try {
-                    for (int i = 0; i < 20; i++) {
-                        logger.info("Running iteration {}", i);
-                        prepareCreate("test").setSettings(settingsBuilder().put("index.number_of_shards", 10));
-                        client().prepareIndex("test", "type1", "id:" + i).setSource("field", "test").execute().actionGet();
-                        RefreshResponse refreshResponse = client().admin().indices().prepareRefresh("test").execute().actionGet();
-                        assertThat(refreshResponse.getSuccessfulShards(), greaterThanOrEqualTo(1)); // at least one shard should be successful when refreshing
-                        SearchResponse searchResponse = client().prepareSearch("test").setQuery(QueryBuilders.termQuery("field", "test")).execute().actionGet();
-                        assertThat("found unexpected number of hits, shard_failures (we expected to potentially non active ones!):" + Arrays.toString(searchResponse.getShardFailures()) + " id: " + i, searchResponse.getHits().totalHits(), equalTo(1l));
-                        wipeIndex("test");
-                    }
-                } catch (Throwable t) {
-                    threadException[0] = t;
-                }
-            }
-        });
-        backgroundThread.setDaemon(true);
-        backgroundThread.start();
-        backgroundThread.join(30 * 60 * 1000);
-        if (threadException[0] != null) {
-            throw threadException[0];
-        }
-        if (backgroundThread.isAlive()) {
-            logger.error("Background thread hanged. Dumping cluster info");
-            ClusterStateResponse clusterStateResponse = client().admin().cluster().prepareState().get();
-            logger.info("ClusterState: {}", clusterStateResponse.getState());
-            PendingClusterTasksResponse pendingTasks = client().admin().cluster().preparePendingClusterTasks().get();
-            logger.info("Pending tasks:");
-            for (PendingClusterTask task : pendingTasks) {
-                logger.info("Task: priority: {} source: {} Time in queue (ms): {}", task.getPriority(), task.getSource(), task.timeInQueueInMillis());
-            }
-            fail("Background thread didn't finish within 30 minutes");
-        }
-        logger.info("done");
+    public void testIndexCausesIndexCreation() throws Exception {
+        searchWhileCreatingIndex(-1, 1); // 1 replica in our default...
+    }
+
+    @Test
+    public void testNoReplicas() throws Exception {
+        searchWhileCreatingIndex(10, 0);
+    }
+
+    @Test
+    public void testOneReplica() throws Exception {
+        searchWhileCreatingIndex(10, 1);
+    }
+
+    @Test
+    public void testTwoReplicas() throws Exception {
+        searchWhileCreatingIndex(10, 2);
+    }
+
+    private void searchWhileCreatingIndex(int numberOfShards, int numberOfReplicas) throws Exception {
+        for (int i = 0; i < 20; i++) {
+            logger.info("running iteration {}", i);
+            if (numberOfShards > 0) {
+                CreateIndexResponse createIndexResponse = prepareCreate("test")
+                        .setSettings(settingsBuilder().put("index.number_of_shards", numberOfShards).put("index.number_of_replicas", numberOfReplicas)).get();
+                assertThat(createIndexResponse.isAcknowledged(), equalTo(true));
+            }
+            client().prepareIndex("test", "type1", randomAsciiOfLength(5)).setSource("field", "test").execute().actionGet();
+            RefreshResponse refreshResponse = client().admin().indices().prepareRefresh("test").execute().actionGet();
+            assertThat(refreshResponse.getSuccessfulShards(), greaterThanOrEqualTo(1)); // at least one shard should be successful when refreshing
+
+            // we want to make sure that while recovery happens, and a replica gets recovered, its properly refreshed
+            ClusterHealthStatus status = ClusterHealthStatus.RED;
+            while (status != ClusterHealthStatus.GREEN) {
+                // first, verify that search on the primary search works
+                SearchResponse searchResponse = client().prepareSearch("test").setPreference("_primary").setQuery(QueryBuilders.termQuery("field", "test")).execute().actionGet();
+                assertHitCount(searchResponse, 1);
+                // now, let it go to primary or replica, though in a randomized re-creatable manner
+                searchResponse = client().prepareSearch("test").setPreference(randomAsciiOfLength(5)).setQuery(QueryBuilders.termQuery("field", "test")).execute().actionGet();
+                assertHitCount(searchResponse, 1);
+                status = client().admin().cluster().prepareHealth("test").get().getStatus();
+                cluster().ensureAtLeastNumNodes(numberOfReplicas + 1);
+            }
+            wipeIndex("test");
+        }
     }
 }