From d541be462b1fd225c3a08b7d04d7f78d72747bbc Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Fri, 20 Nov 2015 18:03:48 -0500 Subject: [PATCH] Do not release unacquired semaphore This commit adds an acquired flag to BulkProcessor#execute that is set only after successful acquisition of a permit on the semaphore there. This flag is used to ensure that we do not release a permit on the semaphore when we did not obtain a permit on the semaphore. Closes #14908 --- .../java/org/elasticsearch/action/bulk/BulkProcessor.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/core/src/main/java/org/elasticsearch/action/bulk/BulkProcessor.java b/core/src/main/java/org/elasticsearch/action/bulk/BulkProcessor.java index 42a9344eabc..2a7c185ad8a 100644 --- a/core/src/main/java/org/elasticsearch/action/bulk/BulkProcessor.java +++ b/core/src/main/java/org/elasticsearch/action/bulk/BulkProcessor.java @@ -324,9 +324,11 @@ public class BulkProcessor implements Closeable { } } else { boolean success = false; + boolean acquired = false; try { listener.beforeBulk(executionId, bulkRequest); semaphore.acquire(); + acquired = true; client.bulk(bulkRequest, new ActionListener() { @Override public void onResponse(BulkResponse response) { @@ -353,7 +355,7 @@ public class BulkProcessor implements Closeable { } catch (Throwable t) { listener.afterBulk(executionId, bulkRequest, t); } finally { - if (!success) { // if we fail on client.bulk() release the semaphore + if (!success && acquired) { // if we fail on client.bulk() release the semaphore semaphore.release(); } }