Explain how to use bulk processor in a test context

When using a bulk processor in a test, you might write something like:

```java
BulkProcessor bulkProcessor = BulkProcessor.builder(client, new BulkProcessor.Listener() {
    @Override public void beforeBulk(long executionId, BulkRequest request) {}
    @Override public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {}
    @Override public void afterBulk(long executionId, BulkRequest request, Throwable failure) {}
})
        .setBulkActions(10000)
        .setFlushInterval(TimeValue.timeValueSeconds(10))
        .build();

for (int i = 0; i < 10000; i++) {
    bulkProcessor.add(new IndexRequest("foo", "bar", "doc_" + i)
            .source(jsonBuilder().startObject().field("foo", "bar").endObject()
    ));
}

bulkProcessor.flush();
client.admin().indices().prepareRefresh("foo").get();
SearchResponse response = client.prepareSearch("foo").get();
// response does not contain any hit
```

The problem is that, by default, `BulkProcessor` sets the number of concurrent requests to 1, which uses an asynchronous BulkRequestHandler behind the scenes.
When you call `flush()` in a test, you expect it to flush the whole content of the bulk so that you can search for your docs.
But because of the asynchronous handling, there is a good chance that none of the documents have been indexed yet when you call the `refresh` method.

We should advise in our Java guide to explicitly set concurrent requests to `0`, so that users get the synchronous BulkRequestHandler behind the scenes.

```java
BulkProcessor bulkProcessor = BulkProcessor.builder(client, new BulkProcessor.Listener() {
    @Override public void beforeBulk(long executionId, BulkRequest request) {}
    @Override public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {}
    @Override public void afterBulk(long executionId, BulkRequest request, Throwable failure) {}
})
        .setBulkActions(5000)
        .setFlushInterval(TimeValue.timeValueSeconds(10))
        .setConcurrentRequests(0)
        .build();
```
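
The race described above can be reproduced without a cluster. The sketch below is a toy model in plain Java, not Elasticsearch code: `ToyBulkProcessor`, `demo` and the latch that stands in for a slow server are all hypothetical names introduced for illustration. It only shows why `flush()` returning says nothing about documents being indexed when a request is handed to another thread.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Toy model of a bulk processor; every name here is hypothetical, not Elasticsearch API. */
class ToyBulkProcessor {
    private final List<String> buffer = new ArrayList<>();
    private final AtomicInteger indexed = new AtomicInteger();
    private final ExecutorService executor;   // null means synchronous execution
    private final CountDownLatch serverReady; // simulates a server that answers late

    ToyBulkProcessor(int concurrentRequests, CountDownLatch serverReady) {
        this.executor = concurrentRequests == 0
                ? null
                : Executors.newFixedThreadPool(concurrentRequests);
        this.serverReady = serverReady;
    }

    void add(String doc) {
        buffer.add(doc);
    }

    /** Sends the buffered docs; in asynchronous mode this returns before they are indexed. */
    void flush() {
        final int batchSize = buffer.size();
        buffer.clear();
        Runnable send = () -> {
            try {
                serverReady.await(); // wait for the simulated server
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
            indexed.addAndGet(batchSize);
        };
        if (executor == null) {
            send.run();             // synchronous: the caller waits for the result
        } else {
            executor.execute(send); // asynchronous: the caller returns immediately
        }
    }

    int indexedCount() {
        return indexed.get();
    }

    /** Waits for in-flight requests; returns false on timeout or interruption. */
    boolean awaitClose(long timeout, TimeUnit unit) {
        if (executor == null) {
            return true;
        }
        executor.shutdown();
        try {
            return executor.awaitTermination(timeout, unit);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    /** Returns {indexed right after flush(), indexed after the server has answered}. */
    static int[] demo(int concurrentRequests, int docs) {
        // In synchronous mode the latch starts open, otherwise flush() would block forever.
        CountDownLatch serverReady = new CountDownLatch(concurrentRequests == 0 ? 0 : 1);
        ToyBulkProcessor processor = new ToyBulkProcessor(concurrentRequests, serverReady);
        for (int i = 0; i < docs; i++) {
            processor.add("doc_" + i);
        }
        processor.flush();
        int afterFlush = processor.indexedCount();
        serverReady.countDown(); // the simulated server finally answers
        processor.awaitClose(5, TimeUnit.SECONDS);
        return new int[] {afterFlush, processor.indexedCount()};
    }

    public static void main(String[] args) {
        int[] sync = demo(0, 100);
        int[] async = demo(1, 100);
        System.out.println("sync:  after flush=" + sync[0] + ", eventually=" + sync[1]);
        System.out.println("async: after flush=" + async[0] + ", eventually=" + async[1]);
    }
}
```

In this model the synchronous variant reports all 100 documents as soon as `flush()` returns, while the asynchronous one reports 0 until the simulated server answers. That is the situation a test runs into when it refreshes and searches straight after `flush()`.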

Closes #22158.
David Pilato 2016-12-16 16:45:56 +01:00
parent 43f9cd1fd4
commit e32c7f1d72
1 changed files with 44 additions and 8 deletions

@@ -87,13 +87,24 @@ BulkProcessor bulkProcessor = BulkProcessor.builder(
 <5> We want to execute the bulk every 10 000 requests
 <6> We want to flush the bulk every 5mb
 <7> We want to flush the bulk every 5 seconds whatever the number of requests
 <8> Set the number of concurrent requests. A value of 0 means that only a single request will be allowed to be
 executed. A value of 1 means 1 concurrent request is allowed to be executed while accumulating new bulk requests.
 <9> Set a custom backoff policy which will initially wait for 100ms, increase exponentially and retries up to three
 times. A retry is attempted whenever one or more bulk item requests have failed with an `EsRejectedExecutionException`
 which indicates that there were too little compute resources available for processing the request. To disable backoff,
 pass `BackoffPolicy.noBackoff()`.
+By default, `BulkProcessor`:
+* sets bulkActions to `1000`
+* sets bulkSize to `5mb`
+* does not set flushInterval
+* sets concurrentRequests to 1, which means an asynchronous execution of the flush operation.
+* sets backoffPolicy to an exponential backoff with 8 retries and a start delay of 50ms. The total wait time is roughly 5.1 seconds.
+[[java-docs-bulk-processor-requests]]
+==== Add requests
 Then you can simply add your requests to the `BulkProcessor`:
 [source,java]
@@ -102,13 +113,8 @@ bulkProcessor.add(new IndexRequest("twitter", "tweet", "1").source(/* your doc h
 bulkProcessor.add(new DeleteRequest("twitter", "tweet", "2"));
 --------------------------------------------------
-By default, `BulkProcessor`:
-* sets bulkActions to `1000`
-* sets bulkSize to `5mb`
-* does not set flushInterval
-* sets concurrentRequests to 1
-* sets backoffPolicy to an exponential backoff with 8 retries and a start delay of 50ms. The total wait time is roughly 5.1 seconds.
+[[java-docs-bulk-processor-close]]
+==== Closing the Bulk Processor
 When all documents are loaded to the `BulkProcessor` it can be closed by using `awaitClose` or `close` methods:
@@ -129,3 +135,33 @@ Both methods flush any remaining documents and disable all other scheduled flush
 all bulk requests to complete then returns `true`, if the specified waiting time elapses before all bulk requests complete,
 `false` is returned. The `close` method doesn't wait for any remaining bulk requests to complete and exits immediately.
+[[java-docs-bulk-processor-tests]]
+==== Using Bulk Processor in tests
+If you are running tests with elasticsearch and are using the `BulkProcessor` to populate your dataset
+you should better set the number of concurrent requests to `0` so the flush operation of the bulk will be executed
+in a synchronous manner:
+[source,java]
+--------------------------------------------------
+BulkProcessor bulkProcessor = BulkProcessor.builder(client, new BulkProcessor.Listener() { /* Listener methods */ })
+.setBulkActions(10000)
+.setConcurrentRequests(0)
+.build();
+// Add your requests
+bulkProcessor.add(/* Your requests */);
+// Flush any remaining requests
+bulkProcessor.flush();
+// Or close the bulkProcessor if you don't need it anymore
+bulkProcessor.close();
+// Refresh your indices
+client.admin().indices().prepareRefresh().get();
+// Now you can start searching!
+client.prepareSearch().get();
+--------------------------------------------------
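
The defaults list in the diff quotes an exponential backoff with 8 retries, a 50ms start delay and a total wait of roughly 5.1 seconds. The arithmetic behind that figure can be sketched in a few lines. The delay formula used here, `start + 10 * (floor(e^(0.8 * n)) - 1)` milliseconds for the n-th retry, is an assumption about the implementation and is not stated anywhere in this commit. `BackoffSketch` and its methods are hypothetical names.

```java
/** Back-of-the-envelope check of the default backoff schedule; the delay formula is an assumption. */
class BackoffSketch {

    /** Assumed delay in milliseconds before retry number {@code retry} (0-based). */
    static int delayMillis(int startMillis, int retry) {
        return startMillis + 10 * ((int) Math.exp(0.8d * retry) - 1);
    }

    /** Sum of the assumed delays over the given number of retries. */
    static int totalWaitMillis(int startMillis, int retries) {
        int total = 0;
        for (int n = 0; n < retries; n++) {
            total += delayMillis(startMillis, n);
        }
        return total;
    }

    public static void main(String[] args) {
        // Default configuration quoted in the docs: 50ms start delay, 8 retries.
        for (int n = 0; n < 8; n++) {
            System.out.println("retry " + n + ": " + delayMillis(50, n) + " ms");
        }
        System.out.println("total: " + totalWaitMillis(50, 8) + " ms");
    }
}
```

Under that assumed formula the eight delays add up to a little over five seconds, which is in line with the figure quoted in the defaults list.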