diff --git a/src/main/java/org/elasticsearch/action/bulk/BulkProcessor.java b/src/main/java/org/elasticsearch/action/bulk/BulkProcessor.java index 69b57aedb02..b7cd094dfef 100644 --- a/src/main/java/org/elasticsearch/action/bulk/BulkProcessor.java +++ b/src/main/java/org/elasticsearch/action/bulk/BulkProcessor.java @@ -155,7 +155,7 @@ public class BulkProcessor implements Closeable { private final int concurrentRequests; private final int bulkActions; - private final int bulkSize; + private final long bulkSize; private final TimeValue flushInterval; private final Semaphore semaphore; @@ -174,7 +174,7 @@ public class BulkProcessor implements Closeable { this.name = name; this.concurrentRequests = concurrentRequests; this.bulkActions = bulkActions; - this.bulkSize = bulkSize.bytesAsInt(); + this.bulkSize = bulkSize.bytes(); this.semaphore = new Semaphore(concurrentRequests); this.bulkRequest = new BulkRequest(); @@ -230,7 +230,11 @@ public class BulkProcessor implements Closeable { if (this.concurrentRequests < 1) { return true; } - return semaphore.tryAcquire(this.concurrentRequests, timeout, unit); + if (semaphore.tryAcquire(this.concurrentRequests, timeout, unit)) { + semaphore.release(this.concurrentRequests); + return true; + } + return false; } /** @@ -260,7 +264,11 @@ public class BulkProcessor implements Closeable { return this; } - public void ensureOpen() { + boolean isOpen() { + return closed == false; + } + + protected void ensureOpen() { if (closed) { throw new ElasticsearchIllegalStateException("bulk process already closed"); } diff --git a/src/test/java/org/elasticsearch/action/bulk/BulkProcessorTests.java b/src/test/java/org/elasticsearch/action/bulk/BulkProcessorTests.java index ef5ac5171e9..101d41bd878 100644 --- a/src/test/java/org/elasticsearch/action/bulk/BulkProcessorTests.java +++ b/src/test/java/org/elasticsearch/action/bulk/BulkProcessorTests.java @@ -19,6 +19,7 @@ package org.elasticsearch.action.bulk; +import com.carrotsearch.ant.tasks.junit4.dependencies.com.carrotsearch.randomizedtesting.generators.RandomPicks; import org.elasticsearch.action.get.MultiGetItemResponse; import org.elasticsearch.action.get.MultiGetRequestBuilder; import org.elasticsearch.action.get.MultiGetResponse; @@ -190,6 +191,37 @@ public class BulkProcessorTests extends ElasticsearchIntegrationTest { transportClient.close(); } + @Test + public void testBulkProcessorWaitOnClose() throws Exception { + BulkProcessorTestListener listener = new BulkProcessorTestListener(); + + int numDocs = randomIntBetween(10, 100); + BulkProcessor processor = BulkProcessor.builder(client(), listener).setName("foo") + //let's make sure that the bulk action limit trips, one single execution will index all the documents + .setConcurrentRequests(randomIntBetween(0, 1)).setBulkActions(numDocs) + .setFlushInterval(TimeValue.timeValueHours(24)).setBulkSize(new ByteSizeValue(randomIntBetween(1, 10), + (ByteSizeUnit)RandomPicks.randomFrom(getRandom(), ByteSizeUnit.values()))) + .build(); + + MultiGetRequestBuilder multiGetRequestBuilder = indexDocs(client(), processor, numDocs); + assertThat(processor.isOpen(), is(true)); + assertThat(processor.awaitClose(1, TimeUnit.MINUTES), is(true)); + if (randomBoolean()) { // check if we can call it multiple times + if (randomBoolean()) { + assertThat(processor.awaitClose(1, TimeUnit.MINUTES), is(true)); + } else { + processor.close(); + } + } + assertThat(processor.isOpen(), is(false)); + + assertThat(listener.beforeCounts.get(), greaterThanOrEqualTo(1)); + assertThat(listener.afterCounts.get(), greaterThanOrEqualTo(1)); + assertThat(listener.bulkFailures.size(), equalTo(0)); + assertResponseItems(listener.bulkItems, numDocs); + assertMultiGetResponse(multiGetRequestBuilder.get(), numDocs); + } + @Test public void testBulkProcessorConcurrentRequestsReadOnlyIndex() throws Exception { createIndex("test-ro");