Clarify acquiring index shard permit

In previous work, we refactored the delay mechanism in index shard
operation permits to allow for async delaying of acquisition. This
refactoring made explicit when permit acquisition is disabled whereas
previously we were relying on an implicit condition, namely that all
permits were acquired by the thread trying to delay acquisition. When
using the implicit mechanism, we tried to acquire a permit and if this
failed, we returned a null releasable as an indication that our
operation should be queued. Yet, now we know when we are delayed and we
should not even try to acquire a permit. If we try to acquire a permit
and one is not available, we know that we are not delayed, and so
acquisition should be successful. If it is not successful, something is
deeply wrong. This commit takes advantage of this refactoring to
simplify the internal implementation.

Relates #24971
This commit is contained in:
Jason Tedor 2017-05-30 16:22:17 -04:00 committed by GitHub
parent 9b6b4ffe8e
commit ac94253dce
2 changed files with 28 additions and 7 deletions

View File

@ -25,7 +25,6 @@ import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ContextPreservingActionListener;
import org.elasticsearch.action.support.ThreadedActionListener;
import org.elasticsearch.common.CheckedRunnable;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.ThreadContext.StoredContext;
@ -53,7 +52,7 @@ final class IndexShardOperationPermits implements Closeable {
private final Logger logger;
private final ThreadPool threadPool;
private static final int TOTAL_PERMITS = Integer.MAX_VALUE;
static final int TOTAL_PERMITS = Integer.MAX_VALUE;
final Semaphore semaphore = new Semaphore(TOTAL_PERMITS, true); // fair to ensure a blocking thread is not starved
private final List<ActionListener<Releasable>> delayedOperations = new ArrayList<>(); // operations that are delayed
private volatile boolean closed;
@ -225,8 +224,7 @@ final class IndexShardOperationPermits implements Closeable {
}
return;
} else {
releasable = tryAcquire();
assert releasable != null;
releasable = acquire();
}
}
} catch (final InterruptedException e) {
@ -237,8 +235,7 @@ final class IndexShardOperationPermits implements Closeable {
onAcquired.onResponse(releasable);
}
@Nullable
private Releasable tryAcquire() throws InterruptedException {
private Releasable acquire() throws InterruptedException {
assert Thread.holdsLock(this);
if (semaphore.tryAcquire(1, 0, TimeUnit.SECONDS)) { // the un-timed tryAcquire methods do not honor the fairness setting
final AtomicBoolean closed = new AtomicBoolean();
@ -247,8 +244,10 @@ final class IndexShardOperationPermits implements Closeable {
semaphore.release(1);
}
};
} else {
// this should never happen, if it does something is deeply wrong
throw new IllegalStateException("failed to obtain permit but operations are not delayed");
}
return null;
}
/**

View File

@ -533,6 +533,28 @@ public class IndexShardOperationPermitsTests extends ESTestCase {
thread.join();
}
public void testNoPermitsRemaining() throws InterruptedException {
permits.semaphore.tryAcquire(IndexShardOperationPermits.TOTAL_PERMITS, 1, TimeUnit.SECONDS);
final IllegalStateException e = expectThrows(
IllegalStateException.class,
() -> this.permits.acquire(
new ActionListener<Releasable>() {
@Override
public void onResponse(Releasable releasable) {
assert false;
}
@Override
public void onFailure(Exception e) {
assert false;
}
},
ThreadPool.Names.GENERIC,
false));
assertThat(e, hasToString(containsString("failed to obtain permit but operations are not delayed")));
permits.semaphore.release(IndexShardOperationPermits.TOTAL_PERMITS);
}
/**
* Returns an operation that acquires a permit and synchronizes in the following manner:
* <ul>