[[java-docs-bulk]]
=== Bulk API

The bulk API allows one to index and delete several documents in a single
request. Here is a sample usage:

[source,java]
--------------------------------------------------
import static org.elasticsearch.common.xcontent.XContentFactory.*;

BulkRequestBuilder bulkRequest = client.prepareBulk();

// either use client#prepare, or use Requests# to directly build index/delete requests
bulkRequest.add(client.prepareIndex("twitter", "tweet", "1")
        .setSource(jsonBuilder()
                    .startObject()
                        .field("user", "kimchy")
                        .field("postDate", new Date())
                        .field("message", "trying out Elasticsearch")
                    .endObject()
                  )
        );

bulkRequest.add(client.prepareIndex("twitter", "tweet", "2")
        .setSource(jsonBuilder()
                    .startObject()
                        .field("user", "kimchy")
                        .field("postDate", new Date())
                        .field("message", "another post")
                    .endObject()
                  )
        );

BulkResponse bulkResponse = bulkRequest.get();
if (bulkResponse.hasFailures()) {
    // process failures by iterating through each bulk response item
}
--------------------------------------------------

[[java-docs-bulk-processor]]
=== Using Bulk Processor

The `BulkProcessor` class offers a simple interface to flush bulk operations
automatically based on the number or size of requests, or after a given
period.

To use it, first create a `BulkProcessor` instance:

[source,java]
--------------------------------------------------
import org.elasticsearch.action.bulk.BackoffPolicy;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;

BulkProcessor bulkProcessor = BulkProcessor.builder(
        client,  <1>
        new BulkProcessor.Listener() {
            @Override
            public void beforeBulk(long executionId,
                                   BulkRequest request) { ... } <2>

            @Override
            public void afterBulk(long executionId,
                                  BulkRequest request,
                                  BulkResponse response) { ... } <3>

            @Override
            public void afterBulk(long executionId,
                                  BulkRequest request,
                                  Throwable failure) { ... } <4>
        })
        .setBulkActions(10000) <5>
        .setBulkSize(new ByteSizeValue(5, ByteSizeUnit.MB)) <6>
        .setFlushInterval(TimeValue.timeValueSeconds(5)) <7>
        .setConcurrentRequests(1) <8>
        .setBackoffPolicy(
            BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3)) <9>
        .build();
--------------------------------------------------
<1> Add your Elasticsearch client
<2> This method is called just before the bulk is executed. You can, for
    example, see the number of actions with `request.numberOfActions()`
<3> This method is called after the bulk is executed. You can, for example,
    check whether some requests failed with `response.hasFailures()`
<4> This method is called when the bulk failed and raised a `Throwable`
<5> Execute the bulk every 10,000 requests
<6> Flush the bulk every 5mb
<7> Flush the bulk every 5 seconds, whatever the number of requests
<8> Set the number of concurrent requests. A value of `0` means that only a
    single request is allowed to execute. A value of `1` means one concurrent
    request may execute while new bulk requests accumulate.
<9> Set a custom backoff policy which initially waits for 100ms, increases
    exponentially, and retries up to three times. A retry is attempted whenever
    one or more bulk item requests fail with an `EsRejectedExecutionException`,
    which indicates that there were too few compute resources available to
    process the request. To disable backoff, pass `BackoffPolicy.noBackoff()`.
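The listener bodies above are elided (`...`). As a minimal illustrative sketch,
a concrete listener might look like the following; the plain stdout reporting
is only an assumption for the example, and the loop in the second `afterBulk`
shows the item-by-item failure iteration mentioned in the first sample:

[source,java]
--------------------------------------------------
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;

BulkProcessor.Listener listener = new BulkProcessor.Listener() {
    @Override
    public void beforeBulk(long executionId, BulkRequest request) {
        // Called just before the bulk request is executed
        System.out.println("Executing bulk " + executionId
                + " with " + request.numberOfActions() + " actions");
    }

    @Override
    public void afterBulk(long executionId, BulkRequest request,
                          BulkResponse response) {
        // Called after execution; individual items may still have failed
        if (response.hasFailures()) {
            for (BulkItemResponse item : response) {
                if (item.isFailed()) {
                    System.err.println("Failed item [" + item.getId()
                            + "]: " + item.getFailureMessage());
                }
            }
        }
    }

    @Override
    public void afterBulk(long executionId, BulkRequest request,
                          Throwable failure) {
        // Called when the whole bulk request could not be executed
        System.err.println("Bulk " + executionId + " failed: "
                + failure.getMessage());
    }
};
--------------------------------------------------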
By default, `BulkProcessor`:

* sets bulkActions to `1000`
* sets bulkSize to `5mb`
* does not set flushInterval
* sets concurrentRequests to `1`, which means an asynchronous execution of the
  flush operation
* sets backoffPolicy to an exponential backoff with 8 retries and a start delay
  of 50ms. The total wait time is roughly 5.1 seconds.

[[java-docs-bulk-processor-requests]]
==== Add requests

Then you can simply add your requests to the `BulkProcessor`:

[source,java]
--------------------------------------------------
bulkProcessor.add(new IndexRequest("twitter", "tweet", "1").source(/* your doc here */));
bulkProcessor.add(new DeleteRequest("twitter", "tweet", "2"));
--------------------------------------------------

[[java-docs-bulk-processor-close]]
==== Closing the Bulk Processor

When all documents are loaded to the `BulkProcessor` it can be closed by using
the `awaitClose` or `close` methods:

[source,java]
--------------------------------------------------
bulkProcessor.awaitClose(10, TimeUnit.MINUTES);
--------------------------------------------------

or

[source,java]
--------------------------------------------------
bulkProcessor.close();
--------------------------------------------------

Both methods flush any remaining documents and disable all other scheduled
flushes if they were scheduled by setting `flushInterval`. If concurrent
requests were enabled, the `awaitClose` method waits for up to the specified
timeout for all bulk requests to complete and then returns `true`; if the
specified waiting time elapses before all bulk requests complete, `false` is
returned. The `close` method doesn't wait for any remaining bulk requests to
complete and exits immediately.

[[java-docs-bulk-processor-tests]]
==== Using Bulk Processor in tests

If you are running tests with Elasticsearch and are using the `BulkProcessor`
to populate your dataset, you should set the number of concurrent requests to
`0` so that the flush operation of the bulk is executed synchronously:

[source,java]
--------------------------------------------------
BulkProcessor bulkProcessor = BulkProcessor.builder(client, new BulkProcessor.Listener() { /* Listener methods */ })
        .setBulkActions(10000)
        .setConcurrentRequests(0)
        .build();

// Add your requests
bulkProcessor.add(/* Your requests */);

// Flush any remaining requests
bulkProcessor.flush();

// Or close the bulkProcessor if you don't need it anymore
bulkProcessor.close();

// Refresh your indices
client.admin().indices().prepareRefresh().get();

// Now you can start searching!
client.prepareSearch().get();
--------------------------------------------------
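A closing note on the `awaitClose` variant shown earlier: it declares
`InterruptedException` and returns `true` only if all bulk requests completed
within the timeout, so callers typically need to handle both outcomes. A
minimal sketch (the handling strategy shown is an illustration, not prescribed
by the API):

[source,java]
--------------------------------------------------
import java.util.concurrent.TimeUnit;

try {
    // Wait up to 10 minutes for in-flight bulk requests to finish
    boolean completed = bulkProcessor.awaitClose(10, TimeUnit.MINUTES);
    if (!completed) {
        // The timeout elapsed before every bulk request completed;
        // some documents may not have been indexed
    }
} catch (InterruptedException e) {
    // Restore the interrupt flag so callers further up the stack can react
    Thread.currentThread().interrupt();
}
--------------------------------------------------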