Add BulkProcessor methods with XContentType parameter (#23078)

This commit adds methods to the BulkProcessor that accept bytes and a XContentType to avoid content type detection. The
methods that do not accept XContentType with bytes have been deprecated by this commit.

Relates #22691
This commit is contained in:
Jay Modi 2017-02-10 08:59:37 -05:00 committed by GitHub
parent 4f2b4724be
commit 7018b6ac6f
2 changed files with 49 additions and 7 deletions

View File

@ -30,6 +30,7 @@ import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.util.concurrent.FutureUtils; import org.elasticsearch.common.util.concurrent.FutureUtils;
import org.elasticsearch.common.xcontent.XContentType;
import java.io.Closeable; import java.io.Closeable;
import java.util.Objects; import java.util.Objects;
@ -288,16 +289,46 @@ public class BulkProcessor implements Closeable {
executeIfNeeded(); executeIfNeeded();
} }
/**
* Adds the data from the bytes to be processed by the bulk processor
* @deprecated use {@link #add(BytesReference, String, String, XContentType)} instead to avoid content type auto-detection
*/
@Deprecated
public BulkProcessor add(BytesReference data, @Nullable String defaultIndex, @Nullable String defaultType) throws Exception { public BulkProcessor add(BytesReference data, @Nullable String defaultIndex, @Nullable String defaultType) throws Exception {
return add(data, defaultIndex, defaultType, null, null); return add(data, defaultIndex, defaultType, null, null);
} }
public synchronized BulkProcessor add(BytesReference data, @Nullable String defaultIndex, @Nullable String defaultType, @Nullable String defaultPipeline, @Nullable Object payload) throws Exception { /**
* Adds the data from the bytes to be processed by the bulk processor
*/
public BulkProcessor add(BytesReference data, @Nullable String defaultIndex, @Nullable String defaultType,
XContentType xContentType) throws Exception {
return add(data, defaultIndex, defaultType, null, null, xContentType);
}
/**
* Adds the data from the bytes to be processed by the bulk processor
* @deprecated use {@link #add(BytesReference, String, String, String, Object, XContentType)} instead to avoid content type
* auto-detection
*/
@Deprecated
public synchronized BulkProcessor add(BytesReference data, @Nullable String defaultIndex, @Nullable String defaultType,
@Nullable String defaultPipeline, @Nullable Object payload) throws Exception {
bulkRequest.add(data, defaultIndex, defaultType, null, null, null, defaultPipeline, payload, true); bulkRequest.add(data, defaultIndex, defaultType, null, null, null, defaultPipeline, payload, true);
executeIfNeeded(); executeIfNeeded();
return this; return this;
} }
/**
* Adds the data from the bytes to be processed by the bulk processor
*/
public synchronized BulkProcessor add(BytesReference data, @Nullable String defaultIndex, @Nullable String defaultType,
@Nullable String defaultPipeline, @Nullable Object payload, XContentType xContentType) throws Exception {
bulkRequest.add(data, defaultIndex, defaultType, null, null, null, defaultPipeline, payload, true, xContentType);
executeIfNeeded();
return this;
}
private void executeIfNeeded() { private void executeIfNeeded() {
ensureOpen(); ensureOpen();
if (!isOverTheLimit()) { if (!isOverTheLimit()) {

View File

@ -27,10 +27,13 @@ import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Client; import org.elasticsearch.client.Client;
import org.elasticsearch.client.Requests; import org.elasticsearch.client.Requests;
import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.env.Environment; import org.elasticsearch.env.Environment;
import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.transport.MockTransportClient; import org.elasticsearch.transport.MockTransportClient;
@ -54,7 +57,7 @@ import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.lessThanOrEqualTo; import static org.hamcrest.Matchers.lessThanOrEqualTo;
public class BulkProcessorIT extends ESIntegTestCase { public class BulkProcessorIT extends ESIntegTestCase {
public void testThatBulkProcessorCountIsCorrect() throws InterruptedException { public void testThatBulkProcessorCountIsCorrect() throws Exception {
final CountDownLatch latch = new CountDownLatch(1); final CountDownLatch latch = new CountDownLatch(1);
BulkProcessorTestListener listener = new BulkProcessorTestListener(latch); BulkProcessorTestListener listener = new BulkProcessorTestListener(latch);
@ -77,7 +80,7 @@ public class BulkProcessorIT extends ESIntegTestCase {
} }
} }
public void testBulkProcessorFlush() throws InterruptedException { public void testBulkProcessorFlush() throws Exception {
final CountDownLatch latch = new CountDownLatch(1); final CountDownLatch latch = new CountDownLatch(1);
BulkProcessorTestListener listener = new BulkProcessorTestListener(latch); BulkProcessorTestListener listener = new BulkProcessorTestListener(latch);
@ -296,11 +299,18 @@ public class BulkProcessorIT extends ESIntegTestCase {
assertMultiGetResponse(multiGetRequestBuilder.get(), testDocs); assertMultiGetResponse(multiGetRequestBuilder.get(), testDocs);
} }
private static MultiGetRequestBuilder indexDocs(Client client, BulkProcessor processor, int numDocs) { private static MultiGetRequestBuilder indexDocs(Client client, BulkProcessor processor, int numDocs) throws Exception {
MultiGetRequestBuilder multiGetRequestBuilder = client.prepareMultiGet(); MultiGetRequestBuilder multiGetRequestBuilder = client.prepareMultiGet();
for (int i = 1; i <= numDocs; i++) { for (int i = 1; i <= numDocs; i++) {
if (randomBoolean()) {
processor.add(new IndexRequest("test", "test", Integer.toString(i)) processor.add(new IndexRequest("test", "test", Integer.toString(i))
.source(Requests.INDEX_CONTENT_TYPE, "field", randomRealisticUnicodeOfLengthBetween(1, 30))); .source(Requests.INDEX_CONTENT_TYPE, "field", randomRealisticUnicodeOfLengthBetween(1, 30)));
} else {
final String source = "{ \"index\":{\"_index\":\"test\",\"_type\":\"test\",\"_id\":\"" + Integer.toString(i) + "\"} }\n"
+ JsonXContent.contentBuilder()
.startObject().field("field", randomRealisticUnicodeOfLengthBetween(1, 30)).endObject().string() + "\n";
processor.add(new BytesArray(source), null, null, XContentType.JSON);
}
multiGetRequestBuilder.add("test", "test", Integer.toString(i)); multiGetRequestBuilder.add("test", "test", Integer.toString(i));
} }
return multiGetRequestBuilder; return multiGetRequestBuilder;
@ -313,7 +323,8 @@ public class BulkProcessorIT extends ESIntegTestCase {
assertThat(bulkItemResponse.getIndex(), equalTo("test")); assertThat(bulkItemResponse.getIndex(), equalTo("test"));
assertThat(bulkItemResponse.getType(), equalTo("test")); assertThat(bulkItemResponse.getType(), equalTo("test"));
assertThat(bulkItemResponse.getId(), equalTo(Integer.toString(i++))); assertThat(bulkItemResponse.getId(), equalTo(Integer.toString(i++)));
assertThat(bulkItemResponse.isFailed(), equalTo(false)); assertThat("item " + i + " failed with cause: " + bulkItemResponse.getFailureMessage(),
bulkItemResponse.isFailed(), equalTo(false));
} }
} }