Do not release unacquired semaphore
This commit adds an acquired flag to BulkProcessor#execute that is set only after successful acquisition of a permit on the semaphore there. This flag is used to ensure that we do not release a permit on the semaphore when we did not obtain a permit on the semaphore. Closes #14908
This commit is contained in:
parent
5a18f740ba
commit
d541be462b
|
@ -324,9 +324,11 @@ public class BulkProcessor implements Closeable {
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
boolean success = false;
|
boolean success = false;
|
||||||
|
boolean acquired = false;
|
||||||
try {
|
try {
|
||||||
listener.beforeBulk(executionId, bulkRequest);
|
listener.beforeBulk(executionId, bulkRequest);
|
||||||
semaphore.acquire();
|
semaphore.acquire();
|
||||||
|
acquired = true;
|
||||||
client.bulk(bulkRequest, new ActionListener<BulkResponse>() {
|
client.bulk(bulkRequest, new ActionListener<BulkResponse>() {
|
||||||
@Override
|
@Override
|
||||||
public void onResponse(BulkResponse response) {
|
public void onResponse(BulkResponse response) {
|
||||||
|
@ -353,7 +355,7 @@ public class BulkProcessor implements Closeable {
|
||||||
} catch (Throwable t) {
|
} catch (Throwable t) {
|
||||||
listener.afterBulk(executionId, bulkRequest, t);
|
listener.afterBulk(executionId, bulkRequest, t);
|
||||||
} finally {
|
} finally {
|
||||||
if (!success) { // if we fail on client.bulk() release the semaphore
|
if (!success && acquired) { // if we fail on client.bulk() release the semaphore
|
||||||
semaphore.release();
|
semaphore.release();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue