diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/BulkProcessorIT.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/BulkProcessorIT.java index 72ffcd7c506..52110989e17 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/BulkProcessorIT.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/BulkProcessorIT.java @@ -34,6 +34,7 @@ import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.index.mapper.MapperService; import org.elasticsearch.rest.action.document.RestBulkAction; @@ -75,12 +76,12 @@ public class BulkProcessorIT extends ESRestHighLevelClientTestCase { (request, bulkListener) -> highLevelClient().bulkAsync(request, RequestOptions.DEFAULT, bulkListener), listener); } - + private static BulkProcessor.Builder initBulkProcessorBuilderUsingTypes(BulkProcessor.Listener listener) { return BulkProcessor.builder( (request, bulkListener) -> highLevelClient().bulkAsync(request, expectWarnings(RestBulkAction.TYPES_DEPRECATION_MESSAGE), bulkListener), listener); - } + } public void testThatBulkProcessorCountIsCorrect() throws Exception { final CountDownLatch latch = new CountDownLatch(1); @@ -383,14 +384,14 @@ public class BulkProcessorIT extends ESRestHighLevelClientTestCase { .build()) { indexDocs(processor, numDocs, null, localType, "test", globalType, "pipeline_id"); latch.await(); - + assertThat(listener.beforeCounts.get(), equalTo(1)); assertThat(listener.afterCounts.get(), equalTo(1)); assertThat(listener.bulkFailures.size(), equalTo(0)); assertResponseItems(listener.bulkItems, numDocs, localType); - + Iterable hits = searchAll(new SearchRequest("test").routing("routing")); - + assertThat(hits, everyItem(hasProperty(fieldFromSource("fieldNameXYZ"), equalTo("valueXYZ")))); assertThat(hits, everyItem(Matchers.allOf(hasIndex("test"), hasType(localType)))); assertThat(hits, containsInAnyOrder(expectedIds(numDocs))); @@ -398,7 +399,7 @@ public class BulkProcessorIT extends ESRestHighLevelClientTestCase { } { //Check that untyped document additions and untyped global inherit the established custom type - // (the custom document type introduced to the mapping by the earlier code in this test) + // (the custom document type introduced to the mapping by the earlier code in this test) String globalType = null; String localType = null; final CountDownLatch latch = new CountDownLatch(1); @@ -414,20 +415,19 @@ public class BulkProcessorIT extends ESRestHighLevelClientTestCase { .build()) { indexDocs(processor, numDocs, null, localType, "test", globalType, "pipeline_id"); latch.await(); - + assertThat(listener.beforeCounts.get(), equalTo(1)); assertThat(listener.afterCounts.get(), equalTo(1)); assertThat(listener.bulkFailures.size(), equalTo(0)); assertResponseItems(listener.bulkItems, numDocs, MapperService.SINGLE_MAPPING_NAME); - + Iterable hits = searchAll(new SearchRequest("test").routing("routing")); - + assertThat(hits, everyItem(hasProperty(fieldFromSource("fieldNameXYZ"), equalTo("valueXYZ")))); assertThat(hits, everyItem(Matchers.allOf(hasIndex("test"), hasType(customType)))); assertThat(hits, containsInAnyOrder(expectedIds(numDocs))); } - } - assertWarnings(RestBulkAction.TYPES_DEPRECATION_MESSAGE); + } } @SuppressWarnings("unchecked") @@ -438,8 +438,8 @@ public class BulkProcessorIT extends ESRestHighLevelClientTestCase { .>toArray(Matcher[]::new); } - private static MultiGetRequest indexDocs(BulkProcessor processor, int numDocs, String localIndex, String localType, - String globalIndex, String globalType, String globalPipeline) throws Exception { + private MultiGetRequest indexDocs(BulkProcessor processor, int numDocs, String localIndex, String localType, + String globalIndex, String globalType, String globalPipeline) throws Exception { MultiGetRequest multiGetRequest = new MultiGetRequest(); for (int i = 1; i <= numDocs; i++) { if (randomBoolean()) { @@ -448,6 +448,11 @@ public class BulkProcessorIT extends ESRestHighLevelClientTestCase { } else { BytesArray data = bytesBulkRequest(localIndex, localType, i); processor.add(data, globalIndex, globalType, globalPipeline, null, XContentType.JSON); + + if (localType != null) { + // If the payload contains types, parsing it into a bulk request results in a warning. + assertWarnings(RestBulkAction.TYPES_DEPRECATION_MESSAGE); + } } multiGetRequest.add(localIndex, Integer.toString(i)); } @@ -455,26 +460,29 @@ public class BulkProcessorIT extends ESRestHighLevelClientTestCase { } private static BytesArray bytesBulkRequest(String localIndex, String localType, int id) throws IOException { - String action = Strings.toString(jsonBuilder() - .startObject() - .startObject("index") - .field("_index", localIndex) - .field("_type", localType) - .field("_id", Integer.toString(id)) - .endObject() - .endObject() - ); - String source = Strings.toString(jsonBuilder() + XContentBuilder action = jsonBuilder().startObject().startObject("index"); + + if (localIndex != null) { + action.field("_index", localIndex); + } + + if (localType != null) { + action.field("_type", localType); + } + + action.field("_id", Integer.toString(id)); + action.endObject().endObject(); + + XContentBuilder source = jsonBuilder() .startObject() .field("field", randomRealisticUnicodeOfLengthBetween(1, 30)) - .endObject() - ); + .endObject(); - String request = action + "\n" + source + "\n"; + String request = Strings.toString(action) + "\n" + Strings.toString(source) + "\n"; return new BytesArray(request); } - private static MultiGetRequest indexDocs(BulkProcessor processor, int numDocs) throws Exception { + private MultiGetRequest indexDocs(BulkProcessor processor, int numDocs) throws Exception { return indexDocs(processor, numDocs, "test", null, null, null, null); }