From e32c7f1d72ff8eca8b1529c5bfc7a83aff8c6c9b Mon Sep 17 00:00:00 2001 From: David Pilato Date: Fri, 16 Dec 2016 16:45:56 +0100 Subject: [PATCH] Explain how to use bulk processor in a test context When using a bulk processor in test, you might write something like: ```java BulkProcessor bulkProcessor = BulkProcessor.builder(client, new BulkProcessor.Listener() { @Override public void beforeBulk(long executionId, BulkRequest request) {} @Override public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {} @Override public void afterBulk(long executionId, BulkRequest request, Throwable failure) {} }) .setBulkActions(10000) .setFlushInterval(TimeValue.timeValueSeconds(10)) .build(); for (int i = 0; i < 10000; i++) { bulkProcessor.add(new IndexRequest("foo", "bar", "doc_" + i) .source(jsonBuilder().startObject().field("foo", "bar").endObject() )); } bulkProcessor.flush(); client.admin().indices().prepareRefresh("foo").get(); SearchResponse response = client.prepareSearch("foo").get(); // response does not contain any hit ``` The problem is that by default bulkProcessor defines the number of concurrent requests to 1 which is using behind the scene an Async BulkRequestHandler. When you call `flush()` in a test, you expect it to flush all the content of the bulk so you can search for your docs. But because of the async handling, there is a great chance that none of the documents has been indexed yet when you call the `refresh` method. We should advice in our Java guide to explicitly set concurrent requests to `0` so users will use behind the scene the Sync BulkRequestHandler. ```java BulkProcessor bulkProcessor = BulkProcessor.builder(client, new BulkProcessor.Listener() { @Override public void beforeBulk(long executionId, BulkRequest request) {} @Override public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {} @Override public void afterBulk(long executionId, BulkRequest request, Throwable failure) {} }) .setBulkActions(5000) .setFlushInterval(TimeValue.timeValueSeconds(10)) .setConcurrentRequests(0) .build(); ``` Closes #22158. --- docs/java-api/docs/bulk.asciidoc | 52 +++++++++++++++++++++++++++----- 1 file changed, 44 insertions(+), 8 deletions(-) diff --git a/docs/java-api/docs/bulk.asciidoc b/docs/java-api/docs/bulk.asciidoc index 288bd8415ab..07849164a68 100644 --- a/docs/java-api/docs/bulk.asciidoc +++ b/docs/java-api/docs/bulk.asciidoc @@ -87,13 +87,24 @@ BulkProcessor bulkProcessor = BulkProcessor.builder( <5> We want to execute the bulk every 10 000 requests <6> We want to flush the bulk every 5mb <7> We want to flush the bulk every 5 seconds whatever the number of requests -<8> Set the number of concurrent requests. A value of 0 means that only a single request will be allowed to be +<8> Set the number of concurrent requests. A value of 0 means that only a single request will be allowed to be executed. A value of 1 means 1 concurrent request is allowed to be executed while accumulating new bulk requests. <9> Set a custom backoff policy which will initially wait for 100ms, increase exponentially and retries up to three times. A retry is attempted whenever one or more bulk item requests have failed with an `EsRejectedExecutionException` which indicates that there were too little compute resources available for processing the request. To disable backoff, pass `BackoffPolicy.noBackoff()`. +By default, `BulkProcessor`: + +* sets bulkActions to `1000` +* sets bulkSize to `5mb` +* does not set flushInterval +* sets concurrentRequests to 1, which means an asynchronous execution of the flush operation. +* sets backoffPolicy to an exponential backoff with 8 retries and a start delay of 50ms. The total wait time is roughly 5.1 seconds. + +[[java-docs-bulk-processor-requests]] +==== Add requests + Then you can simply add your requests to the `BulkProcessor`: [source,java] @@ -102,13 +113,8 @@ bulkProcessor.add(new IndexRequest("twitter", "tweet", "1").source(/* your doc h bulkProcessor.add(new DeleteRequest("twitter", "tweet", "2")); -------------------------------------------------- -By default, `BulkProcessor`: - -* sets bulkActions to `1000` -* sets bulkSize to `5mb` -* does not set flushInterval -* sets concurrentRequests to 1 -* sets backoffPolicy to an exponential backoff with 8 retries and a start delay of 50ms. The total wait time is roughly 5.1 seconds. +[[java-docs-bulk-processor-close]] +==== Closing the Bulk Processor When all documents are loaded to the `BulkProcessor` it can be closed by using `awaitClose` or `close` methods: @@ -129,3 +135,33 @@ Both methods flush any remaining documents and disable all other scheduled flush all bulk requests to complete then returns `true`, if the specified waiting time elapses before all bulk requests complete, `false` is returned. The `close` method doesn't wait for any remaining bulk requests to complete and exits immediately. +[[java-docs-bulk-processor-tests]] +==== Using Bulk Processor in tests + +If you are running tests with elasticsearch and are using the `BulkProcessor` to populate your dataset +you should better set the number of concurrent requests to `0` so the flush operation of the bulk will be executed +in a synchronous manner: + +[source,java] +-------------------------------------------------- +BulkProcessor bulkProcessor = BulkProcessor.builder(client, new BulkProcessor.Listener() { /* Listener methods */ }) + .setBulkActions(10000) + .setConcurrentRequests(0) + .build(); + +// Add your requests +bulkProcessor.add(/* Your requests */); + +// Flush any remaining requests +bulkProcessor.flush(); + +// Or close the bulkProcessor if you don't need it anymore +bulkProcessor.close(); + +// Refresh your indices +client.admin().indices().prepareRefresh().get(); + +// Now you can start searching! +client.prepareSearch().get(); +-------------------------------------------------- +