[RCI] Add IndexShardOperationPermits.asyncBlockOperations(ActionListener<Releasable>) (#34902)
The current implementation of asyncBlockOperations() can be used to execute some code once all indexing operations permits have been acquired, then releases all permits immediately after the code execution. This immediate release is not suitable for treatments that need to keep all permits over multiple execution steps. This commit adds a new asyncBlockOperations() that exposes a Releasable, making it possible to acquire all permits and only release them all when needed by closing the Releasable. The existing blockOperations() method has been modified to delegate permit acquisition/releasing to this new method. Relates to #33888
This commit is contained in:
parent
a90ef6bd6e
commit
1703a61fec
|
@ -2286,23 +2286,32 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
|
|||
assert newPrimaryTerm > pendingPrimaryTerm;
|
||||
assert operationPrimaryTerm <= pendingPrimaryTerm;
|
||||
final CountDownLatch termUpdated = new CountDownLatch(1);
|
||||
indexShardOperationPermits.asyncBlockOperations(30, TimeUnit.MINUTES, () -> {
|
||||
assert operationPrimaryTerm <= pendingPrimaryTerm;
|
||||
termUpdated.await();
|
||||
// indexShardOperationPermits doesn't guarantee that async submissions are executed
|
||||
// in the order submitted. We need to guard against another term bump
|
||||
if (operationPrimaryTerm < newPrimaryTerm) {
|
||||
operationPrimaryTerm = newPrimaryTerm;
|
||||
onBlocked.run();
|
||||
}
|
||||
},
|
||||
e -> {
|
||||
indexShardOperationPermits.asyncBlockOperations(new ActionListener<Releasable>() {
|
||||
@Override
|
||||
public void onFailure(final Exception e) {
|
||||
try {
|
||||
failShard("exception during primary term transition", e);
|
||||
} catch (AlreadyClosedException ace) {
|
||||
// ignore, shard is already closed
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onResponse(final Releasable releasable) {
|
||||
try (Releasable ignored = releasable) {
|
||||
assert operationPrimaryTerm <= pendingPrimaryTerm;
|
||||
termUpdated.await();
|
||||
// indexShardOperationPermits doesn't guarantee that async submissions are executed
|
||||
// in the order submitted. We need to guard against another term bump
|
||||
if (operationPrimaryTerm < newPrimaryTerm) {
|
||||
operationPrimaryTerm = newPrimaryTerm;
|
||||
onBlocked.run();
|
||||
}
|
||||
} catch (final Exception e) {
|
||||
onFailure(e);
|
||||
}
|
||||
}
|
||||
}, 30, TimeUnit.MINUTES);
|
||||
pendingPrimaryTerm = newPrimaryTerm;
|
||||
termUpdated.countDown();
|
||||
}
|
||||
|
|
|
@ -41,7 +41,6 @@ import java.util.concurrent.Semaphore;
|
|||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.function.Consumer;
|
||||
import java.util.function.Supplier;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
|
@ -104,42 +103,54 @@ final class IndexShardOperationPermits implements Closeable {
|
|||
final TimeUnit timeUnit,
|
||||
final CheckedRunnable<E> onBlocked) throws InterruptedException, TimeoutException, E {
|
||||
delayOperations();
|
||||
try {
|
||||
doBlockOperations(timeout, timeUnit, onBlocked);
|
||||
try (Releasable ignored = acquireAll(timeout, timeUnit)) {
|
||||
onBlocked.run();
|
||||
} finally {
|
||||
releaseDelayedOperations();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Immediately delays operations and on another thread waits for in-flight operations to finish and then executes {@code onBlocked}
|
||||
* under the guarantee that no new operations are started. Delayed operations are run after {@code onBlocked} has executed. After
|
||||
* operations are delayed and the blocking is forked to another thread, returns to the caller. If a failure occurs while blocking
|
||||
* operations or executing {@code onBlocked} then the {@code onFailure} handler will be invoked.
|
||||
* Immediately delays operations and on another thread waits for in-flight operations to finish and then acquires all permits. When all
|
||||
* permits are acquired, the provided {@link ActionListener} is called under the guarantee that no new operations are started. Delayed
|
||||
* operations are run once the {@link Releasable} is released or if a failure occurs while acquiring all permits; in this case the
|
||||
* {@code onFailure} handler will be invoked after delayed operations are released.
|
||||
*
|
||||
* @param timeout the maximum time to wait for the in-flight operations block
|
||||
* @param timeUnit the time unit of the {@code timeout} argument
|
||||
* @param onBlocked the action to run once the block has been acquired
|
||||
* @param onFailure the action to run if a failure occurs while blocking operations
|
||||
* @param <E> the type of checked exception thrown by {@code onBlocked} (not thrown on the calling thread)
|
||||
* @param onAcquired {@link ActionListener} that is invoked once acquisition is successful or failed
|
||||
* @param timeout the maximum time to wait for the in-flight operations block
|
||||
* @param timeUnit the time unit of the {@code timeout} argument
|
||||
*/
|
||||
<E extends Exception> void asyncBlockOperations(
|
||||
final long timeout, final TimeUnit timeUnit, final CheckedRunnable<E> onBlocked, final Consumer<Exception> onFailure) {
|
||||
public void asyncBlockOperations(final ActionListener<Releasable> onAcquired, final long timeout, final TimeUnit timeUnit) {
|
||||
delayOperations();
|
||||
threadPool.executor(ThreadPool.Names.GENERIC).execute(new AbstractRunnable() {
|
||||
|
||||
final AtomicBoolean released = new AtomicBoolean(false);
|
||||
|
||||
@Override
|
||||
public void onFailure(final Exception e) {
|
||||
onFailure.accept(e);
|
||||
try {
|
||||
releaseDelayedOperationsIfNeeded(); // resume delayed operations as soon as possible
|
||||
} finally {
|
||||
onAcquired.onFailure(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doRun() throws Exception {
|
||||
doBlockOperations(timeout, timeUnit, onBlocked);
|
||||
final Releasable releasable = acquireAll(timeout, timeUnit);
|
||||
onAcquired.onResponse(() -> {
|
||||
try {
|
||||
releasable.close();
|
||||
} finally {
|
||||
releaseDelayedOperationsIfNeeded();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onAfter() {
|
||||
releaseDelayedOperations();
|
||||
private void releaseDelayedOperationsIfNeeded() {
|
||||
if (released.compareAndSet(false, true)) {
|
||||
releaseDelayedOperations();
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
@ -154,10 +165,7 @@ final class IndexShardOperationPermits implements Closeable {
|
|||
}
|
||||
}
|
||||
|
||||
private <E extends Exception> void doBlockOperations(
|
||||
final long timeout,
|
||||
final TimeUnit timeUnit,
|
||||
final CheckedRunnable<E> onBlocked) throws InterruptedException, TimeoutException, E {
|
||||
private Releasable acquireAll(final long timeout, final TimeUnit timeUnit) throws InterruptedException, TimeoutException {
|
||||
if (Assertions.ENABLED) {
|
||||
// since delayed is not volatile, we have to synchronize even here for visibility
|
||||
synchronized (this) {
|
||||
|
@ -165,12 +173,13 @@ final class IndexShardOperationPermits implements Closeable {
|
|||
}
|
||||
}
|
||||
if (semaphore.tryAcquire(TOTAL_PERMITS, timeout, timeUnit)) {
|
||||
assert semaphore.availablePermits() == 0;
|
||||
try {
|
||||
onBlocked.run();
|
||||
} finally {
|
||||
semaphore.release(TOTAL_PERMITS);
|
||||
}
|
||||
final AtomicBoolean closed = new AtomicBoolean();
|
||||
return () -> {
|
||||
if (closed.compareAndSet(false, true)) {
|
||||
assert semaphore.availablePermits() == 0;
|
||||
semaphore.release(TOTAL_PERMITS);
|
||||
}
|
||||
};
|
||||
} else {
|
||||
throw new TimeoutException("timeout while blocking operations");
|
||||
}
|
||||
|
|
|
@ -20,6 +20,7 @@ package org.elasticsearch.index.shard;
|
|||
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.support.PlainActionFuture;
|
||||
import org.elasticsearch.common.CheckedRunnable;
|
||||
import org.elasticsearch.common.lease.Releasable;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
|
||||
|
@ -199,8 +200,9 @@ public class IndexShardOperationPermitsTests extends ESTestCase {
|
|||
permits.close();
|
||||
expectThrows(IndexShardClosedException.class, () -> permits.blockOperations(randomInt(10), TimeUnit.MINUTES,
|
||||
() -> { throw new IllegalArgumentException("fake error"); }));
|
||||
expectThrows(IndexShardClosedException.class, () -> permits.asyncBlockOperations(randomInt(10), TimeUnit.MINUTES,
|
||||
() -> { throw new IllegalArgumentException("fake error"); }, e -> { throw new AssertionError(e); }));
|
||||
expectThrows(IndexShardClosedException.class,
|
||||
() -> permits.asyncBlockOperations(wrap(() -> { throw new IllegalArgumentException("fake error");}),
|
||||
randomInt(10), TimeUnit.MINUTES));
|
||||
}
|
||||
|
||||
public void testOperationsDelayedIfBlock() throws ExecutionException, InterruptedException, TimeoutException {
|
||||
|
@ -220,17 +222,11 @@ public class IndexShardOperationPermitsTests extends ESTestCase {
|
|||
try (Releasable ignored = blockAndWait()) {
|
||||
permits.acquire(future, ThreadPool.Names.GENERIC, true, "");
|
||||
|
||||
permits.asyncBlockOperations(
|
||||
30,
|
||||
TimeUnit.MINUTES,
|
||||
() -> {
|
||||
blocked.set(true);
|
||||
blockAcquired.countDown();
|
||||
releaseBlock.await();
|
||||
},
|
||||
e -> {
|
||||
throw new RuntimeException(e);
|
||||
});
|
||||
permits.asyncBlockOperations(wrap(() -> {
|
||||
blocked.set(true);
|
||||
blockAcquired.countDown();
|
||||
releaseBlock.await();
|
||||
}), 30, TimeUnit.MINUTES);
|
||||
assertFalse(blocked.get());
|
||||
assertFalse(future.isDone());
|
||||
}
|
||||
|
@ -292,7 +288,7 @@ public class IndexShardOperationPermitsTests extends ESTestCase {
|
|||
future2.get(1, TimeUnit.HOURS).close();
|
||||
}
|
||||
|
||||
protected Releasable blockAndWait() throws InterruptedException {
|
||||
private Releasable blockAndWait() throws InterruptedException {
|
||||
CountDownLatch blockAcquired = new CountDownLatch(1);
|
||||
CountDownLatch releaseBlock = new CountDownLatch(1);
|
||||
CountDownLatch blockReleased = new CountDownLatch(1);
|
||||
|
@ -334,17 +330,11 @@ public class IndexShardOperationPermitsTests extends ESTestCase {
|
|||
final CountDownLatch blockAcquired = new CountDownLatch(1);
|
||||
final CountDownLatch releaseBlock = new CountDownLatch(1);
|
||||
final AtomicBoolean blocked = new AtomicBoolean();
|
||||
permits.asyncBlockOperations(
|
||||
30,
|
||||
TimeUnit.MINUTES,
|
||||
() -> {
|
||||
blocked.set(true);
|
||||
blockAcquired.countDown();
|
||||
releaseBlock.await();
|
||||
},
|
||||
e -> {
|
||||
throw new RuntimeException(e);
|
||||
});
|
||||
permits.asyncBlockOperations(wrap(() -> {
|
||||
blocked.set(true);
|
||||
blockAcquired.countDown();
|
||||
releaseBlock.await();
|
||||
}), 30, TimeUnit.MINUTES);
|
||||
blockAcquired.await();
|
||||
assertTrue(blocked.get());
|
||||
|
||||
|
@ -392,16 +382,10 @@ public class IndexShardOperationPermitsTests extends ESTestCase {
|
|||
// now we will delay operations while the first operation is still executing (because it is latched)
|
||||
final CountDownLatch blockedLatch = new CountDownLatch(1);
|
||||
final AtomicBoolean onBlocked = new AtomicBoolean();
|
||||
permits.asyncBlockOperations(
|
||||
30,
|
||||
TimeUnit.MINUTES,
|
||||
() -> {
|
||||
onBlocked.set(true);
|
||||
blockedLatch.countDown();
|
||||
}, e -> {
|
||||
throw new RuntimeException(e);
|
||||
});
|
||||
|
||||
permits.asyncBlockOperations(wrap(() -> {
|
||||
onBlocked.set(true);
|
||||
blockedLatch.countDown();
|
||||
}), 30, TimeUnit.MINUTES);
|
||||
assertFalse(onBlocked.get());
|
||||
|
||||
// if we submit another operation, it should be delayed
|
||||
|
@ -486,15 +470,10 @@ public class IndexShardOperationPermitsTests extends ESTestCase {
|
|||
} catch (final BrokenBarrierException | InterruptedException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
permits.asyncBlockOperations(
|
||||
30,
|
||||
TimeUnit.MINUTES,
|
||||
() -> {
|
||||
values.add(operations);
|
||||
operationLatch.countDown();
|
||||
}, e -> {
|
||||
throw new RuntimeException(e);
|
||||
});
|
||||
permits.asyncBlockOperations(wrap(() -> {
|
||||
values.add(operations);
|
||||
operationLatch.countDown();
|
||||
}), 30, TimeUnit.MINUTES);
|
||||
});
|
||||
blockingThread.start();
|
||||
|
||||
|
@ -559,16 +538,20 @@ public class IndexShardOperationPermitsTests extends ESTestCase {
|
|||
public void testAsyncBlockOperationsOnFailure() throws InterruptedException {
|
||||
final AtomicReference<Exception> reference = new AtomicReference<>();
|
||||
final CountDownLatch onFailureLatch = new CountDownLatch(1);
|
||||
permits.asyncBlockOperations(
|
||||
10,
|
||||
TimeUnit.MINUTES,
|
||||
() -> {
|
||||
permits.asyncBlockOperations(new ActionListener<Releasable>() {
|
||||
@Override
|
||||
public void onResponse(Releasable releasable) {
|
||||
try (Releasable ignored = releasable) {
|
||||
throw new RuntimeException("simulated");
|
||||
},
|
||||
e -> {
|
||||
reference.set(e);
|
||||
onFailureLatch.countDown();
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(final Exception e) {
|
||||
reference.set(e);
|
||||
onFailureLatch.countDown();
|
||||
}
|
||||
}, 10, TimeUnit.MINUTES);
|
||||
onFailureLatch.await();
|
||||
assertThat(reference.get(), instanceOf(RuntimeException.class));
|
||||
assertThat(reference.get(), hasToString(containsString("simulated")));
|
||||
|
@ -596,14 +579,18 @@ public class IndexShardOperationPermitsTests extends ESTestCase {
|
|||
{
|
||||
final AtomicReference<Exception> reference = new AtomicReference<>();
|
||||
final CountDownLatch onFailureLatch = new CountDownLatch(1);
|
||||
permits.asyncBlockOperations(
|
||||
1,
|
||||
TimeUnit.MILLISECONDS,
|
||||
() -> {},
|
||||
e -> {
|
||||
reference.set(e);
|
||||
onFailureLatch.countDown();
|
||||
});
|
||||
permits.asyncBlockOperations(new ActionListener<Releasable>() {
|
||||
@Override
|
||||
public void onResponse(Releasable releasable) {
|
||||
releasable.close();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(final Exception e) {
|
||||
reference.set(e);
|
||||
onFailureLatch.countDown();
|
||||
}
|
||||
}, 1, TimeUnit.MILLISECONDS);
|
||||
onFailureLatch.await();
|
||||
assertThat(reference.get(), hasToString(containsString("timeout while blocking operations")));
|
||||
}
|
||||
|
@ -716,4 +703,22 @@ public class IndexShardOperationPermitsTests extends ESTestCase {
|
|||
assertThat(permits.getActiveOperationsCount(), equalTo(0));
|
||||
assertThat(permits.getActiveOperations(), emptyIterable());
|
||||
}
|
||||
|
||||
private static ActionListener<Releasable> wrap(final CheckedRunnable<Exception> onResponse) {
|
||||
return new ActionListener<Releasable>() {
|
||||
@Override
|
||||
public void onResponse(final Releasable releasable) {
|
||||
try (Releasable ignored = releasable) {
|
||||
onResponse.run();
|
||||
} catch (final Exception e) {
|
||||
onFailure(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(final Exception e) {
|
||||
throw new AssertionError(e);
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue