diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/CRUDDocumentationIT.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/CRUDDocumentationIT.java index ebebfade776..db5fcc11bce 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/CRUDDocumentationIT.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/CRUDDocumentationIT.java @@ -27,7 +27,9 @@ import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.DocWriteRequest; import org.elasticsearch.action.DocWriteResponse; +import org.elasticsearch.action.bulk.BackoffPolicy; import org.elasticsearch.action.bulk.BulkItemResponse; +import org.elasticsearch.action.bulk.BulkProcessor; import org.elasticsearch.action.bulk.BulkRequest; import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.action.delete.DeleteRequest; @@ -45,6 +47,9 @@ import org.elasticsearch.client.ESRestHighLevelClientTestCase; import org.elasticsearch.client.Response; import org.elasticsearch.client.RestHighLevelClient; import org.elasticsearch.common.Strings; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.ByteSizeUnit; +import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentFactory; @@ -55,12 +60,14 @@ import org.elasticsearch.rest.RestStatus; import org.elasticsearch.script.Script; import org.elasticsearch.script.ScriptType; import org.elasticsearch.search.fetch.subphase.FetchSourceContext; +import org.elasticsearch.threadpool.ThreadPool; import java.io.IOException; import java.util.Collections; import java.util.Date; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.TimeUnit; import static java.util.Collections.emptyMap; import static java.util.Collections.singletonMap; @@ -855,4 +862,97 @@ public class CRUDDocumentationIT extends ESRestHighLevelClientTestCase { // end::get-conflict } } + + public void testBulkProcessor() throws InterruptedException, IOException { + Settings settings = Settings.builder().put("node.name", "my-application").build(); + RestHighLevelClient client = highLevelClient(); + { + // tag::bulk-processor-init + ThreadPool threadPool = new ThreadPool(settings); // <1> + + BulkProcessor.Listener listener = new BulkProcessor.Listener() { // <2> + @Override + public void beforeBulk(long executionId, BulkRequest request) { + // <3> + } + + @Override + public void afterBulk(long executionId, BulkRequest request, BulkResponse response) { + // <4> + } + + @Override + public void afterBulk(long executionId, BulkRequest request, Throwable failure) { + // <5> + } + }; + + BulkProcessor bulkProcessor = new BulkProcessor.Builder(client::bulkAsync, listener, threadPool) + .build(); // <6> + // end::bulk-processor-init + assertNotNull(bulkProcessor); + + // tag::bulk-processor-add + IndexRequest one = new IndexRequest("posts", "doc", "1"). + source(XContentType.JSON, "title", "In which order are my Elasticsearch queries executed?"); + IndexRequest two = new IndexRequest("posts", "doc", "2") + .source(XContentType.JSON, "title", "Current status and upcoming changes in Elasticsearch"); + IndexRequest three = new IndexRequest("posts", "doc", "3") + .source(XContentType.JSON, "title", "The Future of Federated Search in Elasticsearch"); + + bulkProcessor.add(one); + bulkProcessor.add(two); + bulkProcessor.add(three); + // end::bulk-processor-add + + // tag::bulk-processor-await + boolean terminated = bulkProcessor.awaitClose(30L, TimeUnit.SECONDS); // <1> + // end::bulk-processor-await + assertTrue(terminated); + + // tag::bulk-processor-close + bulkProcessor.close(); + // end::bulk-processor-close + terminate(threadPool); + } + { + // tag::bulk-processor-listener + BulkProcessor.Listener listener = new BulkProcessor.Listener() { + @Override + public void beforeBulk(long executionId, BulkRequest request) { + int numberOfActions = request.numberOfActions(); // <1> + logger.debug("Executing bulk [{}] with {} requests", executionId, numberOfActions); + } + + @Override + public void afterBulk(long executionId, BulkRequest request, BulkResponse response) { + if (response.hasFailures()) { // <2> + logger.warn("Bulk [{}] executed with failures", executionId); + } else { + logger.debug("Bulk [{}] completed in {} milliseconds", executionId, response.getTook().getMillis()); + } + } + + @Override + public void afterBulk(long executionId, BulkRequest request, Throwable failure) { + logger.error("Failed to execute bulk", failure); // <3> + } + }; + // end::bulk-processor-listener + + ThreadPool threadPool = new ThreadPool(settings); + try { + // tag::bulk-processor-options + BulkProcessor.Builder builder = new BulkProcessor.Builder(client::bulkAsync, listener, threadPool); + builder.setBulkActions(500); // <1> + builder.setBulkSize(new ByteSizeValue(1L, ByteSizeUnit.MB)); // <2> + builder.setConcurrentRequests(0); // <3> + builder.setFlushInterval(TimeValue.timeValueSeconds(10L)); // <4> + builder.setBackoffPolicy(BackoffPolicy.constantBackoff(TimeValue.timeValueSeconds(1L), 3)); // <5> + // end::bulk-processor-options + } finally { + terminate(threadPool); + } + } + } } diff --git a/docs/java-rest/high-level/apis/bulk.asciidoc b/docs/java-rest/high-level/apis/bulk.asciidoc index 07119fb4b93..9bbc0b31062 100644 --- a/docs/java-rest/high-level/apis/bulk.asciidoc +++ b/docs/java-rest/high-level/apis/bulk.asciidoc @@ -1,6 +1,8 @@ [[java-rest-high-document-bulk]] === Bulk API +NOTE: The Java High Level REST Client provides the <> to assist with bulk requests + [[java-rest-high-document-bulk-request]] ==== Bulk Request @@ -115,3 +117,95 @@ include-tagged::{doc-tests}/CRUDDocumentationIT.java[bulk-errors] -------------------------------------------------- <1> Indicate if a given operation failed <2> Retrieve the failure of the failed operation + +[[java-rest-high-document-bulk-processor]] +==== Bulk Processor + +The `BulkProcessor` simplifies the usage of the Bulk API by providing +a utility class that allows index/update/delete operations to be +transparently executed as they are added to the processor. + +In order to execute the requests, the `BulkProcessor` requires 3 components: + +`RestHighLevelClient`:: This client is used to execute the `BulkRequest` +and to retrieve the `BulkResponse` +`BulkProcessor.Listener`:: This listener is called before and after +every `BulkRequest` execution or when a `BulkRequest` failed +`ThreadPool`:: The `BulkRequest` executions are done using threads from this +pool, allowing the `BulkProcessor` to work in a non-blocking manner and to +accept new index/update/delete requests while bulk requests are executing. + +Then the `BulkProcessor.Builder` class can be used to build a new `BulkProcessor`: +["source","java",subs="attributes,callouts,macros"] +-------------------------------------------------- +include-tagged::{doc-tests}/CRUDDocumentationIT.java[bulk-processor-init] +-------------------------------------------------- +<1> Create the `ThreadPool` using the given `Settings` +<2> Create the `BulkProcessor.Listener` +<3> This method is called before each execution of a `BulkRequest` +<4> This method is called after each execution of a `BulkRequest` +<5> This method is called when a `BulkRequest` failed +<6> Create the `BulkProcessor` by calling the `build()` method from +the `BulkProcessor.Builder`. The `RestHighLevelClient.bulkAsync()` +method will be used to execute the `BulkRequest` under the hood. + +The `BulkProcessor.Builder` provides methods to configure how the `BulkProcessor` +should handle requests execution: +["source","java",subs="attributes,callouts,macros"] +-------------------------------------------------- +include-tagged::{doc-tests}/CRUDDocumentationIT.java[bulk-processor-options] +-------------------------------------------------- +<1> Set when to flush a new bulk request based on the number of +actions currently added (defaults to 1000, use -1 to disable it) +<2> Set when to flush a new bulk request based on the size of +actions currently added (defaults to 5Mb, use -1 to disable it) +<3> Set the number of concurrent requests allowed to be executed +(default to 1, use 0 to only allow the execution of a single request) +<4> Set a flush interval flushing any `BulkRequest` pending if the +interval passes (defaults to not set) +<5> Set a constant back off policy that initially waits for 1 second +and retries up to 3 times. See `BackoffPolicy.noBackoff()`, +`BackoffPolicy.constantBackoff()` and `BackoffPolicy.exponentialBackoff()` +for more options. + +Once the `BulkProcessor` is created requests can be added to it: +["source","java",subs="attributes,callouts,macros"] +-------------------------------------------------- +include-tagged::{doc-tests}/CRUDDocumentationIT.java[bulk-processor-add] +-------------------------------------------------- + +The requests will be executed by the `BulkProcessor`, which takes care of +calling the `BulkProcessor.Listener` for every bulk request. + +The listener provides methods to access to the `BulkRequest` and the `BulkResponse`: +["source","java",subs="attributes,callouts,macros"] +-------------------------------------------------- +include-tagged::{doc-tests}/CRUDDocumentationIT.java[bulk-processor-listener] +-------------------------------------------------- +<1> Called before each execution of a `BulkRequest`, this method allows +to know the number of operations that are going to be executed within the `BulkRequest` +<2> Called after each execution of a `BulkRequest`, this method allows +to know if the `BulkResponse` contains errors +<3> Called if the `BulkRequest` failed, this method allows to know +the failure + +Once all requests have been added to the `BulkProcessor`, its instance needs to +be closed closed using one of the two available closing methods. + +The `awaitClose()` method can be used to wait until all requests have been processed + or the specified waiting time elapses: +["source","java",subs="attributes,callouts,macros"] +-------------------------------------------------- +include-tagged::{doc-tests}/CRUDDocumentationIT.java[bulk-processor-await] +-------------------------------------------------- +<1> The method returns `true` if all bulk requests completed and `false` if the +waiting time elapsed before all the bulk requests completed + +The `close()` method can be used to immediately close the `BulkProcessor`: +["source","java",subs="attributes,callouts,macros"] +-------------------------------------------------- +include-tagged::{doc-tests}/CRUDDocumentationIT.java[bulk-processor-close] +-------------------------------------------------- + +Both methods flush the requests added to the processor before closing the processor +and also forbid any new request to be added to it.