From 2bec59ec3e221e4154671370421f77a5a406cbd5 Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Thu, 17 Jul 2014 16:24:40 +0200 Subject: [PATCH] [Bulk] Release semaphore again after close The bulk processor tries to acquire all leases for the semaphore to wait for all pending requests. Yet, we should release them afterwards again to ensure we don't ever deadlock if there is a bug in the processor. This commit also adds a testcase for this method --- .../action/bulk/BulkProcessor.java | 16 +++++++--- .../action/bulk/BulkProcessorTests.java | 32 +++++++++++++++++++ 2 files changed, 44 insertions(+), 4 deletions(-) diff --git a/src/main/java/org/elasticsearch/action/bulk/BulkProcessor.java b/src/main/java/org/elasticsearch/action/bulk/BulkProcessor.java index 69b57aedb02..b7cd094dfef 100644 --- a/src/main/java/org/elasticsearch/action/bulk/BulkProcessor.java +++ b/src/main/java/org/elasticsearch/action/bulk/BulkProcessor.java @@ -155,7 +155,7 @@ public class BulkProcessor implements Closeable { private final int concurrentRequests; private final int bulkActions; - private final int bulkSize; + private final long bulkSize; private final TimeValue flushInterval; private final Semaphore semaphore; @@ -174,7 +174,7 @@ public class BulkProcessor implements Closeable { this.name = name; this.concurrentRequests = concurrentRequests; this.bulkActions = bulkActions; - this.bulkSize = bulkSize.bytesAsInt(); + this.bulkSize = bulkSize.bytes(); this.semaphore = new Semaphore(concurrentRequests); this.bulkRequest = new BulkRequest(); @@ -230,7 +230,11 @@ public class BulkProcessor implements Closeable { if (this.concurrentRequests < 1) { return true; } - return semaphore.tryAcquire(this.concurrentRequests, timeout, unit); + if (semaphore.tryAcquire(this.concurrentRequests, timeout, unit)) { + semaphore.release(this.concurrentRequests); + return true; + } + return false; } /** @@ -260,7 +264,11 @@ public class BulkProcessor implements Closeable { return this; } - public void ensureOpen() { + boolean isOpen() { + return closed == false; + } + + protected void ensureOpen() { if (closed) { throw new ElasticsearchIllegalStateException("bulk process already closed"); } diff --git a/src/test/java/org/elasticsearch/action/bulk/BulkProcessorTests.java b/src/test/java/org/elasticsearch/action/bulk/BulkProcessorTests.java index ef5ac5171e9..101d41bd878 100644 --- a/src/test/java/org/elasticsearch/action/bulk/BulkProcessorTests.java +++ b/src/test/java/org/elasticsearch/action/bulk/BulkProcessorTests.java @@ -19,6 +19,7 @@ package org.elasticsearch.action.bulk; +import com.carrotsearch.ant.tasks.junit4.dependencies.com.carrotsearch.randomizedtesting.generators.RandomPicks; import org.elasticsearch.action.get.MultiGetItemResponse; import org.elasticsearch.action.get.MultiGetRequestBuilder; import org.elasticsearch.action.get.MultiGetResponse; @@ -190,6 +191,37 @@ public class BulkProcessorTests extends ElasticsearchIntegrationTest { transportClient.close(); } + @Test + public void testBulkProcessorWaitOnClose() throws Exception { + BulkProcessorTestListener listener = new BulkProcessorTestListener(); + + int numDocs = randomIntBetween(10, 100); + BulkProcessor processor = BulkProcessor.builder(client(), listener).setName("foo") + //let's make sure that the bulk action limit trips, one single execution will index all the documents + .setConcurrentRequests(randomIntBetween(0, 1)).setBulkActions(numDocs) + .setFlushInterval(TimeValue.timeValueHours(24)).setBulkSize(new ByteSizeValue(randomIntBetween(1, 10), + (ByteSizeUnit)RandomPicks.randomFrom(getRandom(), ByteSizeUnit.values()))) + .build(); + + MultiGetRequestBuilder multiGetRequestBuilder = indexDocs(client(), processor, numDocs); + assertThat(processor.isOpen(), is(true)); + assertThat(processor.awaitClose(1, TimeUnit.MINUTES), is(true)); + if (randomBoolean()) { // check if we can call it multiple times + if (randomBoolean()) { + assertThat(processor.awaitClose(1, TimeUnit.MINUTES), is(true)); + } else { + processor.close(); + } + } + assertThat(processor.isOpen(), is(false)); + + assertThat(listener.beforeCounts.get(), greaterThanOrEqualTo(1)); + assertThat(listener.afterCounts.get(), greaterThanOrEqualTo(1)); + assertThat(listener.bulkFailures.size(), equalTo(0)); + assertResponseItems(listener.bulkItems, numDocs); + assertMultiGetResponse(multiGetRequestBuilder.get(), numDocs); + } + @Test public void testBulkProcessorConcurrentRequestsReadOnlyIndex() throws Exception { createIndex("test-ro");