diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexShardOperationPermits.java b/core/src/main/java/org/elasticsearch/index/shard/IndexShardOperationPermits.java index 83a372dd453..de539026e7a 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/IndexShardOperationPermits.java +++ b/core/src/main/java/org/elasticsearch/index/shard/IndexShardOperationPermits.java @@ -25,7 +25,6 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.ContextPreservingActionListener; import org.elasticsearch.action.support.ThreadedActionListener; import org.elasticsearch.common.CheckedRunnable; -import org.elasticsearch.common.Nullable; import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.ThreadContext.StoredContext; @@ -53,7 +52,7 @@ final class IndexShardOperationPermits implements Closeable { private final Logger logger; private final ThreadPool threadPool; - private static final int TOTAL_PERMITS = Integer.MAX_VALUE; + static final int TOTAL_PERMITS = Integer.MAX_VALUE; final Semaphore semaphore = new Semaphore(TOTAL_PERMITS, true); // fair to ensure a blocking thread is not starved private final List> delayedOperations = new ArrayList<>(); // operations that are delayed private volatile boolean closed; @@ -225,8 +224,7 @@ final class IndexShardOperationPermits implements Closeable { } return; } else { - releasable = tryAcquire(); - assert releasable != null; + releasable = acquire(); } } } catch (final InterruptedException e) { @@ -237,8 +235,7 @@ final class IndexShardOperationPermits implements Closeable { onAcquired.onResponse(releasable); } - @Nullable - private Releasable tryAcquire() throws InterruptedException { + private Releasable acquire() throws InterruptedException { assert Thread.holdsLock(this); if (semaphore.tryAcquire(1, 0, TimeUnit.SECONDS)) { // the un-timed tryAcquire methods do not honor the fairness setting final AtomicBoolean closed = new AtomicBoolean(); @@ -247,8 +244,10 @@ final class IndexShardOperationPermits implements Closeable { semaphore.release(1); } }; + } else { + // this should never happen, if it does something is deeply wrong + throw new IllegalStateException("failed to obtain permit but operations are not delayed"); } - return null; } /** diff --git a/core/src/test/java/org/elasticsearch/index/shard/IndexShardOperationPermitsTests.java b/core/src/test/java/org/elasticsearch/index/shard/IndexShardOperationPermitsTests.java index ec22f9d862b..41dc8f520cc 100644 --- a/core/src/test/java/org/elasticsearch/index/shard/IndexShardOperationPermitsTests.java +++ b/core/src/test/java/org/elasticsearch/index/shard/IndexShardOperationPermitsTests.java @@ -533,6 +533,28 @@ public class IndexShardOperationPermitsTests extends ESTestCase { thread.join(); } + public void testNoPermitsRemaining() throws InterruptedException { + permits.semaphore.tryAcquire(IndexShardOperationPermits.TOTAL_PERMITS, 1, TimeUnit.SECONDS); + final IllegalStateException e = expectThrows( + IllegalStateException.class, + () -> this.permits.acquire( + new ActionListener() { + @Override + public void onResponse(Releasable releasable) { + assert false; + } + + @Override + public void onFailure(Exception e) { + assert false; + } + }, + ThreadPool.Names.GENERIC, + false)); + assertThat(e, hasToString(containsString("failed to obtain permit but operations are not delayed"))); + permits.semaphore.release(IndexShardOperationPermits.TOTAL_PERMITS); + } + /** * Returns an operation that acquires a permit and synchronizes in the following manner: *