[Bulk] Release semaphore again after close

The bulk processor tries to acquire all leases for the semaphore to wait
for all pending requests. Yet, we should release them afterwards again to
ensure we don't ever deadlock if there is a bug in the processor.
This commit also adds a testcase for this method
This commit is contained in:
Simon Willnauer 2014-07-17 16:24:40 +02:00
parent 42b71a004a
commit 2bec59ec3e
2 changed files with 44 additions and 4 deletions

View File

@ -155,7 +155,7 @@ public class BulkProcessor implements Closeable {
private final int concurrentRequests;
private final int bulkActions;
private final int bulkSize;
private final long bulkSize;
private final TimeValue flushInterval;
private final Semaphore semaphore;
@ -174,7 +174,7 @@ public class BulkProcessor implements Closeable {
this.name = name;
this.concurrentRequests = concurrentRequests;
this.bulkActions = bulkActions;
this.bulkSize = bulkSize.bytesAsInt();
this.bulkSize = bulkSize.bytes();
this.semaphore = new Semaphore(concurrentRequests);
this.bulkRequest = new BulkRequest();
@ -230,7 +230,11 @@ public class BulkProcessor implements Closeable {
if (this.concurrentRequests < 1) {
return true;
}
return semaphore.tryAcquire(this.concurrentRequests, timeout, unit);
if (semaphore.tryAcquire(this.concurrentRequests, timeout, unit)) {
semaphore.release(this.concurrentRequests);
return true;
}
return false;
}
/**
@ -260,7 +264,11 @@ public class BulkProcessor implements Closeable {
return this;
}
public void ensureOpen() {
boolean isOpen() {
return closed == false;
}
protected void ensureOpen() {
if (closed) {
throw new ElasticsearchIllegalStateException("bulk process already closed");
}

View File

@ -19,6 +19,7 @@
package org.elasticsearch.action.bulk;
import com.carrotsearch.ant.tasks.junit4.dependencies.com.carrotsearch.randomizedtesting.generators.RandomPicks;
import org.elasticsearch.action.get.MultiGetItemResponse;
import org.elasticsearch.action.get.MultiGetRequestBuilder;
import org.elasticsearch.action.get.MultiGetResponse;
@ -190,6 +191,37 @@ public class BulkProcessorTests extends ElasticsearchIntegrationTest {
transportClient.close();
}
@Test
public void testBulkProcessorWaitOnClose() throws Exception {
BulkProcessorTestListener listener = new BulkProcessorTestListener();
int numDocs = randomIntBetween(10, 100);
BulkProcessor processor = BulkProcessor.builder(client(), listener).setName("foo")
//let's make sure that the bulk action limit trips, one single execution will index all the documents
.setConcurrentRequests(randomIntBetween(0, 1)).setBulkActions(numDocs)
.setFlushInterval(TimeValue.timeValueHours(24)).setBulkSize(new ByteSizeValue(randomIntBetween(1, 10),
(ByteSizeUnit)RandomPicks.randomFrom(getRandom(), ByteSizeUnit.values())))
.build();
MultiGetRequestBuilder multiGetRequestBuilder = indexDocs(client(), processor, numDocs);
assertThat(processor.isOpen(), is(true));
assertThat(processor.awaitClose(1, TimeUnit.MINUTES), is(true));
if (randomBoolean()) { // check if we can call it multiple times
if (randomBoolean()) {
assertThat(processor.awaitClose(1, TimeUnit.MINUTES), is(true));
} else {
processor.close();
}
}
assertThat(processor.isOpen(), is(false));
assertThat(listener.beforeCounts.get(), greaterThanOrEqualTo(1));
assertThat(listener.afterCounts.get(), greaterThanOrEqualTo(1));
assertThat(listener.bulkFailures.size(), equalTo(0));
assertResponseItems(listener.bulkItems, numDocs);
assertMultiGetResponse(multiGetRequestBuilder.get(), numDocs);
}
@Test
public void testBulkProcessorConcurrentRequestsReadOnlyIndex() throws Exception {
createIndex("test-ro");