From 2e08977adcb3ee7db9e40b3802dbaa96bb6e2a42 Mon Sep 17 00:00:00 2001 From: uboness Date: Sat, 7 Dec 2013 22:29:06 +0100 Subject: [PATCH] - fixing update mapping tests for index operations so the number of request will be based on the index thread pool size - added update mapping tests for bulk operations --- .../elasticsearch/threadpool/ThreadPool.java | 8 ++ .../indices/mapping/UpdateMappingTests.java | 77 ++++++++++++++++++- 2 files changed, 83 insertions(+), 2 deletions(-) diff --git a/src/main/java/org/elasticsearch/threadpool/ThreadPool.java b/src/main/java/org/elasticsearch/threadpool/ThreadPool.java index 32bd08a42f4..fb9c200a953 100644 --- a/src/main/java/org/elasticsearch/threadpool/ThreadPool.java +++ b/src/main/java/org/elasticsearch/threadpool/ThreadPool.java @@ -155,6 +155,14 @@ public class ThreadPool extends AbstractComponent { return new ThreadPoolInfo(infos); } + public Info info(String name) { + ExecutorHolder holder = executors.get(name); + if (holder == null) { + return null; + } + return holder.info; + } + public ThreadPoolStats stats() { List stats = new ArrayList(); for (ExecutorHolder holder : executors.values()) { diff --git a/src/test/java/org/elasticsearch/indices/mapping/UpdateMappingTests.java b/src/test/java/org/elasticsearch/indices/mapping/UpdateMappingTests.java index 79bf1148d3a..526b800b259 100644 --- a/src/test/java/org/elasticsearch/indices/mapping/UpdateMappingTests.java +++ b/src/test/java/org/elasticsearch/indices/mapping/UpdateMappingTests.java @@ -6,8 +6,12 @@ import org.elasticsearch.action.admin.cluster.tasks.PendingClusterTasksResponse; import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse; import org.elasticsearch.action.admin.indices.mapping.put.PutMappingResponse; import org.elasticsearch.action.admin.indices.refresh.RefreshResponse; +import org.elasticsearch.action.bulk.BulkRequestBuilder; +import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.action.count.CountResponse; import org.elasticsearch.action.get.GetResponse; +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.index.IndexRequestBuilder; import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.client.Client; import org.elasticsearch.cluster.metadata.MappingMetaData; @@ -19,6 +23,7 @@ import org.elasticsearch.index.mapper.MapperParsingException; import org.elasticsearch.index.mapper.MapperService; import org.elasticsearch.index.mapper.MergeMappingException; import org.elasticsearch.test.ElasticsearchIntegrationTest; +import org.elasticsearch.threadpool.ThreadPool; import org.hamcrest.Matchers; import org.junit.Test; @@ -36,7 +41,7 @@ import static org.hamcrest.Matchers.*; public class UpdateMappingTests extends ElasticsearchIntegrationTest { @Test - public void dynamicUpdates() throws Exception { + public void dynamicUpdates_Index() throws Exception { client().admin().indices().prepareCreate("test") .setSettings( ImmutableSettings.settingsBuilder() @@ -45,7 +50,8 @@ public class UpdateMappingTests extends ElasticsearchIntegrationTest { ).execute().actionGet(); client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForGreenStatus().execute().actionGet(); - int recCount = 200; + ThreadPool.Info info = cluster().getInstance(ThreadPool.class).info(ThreadPool.Names.INDEX); + int recCount = info.getMax() + (int) info.getQueueSize().getSingles(); final CountDownLatch latch = new CountDownLatch(recCount); for (int rec = 0; rec < recCount; rec++) { client().prepareIndex("test", "type", "rec" + rec).setSource("field" + rec, "some_value").execute(new ActionListener() { @@ -84,6 +90,73 @@ public class UpdateMappingTests extends ElasticsearchIntegrationTest { } } + @Test + public void dynamicUpdates_Bulk() throws Exception { + client().admin().indices().prepareCreate("test") + .setSettings( + ImmutableSettings.settingsBuilder() + .put("index.number_of_shards", 1) + .put("index.number_of_replicas", 0) + ).execute().actionGet(); + client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForGreenStatus().execute().actionGet(); + + ThreadPool.Info info = cluster().getInstance(ThreadPool.class).info(ThreadPool.Names.BULK); + int bulkCount = info.getMax() + (int) info.getQueueSize().getSingles(); + int bulkSize = between(4, 10); + int recCount = bulkCount * bulkSize; + int idCounter = 0; + final CountDownLatch latch = new CountDownLatch(bulkCount); + for (int i = 0; i < bulkCount; i++) { + BulkRequestBuilder bulk = client().prepareBulk(); + for (int rec = 0; rec < bulkSize; rec++) { + int id = idCounter++; + bulk.add(new IndexRequestBuilder(client()) + .setOpType(IndexRequest.OpType.INDEX) + .setIndex("test") + .setType("type") + .setId("rec" + id) + .setSource("field" + id, "some_value")); + } + bulk.execute(new ActionListener() { + @Override + public void onResponse(BulkResponse bulkItemResponses) { + if (bulkItemResponses.hasFailures()) { + System.out.println("failed to index in test: " + bulkItemResponses.buildFailureMessage()); + } + latch.countDown(); + } + + @Override + public void onFailure(Throwable e) { + logger.error("failed to index in test", e); + latch.countDown(); + } + }); + } + latch.await(); + + logger.info("wait till the mappings have been processed..."); + awaitBusy(new Predicate() { + @Override + public boolean apply(Object input) { + PendingClusterTasksResponse pendingTasks = client().admin().cluster().preparePendingClusterTasks().get(); + return pendingTasks.pendingTasks().isEmpty(); + } + }); + + logger.info("checking all the documents are there"); + RefreshResponse refreshResponse = client().admin().indices().prepareRefresh().execute().actionGet(); + assertThat(refreshResponse.getFailedShards(), equalTo(0)); + CountResponse response = client().prepareCount("test").execute().actionGet(); + assertThat(response.getCount(), equalTo((long) recCount)); + + logger.info("checking all the fields are in the mappings"); + String source = client().admin().cluster().prepareState().get().getState().getMetaData().getIndices().get("test").getMappings().get("type").source().string(); + for (int rec = 0; rec < recCount; rec++) { + assertThat(source, containsString("\"field" + rec + "\"")); + } + } + @Test(expected = MergeMappingException.class) public void updateMappingWithConflicts() throws Exception {