- fixing update mapping tests for index operations so the number of request will be based on the index thread pool size

- added update mapping tests for bulk operations
This commit is contained in:
uboness 2013-12-07 22:29:06 +01:00
parent 822acfa568
commit 2e08977adc
2 changed files with 83 additions and 2 deletions

View File

@ -155,6 +155,14 @@ public class ThreadPool extends AbstractComponent {
return new ThreadPoolInfo(infos);
}
public Info info(String name) {
ExecutorHolder holder = executors.get(name);
if (holder == null) {
return null;
}
return holder.info;
}
public ThreadPoolStats stats() {
List<ThreadPoolStats.Stats> stats = new ArrayList<ThreadPoolStats.Stats>();
for (ExecutorHolder holder : executors.values()) {

View File

@ -6,8 +6,12 @@ import org.elasticsearch.action.admin.cluster.tasks.PendingClusterTasksResponse;
import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse;
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingResponse;
import org.elasticsearch.action.admin.indices.refresh.RefreshResponse;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.count.CountResponse;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.metadata.MappingMetaData;
@ -19,6 +23,7 @@ import org.elasticsearch.index.mapper.MapperParsingException;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.mapper.MergeMappingException;
import org.elasticsearch.test.ElasticsearchIntegrationTest;
import org.elasticsearch.threadpool.ThreadPool;
import org.hamcrest.Matchers;
import org.junit.Test;
@ -36,7 +41,7 @@ import static org.hamcrest.Matchers.*;
public class UpdateMappingTests extends ElasticsearchIntegrationTest {
@Test
public void dynamicUpdates() throws Exception {
public void dynamicUpdates_Index() throws Exception {
client().admin().indices().prepareCreate("test")
.setSettings(
ImmutableSettings.settingsBuilder()
@ -45,7 +50,8 @@ public class UpdateMappingTests extends ElasticsearchIntegrationTest {
).execute().actionGet();
client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForGreenStatus().execute().actionGet();
int recCount = 200;
ThreadPool.Info info = cluster().getInstance(ThreadPool.class).info(ThreadPool.Names.INDEX);
int recCount = info.getMax() + (int) info.getQueueSize().getSingles();
final CountDownLatch latch = new CountDownLatch(recCount);
for (int rec = 0; rec < recCount; rec++) {
client().prepareIndex("test", "type", "rec" + rec).setSource("field" + rec, "some_value").execute(new ActionListener<IndexResponse>() {
@ -84,6 +90,73 @@ public class UpdateMappingTests extends ElasticsearchIntegrationTest {
}
}
@Test
public void dynamicUpdates_Bulk() throws Exception {
client().admin().indices().prepareCreate("test")
.setSettings(
ImmutableSettings.settingsBuilder()
.put("index.number_of_shards", 1)
.put("index.number_of_replicas", 0)
).execute().actionGet();
client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForGreenStatus().execute().actionGet();
ThreadPool.Info info = cluster().getInstance(ThreadPool.class).info(ThreadPool.Names.BULK);
int bulkCount = info.getMax() + (int) info.getQueueSize().getSingles();
int bulkSize = between(4, 10);
int recCount = bulkCount * bulkSize;
int idCounter = 0;
final CountDownLatch latch = new CountDownLatch(bulkCount);
for (int i = 0; i < bulkCount; i++) {
BulkRequestBuilder bulk = client().prepareBulk();
for (int rec = 0; rec < bulkSize; rec++) {
int id = idCounter++;
bulk.add(new IndexRequestBuilder(client())
.setOpType(IndexRequest.OpType.INDEX)
.setIndex("test")
.setType("type")
.setId("rec" + id)
.setSource("field" + id, "some_value"));
}
bulk.execute(new ActionListener<BulkResponse>() {
@Override
public void onResponse(BulkResponse bulkItemResponses) {
if (bulkItemResponses.hasFailures()) {
System.out.println("failed to index in test: " + bulkItemResponses.buildFailureMessage());
}
latch.countDown();
}
@Override
public void onFailure(Throwable e) {
logger.error("failed to index in test", e);
latch.countDown();
}
});
}
latch.await();
logger.info("wait till the mappings have been processed...");
awaitBusy(new Predicate<Object>() {
@Override
public boolean apply(Object input) {
PendingClusterTasksResponse pendingTasks = client().admin().cluster().preparePendingClusterTasks().get();
return pendingTasks.pendingTasks().isEmpty();
}
});
logger.info("checking all the documents are there");
RefreshResponse refreshResponse = client().admin().indices().prepareRefresh().execute().actionGet();
assertThat(refreshResponse.getFailedShards(), equalTo(0));
CountResponse response = client().prepareCount("test").execute().actionGet();
assertThat(response.getCount(), equalTo((long) recCount));
logger.info("checking all the fields are in the mappings");
String source = client().admin().cluster().prepareState().get().getState().getMetaData().getIndices().get("test").getMappings().get("type").source().string();
for (int rec = 0; rec < recCount; rec++) {
assertThat(source, containsString("\"field" + rec + "\""));
}
}
@Test(expected = MergeMappingException.class)
public void updateMappingWithConflicts() throws Exception {