BulkProcessor processes every n+1 docs instead of n

When you set a BulkProcessor with a bulk actions size of 100, it executes the bulk after 101 documents.

```java
BulkProcessor.builder(client(), listener).setBulkActions(100).setConcurrentRequests(1).setName("foo").build();
```

Same for size. If you set the bulk size to 1024 bytes, it will actually execute the bulk after 1025 bytes.

This patch fixes it by changing the limit checks in `isOverTheLimit()` from `>` to `>=`.

Closes #4265.
This commit is contained in:
EvanYellow 2013-11-27 18:32:07 +08:00 committed by David Pilato
parent fb934aff57
commit 43b5d91de2
2 changed files with 49 additions and 2 deletions

View File

@ -313,10 +313,10 @@ public class BulkProcessor {
} }
private boolean isOverTheLimit() { private boolean isOverTheLimit() {
if (bulkActions != -1 && bulkRequest.numberOfActions() > bulkActions) { if (bulkActions != -1 && bulkRequest.numberOfActions() >= bulkActions) {
return true; return true;
} }
if (bulkSize != -1 && bulkRequest.estimatedSizeInBytes() > bulkSize) { if (bulkSize != -1 && bulkRequest.estimatedSizeInBytes() >= bulkSize) {
return true; return true;
} }
return false; return false;

View File

@ -20,10 +20,14 @@
package org.elasticsearch.document; package org.elasticsearch.document;
import com.google.common.base.Charsets; import com.google.common.base.Charsets;
import com.google.common.collect.Maps;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkRequestBuilder; import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.count.CountResponse; import org.elasticsearch.action.count.CountResponse;
import org.elasticsearch.action.get.GetResponse; import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.update.UpdateRequestBuilder; import org.elasticsearch.action.update.UpdateRequestBuilder;
@ -36,7 +40,11 @@ import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.test.ElasticsearchIntegrationTest; import org.elasticsearch.test.ElasticsearchIntegrationTest;
import org.junit.Test; import org.junit.Test;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CyclicBarrier; import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.*; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.*;
@ -561,4 +569,43 @@ public class BulkTests extends ElasticsearchIntegrationTest {
assertExists(get("test", "type", "48")); assertExists(get("test", "type", "48"));
} }
/**
 * Verifies that a {@link BulkProcessor} configured with {@code setBulkActions(5)} executes
 * its bulk request once five actions have been added, and that the resulting response
 * contains exactly five items.
 *
 * @throws InterruptedException if the test thread is interrupted while waiting for the response
 */
@Test
public void testThatBulkProcessorCountIsCorrect() throws InterruptedException {
    // Hand-off point between the listener callback and the test thread.
    final BlockingQueue<BulkResponse> responseQueue = new SynchronousQueue<BulkResponse>();
    BulkProcessor.Listener listener = new BulkProcessor.Listener() {
        @Override
        public void beforeBulk(long executionId, BulkRequest request) {}

        @Override
        public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
            // A SynchronousQueue has no capacity: add() throws IllegalStateException unless a
            // consumer is already blocked in poll(). A timed offer waits for the test thread
            // instead, so the hand-off works regardless of which side arrives first.
            try {
                responseQueue.offer(response, 5, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                // Preserve the interrupt status for whoever owns this thread.
                Thread.currentThread().interrupt();
            }
        }

        @Override
        public void afterBulk(long executionId, BulkRequest request, Throwable failure) {}
    };

    BulkProcessor processor = null;
    try {
        processor = BulkProcessor.builder(client(), listener).setBulkActions(5).setConcurrentRequests(1).setName("foo").build();
        Map<String, Object> data = Maps.newHashMap();
        data.put("foo", "bar");

        // Exactly as many actions as the configured bulk size: the fifth add must trigger the bulk.
        processor.add(new IndexRequest("test", "test", "1").source(data));
        processor.add(new IndexRequest("test", "test", "2").source(data));
        processor.add(new IndexRequest("test", "test", "3").source(data));
        processor.add(new IndexRequest("test", "test", "4").source(data));
        processor.add(new IndexRequest("test", "test", "5").source(data));

        BulkResponse response = responseQueue.poll(5, TimeUnit.SECONDS);
        assertThat("Could not get a bulk response in 5 seconds", response, is(notNullValue()));
        assertThat(response.getItems().length, is(5));
    } finally {
        if (processor != null) {
            processor.close();
        }
    }
}
} }