Merge remote-tracking branch 'danielmitterdorfer/doc-bulk-processor-backoff'
commit b7ead723cf
@@ -47,6 +47,7 @@ To use it, first create a `BulkProcessor` instance:
 [source,java]
 --------------------------------------------------
+import org.elasticsearch.action.bulk.BackoffPolicy;
 import org.elasticsearch.action.bulk.BulkProcessor;
 import org.elasticsearch.common.unit.ByteSizeUnit;
 import org.elasticsearch.common.unit.ByteSizeValue;
@@ -73,6 +74,8 @@ BulkProcessor bulkProcessor = BulkProcessor.builder(
         .setBulkSize(new ByteSizeValue(1, ByteSizeUnit.GB)) <6>
         .setFlushInterval(TimeValue.timeValueSeconds(5)) <7>
         .setConcurrentRequests(1) <8>
+        .setBackoffPolicy(
+            BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3)) <9>
         .build();
 --------------------------------------------------
 <1> Add your elasticsearch client
@@ -86,6 +89,10 @@ BulkProcessor bulkProcessor = BulkProcessor.builder(
 <7> We want to flush the bulk every 5 seconds whatever the number of requests
 <8> Set the number of concurrent requests. A value of 0 means that only a single request will be allowed to be
     executed. A value of 1 means 1 concurrent request is allowed to be executed while accumulating new bulk requests.
+<9> Set a custom backoff policy which will initially wait for 100ms, increase exponentially and retry up to three
+    times. A retry is attempted whenever one or more bulk item requests have failed with an `EsRejectedExecutionException`,
+    which indicates that there were too few compute resources available for processing the request. To disable backoff,
+    pass `BackoffPolicy.noBackoff()`.

 Then you can simply add your requests to the `BulkProcessor`:

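The retry behaviour described in callout <9> can be modelled outside the diff. The sketch below is a minimal, self-contained stand-in, not the actual `BulkProcessor` internals: `RetryModel` and its helper are hypothetical names, and the plain doubling schedule is a simplification of Elasticsearch's real delay curve.

```java
import java.util.Iterator;

// Hypothetical model of the retry loop described above: retry a bulk request
// while the backoff iterator still has delays and the previous attempt was
// rejected. A plain doubling schedule stands in for the real policy.
public class RetryModel {
    // Exponential delay schedule: first wait startMillis, then double, retries times.
    static Iterator<Long> exponentialBackoff(long startMillis, int retries) {
        return java.util.stream.LongStream.iterate(startMillis, d -> d * 2)
                .limit(retries).iterator();
    }

    public static void main(String[] args) throws InterruptedException {
        Iterator<Long> delays = exponentialBackoff(100, 3);
        int attempt = 0;
        boolean rejected = true;              // pretend early attempts are rejected
        while (rejected) {
            attempt++;
            rejected = attempt < 3;           // simulate success on the third attempt
            if (rejected && delays.hasNext()) {
                Thread.sleep(delays.next());  // back off before retrying
            } else if (rejected) {
                break;                        // retries exhausted: give up
            }
        }
        System.out.println("attempts: " + attempt); // prints "attempts: 3"
    }
}
```

With `noBackoff()` semantics the iterator would simply yield no delays, so the first rejection ends the loop.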
@@ -101,6 +108,7 @@ By default, `BulkProcessor`:
 * sets bulkSize to `5mb`
 * does not set flushInterval
 * sets concurrentRequests to 1
+* sets backoffPolicy to an exponential backoff with 8 retries and a start delay of 50ms. The total wait time is roughly 5.1 seconds.

 When all documents are loaded to the `BulkProcessor` it can be closed by using `awaitClose` or `close` methods:
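The "8 retries, 50ms start delay, roughly 5.1 seconds" figure from the defaults list can be checked numerically. The sketch below assumes the delay formula `start + 10 * ((int) e^(0.8k) - 1)` milliseconds per retry, which is one reading of Elasticsearch's exponential backoff schedule and does reproduce the documented total; treat the formula itself as an assumption, and the class name as hypothetical.

```java
// Hypothetical check of the default backoff totals, under the assumed formula
// delay(k) = start + 10 * ((int) Math.exp(0.8 * k) - 1) milliseconds.
public class DefaultBackoffTotal {
    static long totalWaitMillis(int startMillis, int retries) {
        long total = 0;
        for (int k = 0; k < retries; k++) {
            total += startMillis + 10 * ((int) Math.exp(0.8d * k) - 1);
        }
        return total;
    }

    public static void main(String[] args) {
        // Defaults stated above: 8 retries, 50ms start delay.
        System.out.println("total wait: " + totalWaitMillis(50, 8) + "ms");
        // prints "total wait: 5190ms", i.e. roughly 5.1 seconds
    }
}
```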