[[java-docs-bulk]]
=== Bulk API
The bulk API lets you index and delete several documents in a
single request. Here is a sample usage:

[source,java]
--------------------------------------------------
import static org.elasticsearch.common.xcontent.XContentFactory.*;

BulkRequestBuilder bulkRequest = client.prepareBulk();

// either use client#prepare, or use Requests# to directly build index/delete requests
bulkRequest.add(client.prepareIndex("twitter", "tweet", "1")
        .setSource(jsonBuilder()
                    .startObject()
                        .field("user", "kimchy")
                        .field("postDate", new Date())
                        .field("message", "trying out Elasticsearch")
                    .endObject()
                  )
        );

bulkRequest.add(client.prepareIndex("twitter", "tweet", "2")
        .setSource(jsonBuilder()
                    .startObject()
                        .field("user", "kimchy")
                        .field("postDate", new Date())
                        .field("message", "another post")
                    .endObject()
                  )
        );

BulkResponse bulkResponse = bulkRequest.get();
if (bulkResponse.hasFailures()) {
    // process failures by iterating through each bulk response item
    for (BulkItemResponse item : bulkResponse) {
        if (item.isFailed()) {
            // report the document id and the reason it was rejected
            System.err.println(item.getId() + ": " + item.getFailureMessage());
        }
    }
}
--------------------------------------------------
[[java-docs-bulk-processor]]
=== Using Bulk Processor
The `BulkProcessor` class offers a simple interface to flush bulk operations automatically based on the number or size
of requests, or after a given period.
To use it, first create a `BulkProcessor` instance:
[source,java]
--------------------------------------------------
import org.elasticsearch.action.bulk.BackoffPolicy;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;

BulkProcessor bulkProcessor = BulkProcessor.builder(
        client,  <1>
        new BulkProcessor.Listener() {
            @Override
            public void beforeBulk(long executionId,
                                   BulkRequest request) { ... } <2>

            @Override
            public void afterBulk(long executionId,
                                  BulkRequest request,
                                  BulkResponse response) { ... } <3>

            @Override
            public void afterBulk(long executionId,
                                  BulkRequest request,
                                  Throwable failure) { ... } <4>
        })
        .setBulkActions(10000) <5>
        .setBulkSize(new ByteSizeValue(5, ByteSizeUnit.MB)) <6>
        .setFlushInterval(TimeValue.timeValueSeconds(5)) <7>
        .setConcurrentRequests(1) <8>
        .setBackoffPolicy(
            BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3)) <9>
        .build();
--------------------------------------------------
<1> Add your Elasticsearch client.
<2> This method is called just before the bulk is executed. For example, you can read the number of actions with
`request.numberOfActions()`.
<3> This method is called after the bulk has executed. For example, you can check whether any requests failed
with `response.hasFailures()`.
<4> This method is called when the bulk failed and raised a `Throwable`.
<5> Execute the bulk every 10,000 requests.
<6> Flush the bulk every 5 MB.
<7> Flush the bulk every 5 seconds, regardless of the number of requests.
<8> Set the number of concurrent requests. A value of 0 means that only a single request is allowed to
execute. A value of 1 means that 1 concurrent request is allowed to execute while new bulk requests accumulate.
<9> Set a custom backoff policy that initially waits for 100ms, increases the wait exponentially, and retries up to three
times. A retry is attempted whenever one or more bulk item requests have failed with an `EsRejectedExecutionException`,
which indicates that too few compute resources were available for processing the request. To disable backoff,
pass `BackoffPolicy.noBackoff()`.
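
The count and size triggers described in callouts 5 and 6 can be pictured with a small stand-alone model. This is only a sketch in plain Java and not the `BulkProcessor` implementation: the class `ThresholdBufferSketch` and all of its members are hypothetical, payloads are plain strings rather than index or delete requests, and the time-based trigger from callout 7 is left out.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/** Toy model of count- and size-based flushing. Hypothetical, NOT the BulkProcessor code. */
final class ThresholdBufferSketch {
    private final int maxActions;   // plays the role of bulkActions
    private final long maxBytes;    // plays the role of bulkSize
    private final List<String> pending = new ArrayList<>();
    private final List<List<String>> flushed = new ArrayList<>();
    private long pendingBytes = 0;

    ThresholdBufferSketch(int maxActions, long maxBytes) {
        this.maxActions = maxActions;
        this.maxBytes = maxBytes;
    }

    /** Buffers one payload and flushes as soon as either threshold is reached. */
    void add(String payload) {
        pending.add(payload);
        pendingBytes += payload.getBytes(StandardCharsets.UTF_8).length;
        if (pending.size() >= maxActions || pendingBytes >= maxBytes) {
            flush();
        }
    }

    /** Moves everything buffered so far into one recorded batch. */
    void flush() {
        if (pending.isEmpty()) {
            return;
        }
        flushed.add(new ArrayList<>(pending));
        pending.clear();
        pendingBytes = 0;
    }

    List<List<String>> flushedBatches() {
        return flushed;
    }

    int pendingCount() {
        return pending.size();
    }

    public static void main(String[] args) {
        ThresholdBufferSketch buffer = new ThresholdBufferSketch(3, 10);
        buffer.add("a");
        buffer.add("b");
        buffer.add("c");            // third action reaches the count threshold
        buffer.add("0123456789ab"); // a single 12-byte payload reaches the size threshold
        buffer.add("x");            // stays buffered until an explicit flush
        System.out.println("batches=" + buffer.flushedBatches().size()
                + " pending=" + buffer.pendingCount());
    }
}
```

In this model a flush happens inside `add` whenever either limit is reached, which is why a lone leftover payload stays buffered until `flush()` is called or, in the real processor, until the flush interval or a close takes care of it.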
By default, `BulkProcessor`:

* sets bulkActions to `1000`
* sets bulkSize to `5mb`
* does not set flushInterval
* sets concurrentRequests to 1, which means the flush operation is executed asynchronously
* sets backoffPolicy to an exponential backoff with 8 retries and a start delay of 50ms. The total wait time is roughly 5.1 seconds.

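
To get a feel for how the exponential backoff delays add up, the following stand-alone sketch prints a retry schedule. It does not call the library. The class `BackoffScheduleSketch` is hypothetical, and the delay formula `start + 10 * ((int) e^(0.8 * i) - 1)` is an assumption about how the policy spaces its retries; check it against the `BackoffPolicy` source of the version you use before relying on the exact numbers.

```java
import java.util.ArrayList;
import java.util.List;

/** Stand-alone model of an exponential backoff schedule. Hypothetical, not the library class. */
final class BackoffScheduleSketch {

    /**
     * Returns the wait before each retry, in milliseconds.
     * ASSUMPTION: delay(i) = start + 10 * ((int) e^(0.8 * i) - 1) for i = 0 .. retries - 1.
     */
    static List<Integer> delaysMillis(int startMillis, int retries) {
        List<Integer> delays = new ArrayList<>();
        for (int i = 0; i < retries; i++) {
            delays.add(startMillis + 10 * ((int) Math.exp(0.8d * i) - 1));
        }
        return delays;
    }

    /** Sums the individual delays to give the worst-case total wait. */
    static int totalMillis(List<Integer> delays) {
        int total = 0;
        for (int delay : delays) {
            total += delay;
        }
        return total;
    }

    public static void main(String[] args) {
        // Default configuration from the list above: 50ms start delay, 8 retries.
        List<Integer> defaults = delaysMillis(50, 8);
        System.out.println(defaults + " total=" + totalMillis(defaults) + "ms");

        // Custom configuration from the builder example: 100ms start delay, 3 retries.
        List<Integer> custom = delaysMillis(100, 3);
        System.out.println(custom + " total=" + totalMillis(custom) + "ms");
    }
}
```

Under the assumed formula the default schedule sums to a little over five seconds, in line with the rough figure quoted above, while the three-retry policy from the builder example waits well under a second in total.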
[[java-docs-bulk-processor-requests]]
==== Add requests
Then you can simply add your requests to the `BulkProcessor`:
[source,java]
--------------------------------------------------
bulkProcessor.add(new IndexRequest("twitter", "tweet", "1").source(/* your doc here */));
bulkProcessor.add(new DeleteRequest("twitter", "tweet", "2"));
--------------------------------------------------
[[java-docs-bulk-processor-close]]
==== Closing the Bulk Processor
Once all documents have been added to the `BulkProcessor`, close it with either the `awaitClose` or the `close` method:
[source,java]
--------------------------------------------------
bulkProcessor.awaitClose(10, TimeUnit.MINUTES);
--------------------------------------------------
or
[source,java]
--------------------------------------------------
bulkProcessor.close();
--------------------------------------------------
Both methods flush any remaining documents and cancel any other flushes that were scheduled by setting
`flushInterval`. If concurrent requests were enabled, `awaitClose` waits up to the specified timeout for
all bulk requests to complete: it returns `true` if they all complete in time and `false` if the timeout elapses first.
The `close` method does not wait for remaining bulk requests to complete and returns immediately.
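
The `true`/`false` contract of `awaitClose` resembles that of the JDK's `ExecutorService.awaitTermination`. The sketch below uses that JDK method purely as an analogy and does not touch `BulkProcessor`; the class `AwaitTimeoutSketch` and its `demo` method are hypothetical names, and the blocked task stands in for a bulk request that is still in flight.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** JDK-only analogy for a timed wait on in-flight work. Hypothetical, not BulkProcessor. */
final class AwaitTimeoutSketch {

    /** Returns {result while work is still running, result after the work finished}. */
    static boolean[] demo() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        CountDownLatch release = new CountDownLatch(1);

        // The task blocks until released, like a request that has not completed yet.
        executor.submit(() -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        executor.shutdown(); // accept no new work, let the submitted task finish

        try {
            // The task is still blocked, so the timeout elapses first.
            boolean whileRunning = executor.awaitTermination(50, TimeUnit.MILLISECONDS);

            release.countDown(); // let the task complete

            // The task can now finish well within the timeout.
            boolean afterRelease = executor.awaitTermination(5, TimeUnit.SECONDS);
            return new boolean[] {whileRunning, afterRelease};
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
    }

    public static void main(String[] args) {
        boolean[] results = demo();
        System.out.println("while running: " + results[0] + ", after release: " + results[1]);
    }
}
```

A caller of `awaitClose` can treat a `false` result the same way: some bulk requests were still outstanding when the timeout elapsed, so their outcome has to be observed through the listener rather than assumed.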
[[java-docs-bulk-processor-tests]]
==== Using Bulk Processor in tests
If you are running tests against Elasticsearch and use the `BulkProcessor` to populate your dataset,
set the number of concurrent requests to `0` so that the flush operation of the bulk is executed
synchronously:
[source,java]
--------------------------------------------------
BulkProcessor bulkProcessor = BulkProcessor.builder(client, new BulkProcessor.Listener() { /* Listener methods */ })
        .setBulkActions(10000)
        .setConcurrentRequests(0)
        .build();

// Add your requests
bulkProcessor.add(/* Your requests */);

// Flush any remaining requests
bulkProcessor.flush();

// Or close the bulkProcessor if you don't need it anymore
bulkProcessor.close();

// Refresh your indices
client.admin().indices().prepareRefresh().get();

// Now you can start searching!
client.prepareSearch().get();
--------------------------------------------------