diff --git a/core/src/main/java/org/elasticsearch/action/bulk/BulkProcessor.java b/core/src/main/java/org/elasticsearch/action/bulk/BulkProcessor.java index 6dacb21b239..cbfc8431628 100644 --- a/core/src/main/java/org/elasticsearch/action/bulk/BulkProcessor.java +++ b/core/src/main/java/org/elasticsearch/action/bulk/BulkProcessor.java @@ -30,6 +30,7 @@ import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.common.util.concurrent.FutureUtils; +import org.elasticsearch.common.xcontent.XContentType; import java.io.Closeable; import java.util.Objects; @@ -288,16 +289,46 @@ public class BulkProcessor implements Closeable { executeIfNeeded(); } + /** + * Adds the data from the bytes to be processed by the bulk processor + * @deprecated use {@link #add(BytesReference, String, String, XContentType)} instead to avoid content type auto-detection + */ + @Deprecated public BulkProcessor add(BytesReference data, @Nullable String defaultIndex, @Nullable String defaultType) throws Exception { return add(data, defaultIndex, defaultType, null, null); } - public synchronized BulkProcessor add(BytesReference data, @Nullable String defaultIndex, @Nullable String defaultType, @Nullable String defaultPipeline, @Nullable Object payload) throws Exception { + /** + * Adds the data from the bytes to be processed by the bulk processor + */ + public BulkProcessor add(BytesReference data, @Nullable String defaultIndex, @Nullable String defaultType, + XContentType xContentType) throws Exception { + return add(data, defaultIndex, defaultType, null, null, xContentType); + } + + /** + * Adds the data from the bytes to be processed by the bulk processor + * @deprecated use {@link #add(BytesReference, String, String, String, Object, XContentType)} instead to avoid content type + * auto-detection + */ + @Deprecated + public synchronized BulkProcessor add(BytesReference data, @Nullable String defaultIndex, @Nullable String defaultType, + @Nullable String defaultPipeline, @Nullable Object payload) throws Exception { bulkRequest.add(data, defaultIndex, defaultType, null, null, null, defaultPipeline, payload, true); executeIfNeeded(); return this; } + /** + * Adds the data from the bytes to be processed by the bulk processor + */ + public synchronized BulkProcessor add(BytesReference data, @Nullable String defaultIndex, @Nullable String defaultType, + @Nullable String defaultPipeline, @Nullable Object payload, XContentType xContentType) throws Exception { + bulkRequest.add(data, defaultIndex, defaultType, null, null, null, defaultPipeline, payload, true, xContentType); + executeIfNeeded(); + return this; + } + private void executeIfNeeded() { ensureOpen(); if (!isOverTheLimit()) { diff --git a/core/src/test/java/org/elasticsearch/action/bulk/BulkProcessorIT.java b/core/src/test/java/org/elasticsearch/action/bulk/BulkProcessorIT.java index 7e608815c43..e6eb9afe704 100644 --- a/core/src/test/java/org/elasticsearch/action/bulk/BulkProcessorIT.java +++ b/core/src/test/java/org/elasticsearch/action/bulk/BulkProcessorIT.java @@ -27,10 +27,13 @@ import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.client.Client; import org.elasticsearch.client.Requests; import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.common.xcontent.json.JsonXContent; import org.elasticsearch.env.Environment; import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.transport.MockTransportClient; @@ -54,7 +57,7 @@ import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.lessThanOrEqualTo; public class BulkProcessorIT extends ESIntegTestCase { - public void testThatBulkProcessorCountIsCorrect() throws InterruptedException { + public void testThatBulkProcessorCountIsCorrect() throws Exception { final CountDownLatch latch = new CountDownLatch(1); BulkProcessorTestListener listener = new BulkProcessorTestListener(latch); @@ -77,7 +80,7 @@ public class BulkProcessorIT extends ESIntegTestCase { } } - public void testBulkProcessorFlush() throws InterruptedException { + public void testBulkProcessorFlush() throws Exception { final CountDownLatch latch = new CountDownLatch(1); BulkProcessorTestListener listener = new BulkProcessorTestListener(latch); @@ -296,11 +299,18 @@ public class BulkProcessorIT extends ESIntegTestCase { assertMultiGetResponse(multiGetRequestBuilder.get(), testDocs); } - private static MultiGetRequestBuilder indexDocs(Client client, BulkProcessor processor, int numDocs) { + private static MultiGetRequestBuilder indexDocs(Client client, BulkProcessor processor, int numDocs) throws Exception { MultiGetRequestBuilder multiGetRequestBuilder = client.prepareMultiGet(); for (int i = 1; i <= numDocs; i++) { - processor.add(new IndexRequest("test", "test", Integer.toString(i)) - .source(Requests.INDEX_CONTENT_TYPE, "field", randomRealisticUnicodeOfLengthBetween(1, 30))); + if (randomBoolean()) { + processor.add(new IndexRequest("test", "test", Integer.toString(i)) + .source(Requests.INDEX_CONTENT_TYPE, "field", randomRealisticUnicodeOfLengthBetween(1, 30))); + } else { + final String source = "{ \"index\":{\"_index\":\"test\",\"_type\":\"test\",\"_id\":\"" + Integer.toString(i) + "\"} }\n" + + JsonXContent.contentBuilder() + .startObject().field("field", randomRealisticUnicodeOfLengthBetween(1, 30)).endObject().string() + "\n"; + processor.add(new BytesArray(source), null, null, XContentType.JSON); + } multiGetRequestBuilder.add("test", "test", Integer.toString(i)); } return multiGetRequestBuilder; @@ -313,7 +323,8 @@ public class BulkProcessorIT extends ESIntegTestCase { assertThat(bulkItemResponse.getIndex(), equalTo("test")); assertThat(bulkItemResponse.getType(), equalTo("test")); assertThat(bulkItemResponse.getId(), equalTo(Integer.toString(i++))); - assertThat(bulkItemResponse.isFailed(), equalTo(false)); + assertThat("item " + i + " failed with cause: " + bulkItemResponse.getFailureMessage(), + bulkItemResponse.isFailed(), equalTo(false)); } }