From ac94253dce99f12742085e02ed891dc397ba2272 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Tue, 30 May 2017 16:22:17 -0400 Subject: [PATCH] Clarify acquiring index shard permit In previous work, we refactored the delay mechanism in index shard operation permits to allow for async delaying of acquisition. This refactoring made explicit when permit acquisition is disabled whereas previously we were relying on an implicit condition, namely that all permits were acquired by the thread trying to delay acquisition. When using the implicit mechanism, we tried to acquire a permit and if this failed, we returned a null releasable as an indication that our operation should be queued. Yet, now we know when we are delayed and we should not even try to acquire a permit. If we try to acquire a permit and one is not available, we know that we are not delayed, and so acquisition should be successful. If it is not successful, something is deeply wrong. This commit takes advantage of this refactoring to simplify the internal implementation. Relates #24971 --- .../shard/IndexShardOperationPermits.java | 13 +++++------ .../IndexShardOperationPermitsTests.java | 22 +++++++++++++++++++ 2 files changed, 28 insertions(+), 7 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexShardOperationPermits.java b/core/src/main/java/org/elasticsearch/index/shard/IndexShardOperationPermits.java index 83a372dd453..de539026e7a 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/IndexShardOperationPermits.java +++ b/core/src/main/java/org/elasticsearch/index/shard/IndexShardOperationPermits.java @@ -25,7 +25,6 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.ContextPreservingActionListener; import org.elasticsearch.action.support.ThreadedActionListener; import org.elasticsearch.common.CheckedRunnable; -import org.elasticsearch.common.Nullable; import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.ThreadContext.StoredContext; @@ -53,7 +52,7 @@ final class IndexShardOperationPermits implements Closeable { private final Logger logger; private final ThreadPool threadPool; - private static final int TOTAL_PERMITS = Integer.MAX_VALUE; + static final int TOTAL_PERMITS = Integer.MAX_VALUE; final Semaphore semaphore = new Semaphore(TOTAL_PERMITS, true); // fair to ensure a blocking thread is not starved private final List> delayedOperations = new ArrayList<>(); // operations that are delayed private volatile boolean closed; @@ -225,8 +224,7 @@ final class IndexShardOperationPermits implements Closeable { } return; } else { - releasable = tryAcquire(); - assert releasable != null; + releasable = acquire(); } } } catch (final InterruptedException e) { @@ -237,8 +235,7 @@ final class IndexShardOperationPermits implements Closeable { onAcquired.onResponse(releasable); } - @Nullable - private Releasable tryAcquire() throws InterruptedException { + private Releasable acquire() throws InterruptedException { assert Thread.holdsLock(this); if (semaphore.tryAcquire(1, 0, TimeUnit.SECONDS)) { // the un-timed tryAcquire methods do not honor the fairness setting final AtomicBoolean closed = new AtomicBoolean(); @@ -247,8 +244,10 @@ final class IndexShardOperationPermits implements Closeable { semaphore.release(1); } }; + } else { + // this should never happen, if it does something is deeply wrong + throw new IllegalStateException("failed to obtain permit but operations are not delayed"); } - return null; } /** diff --git a/core/src/test/java/org/elasticsearch/index/shard/IndexShardOperationPermitsTests.java b/core/src/test/java/org/elasticsearch/index/shard/IndexShardOperationPermitsTests.java index ec22f9d862b..41dc8f520cc 100644 --- a/core/src/test/java/org/elasticsearch/index/shard/IndexShardOperationPermitsTests.java +++ b/core/src/test/java/org/elasticsearch/index/shard/IndexShardOperationPermitsTests.java @@ -533,6 +533,28 @@ public class IndexShardOperationPermitsTests extends ESTestCase { thread.join(); } + public void testNoPermitsRemaining() throws InterruptedException { + permits.semaphore.tryAcquire(IndexShardOperationPermits.TOTAL_PERMITS, 1, TimeUnit.SECONDS); + final IllegalStateException e = expectThrows( + IllegalStateException.class, + () -> this.permits.acquire( + new ActionListener() { + @Override + public void onResponse(Releasable releasable) { + assert false; + } + + @Override + public void onFailure(Exception e) { + assert false; + } + }, + ThreadPool.Names.GENERIC, + false)); + assertThat(e, hasToString(containsString("failed to obtain permit but operations are not delayed"))); + permits.semaphore.release(IndexShardOperationPermits.TOTAL_PERMITS); + } + /** * Returns an operation that acquires a permit and synchronizes in the following manner: *