[[java-docs-bulk]]
=== Bulk API

The bulk API allows one to index and delete several documents in a
single request. Here is a sample usage:

[source,java]
--------------------------------------------------
import static org.elasticsearch.common.xcontent.XContentFactory.*;

import java.util.Date;

import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;

BulkRequestBuilder bulkRequest = client.prepareBulk();

// either use client#prepare, or use Requests# to directly build index/delete requests
bulkRequest.add(client.prepareIndex("twitter", "_doc", "1")
        .setSource(jsonBuilder()
                    .startObject()
                        .field("user", "kimchy")
                        .field("postDate", new Date())
                        .field("message", "trying out Elasticsearch")
                    .endObject()
                  )
        );

bulkRequest.add(client.prepareIndex("twitter", "_doc", "2")
        .setSource(jsonBuilder()
                    .startObject()
                        .field("user", "kimchy")
                        .field("postDate", new Date())
                        .field("message", "another post")
                    .endObject()
                  )
        );

BulkResponse bulkResponse = bulkRequest.get();
if (bulkResponse.hasFailures()) {
    // process failures by iterating through each bulk response item
    for (BulkItemResponse item : bulkResponse) {
        if (item.isFailed()) {
            System.err.println(item.getId() + ": " + item.getFailureMessage());
        }
    }
}
--------------------------------------------------

[[java-docs-bulk-processor]]
=== Using Bulk Processor

The `BulkProcessor` class offers a simple interface to flush bulk operations automatically based on the number or size
of requests, or after a given period.

To use it, first create a `BulkProcessor` instance:

[source,java]
--------------------------------------------------
import org.elasticsearch.action.bulk.BackoffPolicy;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;

BulkProcessor bulkProcessor = BulkProcessor.builder(
        client,  <1>
        new BulkProcessor.Listener() {
            @Override
            public void beforeBulk(long executionId,
                                   BulkRequest request) { ... } <2>

            @Override
            public void afterBulk(long executionId,
                                  BulkRequest request,
                                  BulkResponse response) { ... } <3>

            @Override
            public void afterBulk(long executionId,
                                  BulkRequest request,
                                  Throwable failure) { ... } <4>
        })
        .setBulkActions(10000) <5>
        .setBulkSize(new ByteSizeValue(5, ByteSizeUnit.MB)) <6>
        .setFlushInterval(TimeValue.timeValueSeconds(5)) <7>
        .setConcurrentRequests(1) <8>
        .setBackoffPolicy(
            BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3)) <9>
        .build();
--------------------------------------------------
<1> Add your Elasticsearch client
<2> This method is called just before the bulk is executed. You can, for example, see the number of actions with
    `request.numberOfActions()`
<3> This method is called after the bulk is executed. You can, for example, check whether any requests failed
    with `response.hasFailures()`
<4> This method is called when the bulk failed and raised a `Throwable`
<5> We want to execute the bulk every 10 000 requests
<6> We want to flush the bulk every 5mb
<7> We want to flush the bulk every 5 seconds, whatever the number of requests
<8> Set the number of concurrent requests. A value of 0 means that only a single request will be allowed to be
    executed. A value of 1 means 1 concurrent request is allowed to be executed while accumulating new bulk requests.
<9> Set a custom backoff policy which will initially wait for 100ms, increase exponentially and retry up to three
    times. A retry is attempted whenever one or more bulk item requests have failed with an `EsRejectedExecutionException`
    which indicates that there were too few compute resources available for processing the request. To disable backoff,
    pass `BackoffPolicy.noBackoff()`.

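
The retry behaviour configured through the backoff policy can be pictured with a small JDK-only sketch. It is not the `BackoffPolicy` implementation: the class `BackoffSketch`, its method names and the plain doubling rule are assumptions made for illustration, and the library's actual growth curve may differ. With an initial delay of 100 ms and three retries, this simplified schedule waits 100, 200 and 400 ms.

[source,java]
--------------------------------------------------
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration class; it is not part of Elasticsearch.
class BackoffSketch {

    // Returns the wait in milliseconds before each retry attempt.
    static List<Long> delays(long initialDelayMillis, int maxRetries) {
        List<Long> result = new ArrayList<>();
        long delay = initialDelayMillis;
        for (int attempt = 0; attempt < maxRetries; attempt++) {
            result.add(delay);
            delay *= 2; // assumption: plain doubling, chosen for readability
        }
        return result;
    }

    // Total time spent waiting if every retry turns out to be needed.
    static long totalWaitMillis(long initialDelayMillis, int maxRetries) {
        long total = 0;
        for (long delay : delays(initialDelayMillis, maxRetries)) {
            total += delay;
        }
        return total;
    }
}
--------------------------------------------------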
By default, `BulkProcessor`:

* sets bulkActions to `1000`
* sets bulkSize to `5mb`
* does not set flushInterval
* sets concurrentRequests to 1, which means an asynchronous execution of the flush operation.
* sets backoffPolicy to an exponential backoff with 8 retries and a start delay of 50ms. The total wait time is roughly 5.1 seconds.

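
The count and size thresholds listed above can be pictured with a small stand-alone model. `FlushTriggerSketch` and all of its members are hypothetical names, not Elasticsearch classes. The sketch only mimics the idea that a buffer is flushed once either the number of buffered actions or their accumulated size reaches a limit; it ignores time-based flushing, concurrency and backoff.

[source,java]
--------------------------------------------------
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of count- and size-based flushing; not the real BulkProcessor.
class FlushTriggerSketch {
    private final int maxActions;
    private final long maxBytes;
    private final List<String> buffer = new ArrayList<>();
    private long bufferedBytes = 0;
    private int flushCount = 0;

    FlushTriggerSketch(int maxActions, long maxBytes) {
        this.maxActions = maxActions;
        this.maxBytes = maxBytes;
    }

    // Buffers one document and flushes as soon as either limit is reached.
    void add(String document) {
        buffer.add(document);
        bufferedBytes += document.getBytes(StandardCharsets.UTF_8).length;
        if (buffer.size() >= maxActions || bufferedBytes >= maxBytes) {
            flush();
        }
    }

    // Empties the buffer; a real processor would send one bulk request here.
    void flush() {
        if (buffer.isEmpty()) {
            return;
        }
        buffer.clear();
        bufferedBytes = 0;
        flushCount++;
    }

    int flushCount() {
        return flushCount;
    }

    int pending() {
        return buffer.size();
    }
}
--------------------------------------------------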
[[java-docs-bulk-processor-requests]]
==== Add requests

Then you can simply add your requests to the `BulkProcessor`:

[source,java]
--------------------------------------------------
bulkProcessor.add(new IndexRequest("twitter", "_doc", "1").source(/* your doc here */));
bulkProcessor.add(new DeleteRequest("twitter", "_doc", "2"));
--------------------------------------------------

[[java-docs-bulk-processor-close]]
==== Closing the Bulk Processor

Once all documents have been added to the `BulkProcessor`, close it with either the `awaitClose` or the `close` method:

[source,java]
--------------------------------------------------
bulkProcessor.awaitClose(10, TimeUnit.MINUTES);
--------------------------------------------------

or

[source,java]
--------------------------------------------------
bulkProcessor.close();
--------------------------------------------------

Both methods flush any remaining documents and disable all other scheduled flushes, if they were scheduled by setting
`flushInterval`. If concurrent requests were enabled, the `awaitClose` method waits for up to the specified timeout for
all bulk requests to complete and then returns `true`; if the specified waiting time elapses before all bulk requests complete,
it returns `false`. The `close` method doesn't wait for any remaining bulk requests to complete and exits immediately.

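
The timeout semantics described above resemble waiting for a JDK `ExecutorService` to terminate. The following sketch uses only the JDK to illustrate the idea of returning `true` when in-flight work finishes in time and `false` otherwise. `CloseSketch` and `awaitInFlight` are hypothetical names, no Elasticsearch class is involved, and the analogy says nothing about how `BulkProcessor` is implemented internally.

[source,java]
--------------------------------------------------
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical JDK-only analogy for "wait up to a timeout for in-flight work".
class CloseSketch {

    // Starts one task that ends once `done` reaches zero, then waits up to the timeout.
    static boolean awaitInFlight(CountDownLatch done, long timeout, TimeUnit unit) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        executor.execute(() -> {
            try {
                done.await(); // stands in for a bulk request that is still running
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        executor.shutdown(); // accept no new work; the in-flight task keeps running
        try {
            // true if the task finished within the timeout, false otherwise
            return executor.awaitTermination(timeout, unit);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            executor.shutdownNow(); // interrupt a still-running task so no thread is left behind
        }
    }
}
--------------------------------------------------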
[[java-docs-bulk-processor-tests]]
==== Using Bulk Processor in tests

If you are running tests with Elasticsearch and are using the `BulkProcessor` to populate your dataset,
you should set the number of concurrent requests to `0` so that the flush operation of the bulk is executed
synchronously:

[source,java]
--------------------------------------------------
BulkProcessor bulkProcessor = BulkProcessor.builder(client, new BulkProcessor.Listener() { /* Listener methods */ })
        .setBulkActions(10000)
        .setConcurrentRequests(0)
        .build();

// Add your requests
bulkProcessor.add(/* Your requests */);

// Flush any remaining requests
bulkProcessor.flush();

// Or close the bulkProcessor if you don't need it anymore
bulkProcessor.close();

// Refresh your indices
client.admin().indices().prepareRefresh().get();

// Now you can start searching!
client.prepareSearch().get();
--------------------------------------------------
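
Why the synchronous setting matters in tests can be shown with a JDK-only model. `FlushTimingSketch` and its methods are hypothetical and do not use any Elasticsearch class. The asynchronous variant deliberately holds the background work back with a latch until the caller has looked at the "index", which forces the worst case that a test may hit when the flush runs on another thread.

[source,java]
--------------------------------------------------
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical model: an in-memory list plays the role of the index.
class FlushTimingSketch {

    // Synchronous flush: the documents are stored before the caller continues.
    static int visibleAfterSyncFlush(List<String> docs) {
        List<String> index = new CopyOnWriteArrayList<>();
        index.addAll(docs); // runs in the caller's thread
        return index.size();
    }

    // Asynchronous flush: the work is handed to another thread and the caller continues at once.
    static int visibleAfterAsyncFlush(List<String> docs) {
        List<String> index = new CopyOnWriteArrayList<>();
        CountDownLatch callerHasSearched = new CountDownLatch(1);
        ExecutorService executor = Executors.newSingleThreadExecutor();
        executor.execute(() -> {
            try {
                callerHasSearched.await(); // simulates indexing that is slower than the caller
                index.addAll(docs);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        int visible = index.size(); // the "search" issued right after the flush call returned
        callerHasSearched.countDown(); // let the background work finish
        executor.shutdown();
        return visible;
    }
}
--------------------------------------------------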

[[java-docs-bulk-global-parameters]]
==== Global Parameters

Global parameters can be specified on the `BulkRequest` as well as on the `BulkProcessor`, similar to the REST API. These global
parameters serve as defaults and can be overridden by local parameters specified on each sub request. Some parameters
(index, type) have to be set before any sub request is added, so you have to specify them when creating the `BulkRequest` or
`BulkProcessor`. Others (pipeline, routing) are optional and can be specified at any point before the bulk is sent.

["source","java",subs="attributes,callouts,macros"]
--------------------------------------------------
include-tagged::{hlrc-tests}/BulkProcessorIT.java[bulk-processor-mix-parameters]
--------------------------------------------------
<1> global parameters from the BulkRequest will be applied on a sub request
<2> local pipeline parameter on a sub request will override global parameters from BulkRequest


["source","java",subs="attributes,callouts,macros"]
--------------------------------------------------
include-tagged::{hlrc-tests}/BulkRequestWithGlobalParametersIT.java[bulk-request-mix-pipeline]
--------------------------------------------------
<1> local pipeline parameter on a sub request will override global pipeline from the BulkRequest
<2> global parameter from the BulkRequest will be applied on a sub request

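
The override rule for global parameters can be summarized in a few lines of plain Java. This is only a sketch of the rule as described in this section: `GlobalParameterSketch`, `SubRequest`, `resolve` and the pipeline names used when calling it are hypothetical, and the code does not reflect how `BulkRequest` resolves parameters internally.

[source,java]
--------------------------------------------------
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of "global default, local override"; not an Elasticsearch class.
class GlobalParameterSketch {

    // A sub request with an optional locally set pipeline (null means "not set").
    static final class SubRequest {
        final String id;
        final String pipeline;

        SubRequest(String id, String pipeline) {
            this.id = id;
            this.pipeline = pipeline;
        }
    }

    // The local value wins when present; otherwise the global default applies.
    static String resolve(String globalValue, String localValue) {
        return localValue != null ? localValue : globalValue;
    }

    // Lists "id->pipeline" for every sub request after applying the rule.
    static List<String> effectivePipelines(String globalPipeline, List<SubRequest> subRequests) {
        List<String> result = new ArrayList<>();
        for (SubRequest request : subRequests) {
            result.add(request.id + "->" + resolve(globalPipeline, request.pipeline));
        }
        return result;
    }
}
--------------------------------------------------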