Introduce clean transition on primary promotion

This commit introduces a clean transition from the old primary term to
the new primary term when a replica is promoted to primary. To accomplish
this, we delay all operations before incrementing the primary term. The
delay is guaranteed to be in place before we increment the term; the
delayed operations are then executed after the delay is removed, which
happens asynchronously on another thread. That thread does not proceed
until the in-flight operations that were executing have completed, and
after these operations drain, the delayed operations re-acquire permits
and are executed.

Relates #24925
Jason Tedor 2017-05-30 11:39:36 -04:00 committed by GitHub
parent 15fc71249c
commit ddbc4687f6
4 changed files with 601 additions and 64 deletions
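
Before reading the diff, it may help to see the shape of the mechanism the commit message describes in isolation. The following is a minimal, self-contained sketch, not the Elasticsearch implementation; every name in it (PrimaryTermTransitionSketch, delayingOperations, execute, and so on) is hypothetical. It shows new operations being delayed synchronously, the primary term being incremented while the delay is in place, and a latch preventing the asynchronous blocking thread from releasing the delayed operations until the new term is set.

// Hypothetical sketch only; names and structure are illustrative, not the Elasticsearch code.
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;

final class PrimaryTermTransitionSketch {

    private static final int TOTAL_PERMITS = Integer.MAX_VALUE;

    private final Semaphore permits = new Semaphore(TOTAL_PERMITS, true); // fair, so a blocker is not starved
    private final List<Runnable> delayed = new ArrayList<>();             // guarded by synchronized (this)
    private final ExecutorService executor = Executors.newSingleThreadExecutor();
    private boolean delayingOperations;                                   // guarded by synchronized (this)
    private volatile long primaryTerm;                                    // written only on promotion

    /** Called on promotion: delay new operations, increment the term, then unblock asynchronously. */
    void updatePrimaryTerm(final long newPrimaryTerm) {
        final CountDownLatch termUpdated = new CountDownLatch(1);
        synchronized (this) {
            delayingOperations = true; // the delay is in place before the term is incremented below
        }
        executor.execute(() -> {
            try {
                permits.acquire(TOTAL_PERMITS); // wait for in-flight operations to drain
                try {
                    termUpdated.await();        // do not unblock before the new term is visible
                } finally {
                    permits.release(TOTAL_PERMITS);
                }
            } catch (final InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
            final List<Runnable> toRun;
            synchronized (this) {
                delayingOperations = false;
                toRun = new ArrayList<>(delayed);
                delayed.clear();
            }
            toRun.forEach(this::execute); // delayed operations re-acquire permits under the new term
        });
        primaryTerm = newPrimaryTerm; // safe: every new operation is already being delayed
        termUpdated.countDown();
    }

    /** Called for every operation: run it now under a permit, or queue it until the block lifts. */
    void execute(final Runnable operation) {
        synchronized (this) {
            if (delayingOperations || permits.tryAcquire() == false) {
                delayed.add(operation);
                return;
            }
        }
        try {
            operation.run();
        } finally {
            permits.release();
        }
    }

    long getPrimaryTerm() {
        return primaryTerm;
    }
}

The actual change below additionally preserves the thread context of delayed operations, dispatches them back through the thread pool, and fails the shard if the block cannot be established within the timeout.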

IndexShard.java

@@ -131,6 +131,7 @@ import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -332,12 +333,14 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
}
/**
* notifies the shard of an increase in the primary term
* Notifies the shard of an increase in the primary term.
*
* @param newPrimaryTerm the new primary term
*/
public void updatePrimaryTerm(final long newTerm) {
public void updatePrimaryTerm(final long newPrimaryTerm) {
assert shardRouting.primary() : "primary term can only be explicitly updated on a primary shard";
synchronized (mutex) {
if (newTerm != primaryTerm) {
if (newPrimaryTerm != primaryTerm) {
// Note that due to cluster state batching an initializing primary shard term can be failed and re-assigned
// in one state, causing its term to be incremented. Note that if both the current shard state and the new
// shard state are initializing, we could replace the current shard and reinitialize it. It is however
@@ -354,10 +357,22 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
"a started primary shard should never update its term; "
+ "shard " + shardRouting + ", "
+ "current term [" + primaryTerm + "], "
+ "new term [" + newTerm + "]";
assert newTerm > primaryTerm :
"primary terms can only go up; current term [" + primaryTerm + "], new term [" + newTerm + "]";
primaryTerm = newTerm;
+ "new term [" + newPrimaryTerm + "]";
assert newPrimaryTerm > primaryTerm :
"primary terms can only go up; current term [" + primaryTerm + "], new term [" + newPrimaryTerm + "]";
/*
* Before this call returns, we are guaranteed that all future operations are delayed and so this happens before we
* increment the primary term. The latch is needed to ensure that we do not unblock operations before the primary term is
* incremented.
*/
final CountDownLatch latch = new CountDownLatch(1);
indexShardOperationPermits.asyncBlockOperations(
30,
TimeUnit.MINUTES,
latch::await,
e -> failShard("exception during primary term transition", e));
primaryTerm = newPrimaryTerm;
latch.countDown();
}
}
}

IndexShardOperationPermits.java

@@ -20,12 +20,14 @@
package org.elasticsearch.index.shard;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.Assertions;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ContextPreservingActionListener;
import org.elasticsearch.action.support.ThreadedActionListener;
import org.elasticsearch.common.CheckedRunnable;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.ThreadContext.StoredContext;
import org.elasticsearch.threadpool.ThreadPool;
@@ -36,20 +38,35 @@ import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.Supplier;
/**
* Tracks shard operation permits. Each operation on the shard obtains a permit. When we need to block operations (e.g., to transition
* between terms) we immediately delay all operations to a queue, obtain all available permits, and wait for outstanding operations to drain
* and return their permits. Delayed operations will acquire permits and be completed after the operation that blocked all operations has
* completed.
*/
final class IndexShardOperationPermits implements Closeable {
private final ShardId shardId;
private final Logger logger;
private final ThreadPool threadPool;
private static final int TOTAL_PERMITS = Integer.MAX_VALUE;
// fair semaphore to ensure that blockOperations() does not starve under thread contention
final Semaphore semaphore = new Semaphore(TOTAL_PERMITS, true);
@Nullable private List<ActionListener<Releasable>> delayedOperations; // operations that are delayed
final Semaphore semaphore = new Semaphore(TOTAL_PERMITS, true); // fair to ensure a blocking thread is not starved
private final List<ActionListener<Releasable>> delayedOperations = new ArrayList<>(); // operations that are delayed
private volatile boolean closed;
private boolean delayed; // does not need to be volatile as all accesses are done under a lock on this
IndexShardOperationPermits(ShardId shardId, Logger logger, ThreadPool threadPool) {
/**
* Constructs operation permits for the specified shard.
*
* @param shardId the shard
* @param logger the logger for the shard
* @param threadPool the thread pool (used to execute delayed operations)
*/
IndexShardOperationPermits(final ShardId shardId, final Logger logger, final ThreadPool threadPool) {
this.shardId = shardId;
this.logger = logger;
this.threadPool = threadPool;
@@ -61,99 +78,170 @@ final class IndexShardOperationPermits implements Closeable {
}
/**
* Wait for in-flight operations to finish and executes onBlocked under the guarantee that no new operations are started. Queues
* operations that are occurring in the meanwhile and runs them once onBlocked has executed.
* Waits for in-flight operations to finish and executes {@code onBlocked} under the guarantee that no new operations are started. Queues
* operations that are occurring in the meantime and runs them once {@code onBlocked} has executed.
*
* @param timeout the maximum time to wait for the in-flight operations block
* @param timeUnit the time unit of the {@code timeout} argument
* @param timeout the maximum time to wait for the in-flight operations block
* @param timeUnit the time unit of the {@code timeout} argument
* @param onBlocked the action to run once the block has been acquired
* @throws InterruptedException if calling thread is interrupted
* @throws TimeoutException if timed out waiting for in-flight operations to finish
* @param <E> the type of checked exception thrown by {@code onBlocked}
* @throws InterruptedException if calling thread is interrupted
* @throws TimeoutException if timed out waiting for in-flight operations to finish
* @throws IndexShardClosedException if operation permit has been closed
*/
public <E extends Exception> void blockOperations(long timeout, TimeUnit timeUnit, CheckedRunnable<E> onBlocked) throws
InterruptedException, TimeoutException, E {
<E extends Exception> void blockOperations(
final long timeout,
final TimeUnit timeUnit,
final CheckedRunnable<E> onBlocked) throws InterruptedException, TimeoutException, E {
if (closed) {
throw new IndexShardClosedException(shardId);
}
delayOperations();
try {
if (semaphore.tryAcquire(TOTAL_PERMITS, timeout, timeUnit)) {
assert semaphore.availablePermits() == 0;
try {
onBlocked.run();
} finally {
semaphore.release(TOTAL_PERMITS);
}
} else {
throw new TimeoutException("timed out during blockOperations");
}
doBlockOperations(timeout, timeUnit, onBlocked);
} finally {
final List<ActionListener<Releasable>> queuedActions;
releaseDelayedOperations();
}
}
/**
* Immediately delays operations and on another thread waits for in-flight operations to finish and then executes {@code onBlocked}
* under the guarantee that no new operations are started. Delayed operations are run after {@code onBlocked} has executed. After
* operations are delayed and the blocking is forked to another thread, returns to the caller. If a failure occurs while blocking
* operations or executing {@code onBlocked} then the {@code onFailure} handler will be invoked.
*
* @param timeout the maximum time to wait for the in-flight operations block
* @param timeUnit the time unit of the {@code timeout} argument
* @param onBlocked the action to run once the block has been acquired
* @param onFailure the action to run if a failure occurs while blocking operations
* @param <E> the type of checked exception thrown by {@code onBlocked} (not thrown on the calling thread)
*/
<E extends Exception> void asyncBlockOperations(
final long timeout, final TimeUnit timeUnit, final CheckedRunnable<E> onBlocked, final Consumer<Exception> onFailure) {
delayOperations();
threadPool.executor(ThreadPool.Names.GENERIC).execute(new AbstractRunnable() {
@Override
public void onFailure(final Exception e) {
onFailure.accept(e);
}
@Override
protected void doRun() throws Exception {
doBlockOperations(timeout, timeUnit, onBlocked);
}
@Override
public void onAfter() {
releaseDelayedOperations();
}
});
}
private void delayOperations() {
synchronized (this) {
if (delayed) {
throw new IllegalStateException("operations are already delayed");
} else {
assert delayedOperations.isEmpty();
delayed = true;
}
}
}
private <E extends Exception> void doBlockOperations(
final long timeout,
final TimeUnit timeUnit,
final CheckedRunnable<E> onBlocked) throws InterruptedException, TimeoutException, E {
if (Assertions.ENABLED) {
// since delayed is not volatile, we have to synchronize even here for visibility
synchronized (this) {
queuedActions = delayedOperations;
delayedOperations = null;
assert delayed;
}
if (queuedActions != null) {
// Try acquiring permits on fresh thread (for two reasons):
// - blockOperations can be called on recovery thread which can be expected to be interrupted when recovery is cancelled.
// Interruptions are bad here as permit acquisition will throw an InterruptedException which will be swallowed by
// ThreadedActionListener if the queue of the thread pool on which it submits is full.
// - if permit is acquired and queue of the thread pool which the ThreadedActionListener uses is full, the onFailure
// handler is executed on the calling thread. This should not be the recovery thread as it would delay the recovery.
threadPool.executor(ThreadPool.Names.GENERIC).execute(() -> {
for (ActionListener<Releasable> queuedAction : queuedActions) {
acquire(queuedAction, null, false);
}
});
}
if (semaphore.tryAcquire(TOTAL_PERMITS, timeout, timeUnit)) {
assert semaphore.availablePermits() == 0;
try {
onBlocked.run();
} finally {
semaphore.release(TOTAL_PERMITS);
}
} else {
throw new TimeoutException("timeout while blocking operations");
}
}
private void releaseDelayedOperations() {
final List<ActionListener<Releasable>> queuedActions;
synchronized (this) {
assert delayed;
queuedActions = new ArrayList<>(delayedOperations);
delayedOperations.clear();
delayed = false;
}
if (!queuedActions.isEmpty()) {
/*
* Try acquiring permits on fresh thread (for two reasons):
* - blockOperations can be called on a recovery thread which can be expected to be interrupted when recovery is cancelled;
* interruptions are bad here as permit acquisition will throw an interrupted exception which will be swallowed by
* the threaded action listener if the queue of the thread pool on which it submits is full
* - if a permit is acquired and the queue of the thread pool which the threaded action listener uses is full, the
* onFailure handler is executed on the calling thread; this should not be the recovery thread as it would delay the
* recovery
*/
threadPool.executor(ThreadPool.Names.GENERIC).execute(() -> {
for (ActionListener<Releasable> queuedAction : queuedActions) {
acquire(queuedAction, null, false);
}
});
}
}
/**
* Acquires a permit whenever permit acquisition is not blocked. If the permit is directly available, the provided
* {@link ActionListener} will be called on the calling thread. During calls of
* {@link #blockOperations(long, TimeUnit, CheckedRunnable)}, permit acquisition can be delayed. The provided ActionListener will
* then be called using the provided executor once operations are no longer blocked.
* {@link #blockOperations(long, TimeUnit, CheckedRunnable)}, permit acquisition can be delayed. The provided {@link ActionListener}
* will then be called using the provided executor once operations are no longer blocked.
*
* @param onAcquired {@link ActionListener} that is invoked once acquisition is successful or failed
* @param executorOnDelay executor to use for delayed call
* @param forceExecution whether the runnable should force its execution in case it gets rejected
*/
public void acquire(ActionListener<Releasable> onAcquired, String executorOnDelay, boolean forceExecution) {
public void acquire(final ActionListener<Releasable> onAcquired, final String executorOnDelay, final boolean forceExecution) {
if (closed) {
onAcquired.onFailure(new IndexShardClosedException(shardId));
return;
}
Releasable releasable;
final Releasable releasable;
try {
synchronized (this) {
releasable = tryAcquire();
if (releasable == null) {
// blockOperations is executing, this operation will be retried by blockOperations once it finishes
if (delayedOperations == null) {
delayedOperations = new ArrayList<>();
}
if (delayed) {
final Supplier<StoredContext> contextSupplier = threadPool.getThreadContext().newRestorableContext(false);
if (executorOnDelay != null) {
delayedOperations.add(
new ThreadedActionListener<>(logger, threadPool, executorOnDelay,
new ContextPreservingActionListener<>(contextSupplier, onAcquired), forceExecution));
new ThreadedActionListener<>(logger, threadPool, executorOnDelay,
new ContextPreservingActionListener<>(contextSupplier, onAcquired), forceExecution));
} else {
delayedOperations.add(new ContextPreservingActionListener<>(contextSupplier, onAcquired));
}
return;
} else {
releasable = tryAcquire();
assert releasable != null;
}
}
} catch (InterruptedException e) {
} catch (final InterruptedException e) {
onAcquired.onFailure(e);
return;
}
// execute this outside the synchronized block!
onAcquired.onResponse(releasable);
}
@Nullable private Releasable tryAcquire() throws InterruptedException {
if (semaphore.tryAcquire(1, 0, TimeUnit.SECONDS)) { // the untimed tryAcquire methods do not honor the fairness setting
AtomicBoolean closed = new AtomicBoolean();
@Nullable
private Releasable tryAcquire() throws InterruptedException {
assert Thread.holdsLock(this);
if (semaphore.tryAcquire(1, 0, TimeUnit.SECONDS)) { // the un-timed tryAcquire methods do not honor the fairness setting
final AtomicBoolean closed = new AtomicBoolean();
return () -> {
if (closed.compareAndSet(false, true)) {
semaphore.release(1);
@@ -163,13 +251,23 @@ final class IndexShardOperationPermits implements Closeable {
return null;
}
public int getActiveOperationsCount() {
/**
* Obtain the active operation count, or zero if all permits are held (even if there are outstanding operations in flight).
*
* @return the active operation count, or zero when all permits are held
*/
int getActiveOperationsCount() {
int availablePermits = semaphore.availablePermits();
if (availablePermits == 0) {
// when blockOperations is holding all permits
/*
* This occurs when either doBlockOperations is holding all the permits or there are outstanding operations in flight and the
* remainder of the permits are held by doBlockOperations. We do not distinguish between these two cases and simply say that
* the active operations count is zero.
*/
return 0;
} else {
return TOTAL_PERMITS - availablePermits;
}
}
}

IndexShardOperationPermitsTests.java

@@ -25,20 +25,30 @@ import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.threadpool.ThreadPoolStats;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasToString;
import static org.hamcrest.Matchers.instanceOf;
public class IndexShardOperationPermitsTests extends ESTestCase {
@@ -143,7 +153,7 @@ public class IndexShardOperationPermitsTests extends ESTestCase {
public void testOperationsDelayedIfBlock() throws ExecutionException, InterruptedException, TimeoutException {
PlainActionFuture<Releasable> future = new PlainActionFuture<>();
try (Releasable releasable = blockAndWait()) {
try (Releasable ignored = blockAndWait()) {
permits.acquire(future, ThreadPool.Names.GENERIC, true);
assertFalse(future.isDone());
}
@@ -184,7 +194,7 @@ public class IndexShardOperationPermitsTests extends ESTestCase {
}
};
try (Releasable releasable = blockAndWait()) {
try (Releasable ignored = blockAndWait()) {
// we preserve the thread context here so that we have a different context in the call to acquire than the context present
// when the releasable is closed
try (ThreadContext.StoredContext ignore = context.newStoredContext(false)) {
@@ -238,6 +248,202 @@ public class IndexShardOperationPermitsTests extends ESTestCase {
};
}
public void testAsyncBlockOperationsOperationWhileBlocked() throws InterruptedException {
final CountDownLatch blockAcquired = new CountDownLatch(1);
final CountDownLatch releaseBlock = new CountDownLatch(1);
final AtomicBoolean blocked = new AtomicBoolean();
permits.asyncBlockOperations(
30,
TimeUnit.MINUTES,
() -> {
blocked.set(true);
blockAcquired.countDown();
releaseBlock.await();
},
e -> {
throw new RuntimeException(e);
});
blockAcquired.await();
assertTrue(blocked.get());
// an operation that is submitted while there is a delay in place should be delayed
final CountDownLatch delayedOperation = new CountDownLatch(1);
final AtomicBoolean delayed = new AtomicBoolean();
final Thread thread = new Thread(() ->
permits.acquire(
new ActionListener<Releasable>() {
@Override
public void onResponse(Releasable releasable) {
delayed.set(true);
releasable.close();
delayedOperation.countDown();
}
@Override
public void onFailure(Exception e) {
}
},
ThreadPool.Names.GENERIC,
false));
thread.start();
assertFalse(delayed.get());
releaseBlock.countDown();
delayedOperation.await();
assertTrue(delayed.get());
thread.join();
}
public void testAsyncBlockOperationsOperationBeforeBlocked() throws InterruptedException, BrokenBarrierException {
final CyclicBarrier barrier = new CyclicBarrier(2);
final CountDownLatch operationExecutingLatch = new CountDownLatch(1);
final CountDownLatch firstOperationLatch = new CountDownLatch(1);
final CountDownLatch firstOperationCompleteLatch = new CountDownLatch(1);
final Thread firstOperationThread =
new Thread(controlledAcquire(barrier, operationExecutingLatch, firstOperationLatch, firstOperationCompleteLatch));
firstOperationThread.start();
barrier.await();
operationExecutingLatch.await();
// now we will delay operations while the first operation is still executing (because it is latched)
final CountDownLatch blockedLatch = new CountDownLatch(1);
final AtomicBoolean onBlocked = new AtomicBoolean();
permits.asyncBlockOperations(
30,
TimeUnit.MINUTES,
() -> {
onBlocked.set(true);
blockedLatch.countDown();
}, e -> {
throw new RuntimeException(e);
});
assertFalse(onBlocked.get());
// if we submit another operation, it should be delayed
final CountDownLatch secondOperationExecuting = new CountDownLatch(1);
final CountDownLatch secondOperationComplete = new CountDownLatch(1);
final AtomicBoolean secondOperation = new AtomicBoolean();
final Thread secondOperationThread = new Thread(() -> {
secondOperationExecuting.countDown();
permits.acquire(
new ActionListener<Releasable>() {
@Override
public void onResponse(Releasable releasable) {
secondOperation.set(true);
releasable.close();
secondOperationComplete.countDown();
}
@Override
public void onFailure(Exception e) {
throw new RuntimeException(e);
}
},
ThreadPool.Names.GENERIC,
false);
});
secondOperationThread.start();
secondOperationExecuting.await();
assertFalse(secondOperation.get());
firstOperationLatch.countDown();
firstOperationCompleteLatch.await();
blockedLatch.await();
assertTrue(onBlocked.get());
secondOperationComplete.await();
assertTrue(secondOperation.get());
firstOperationThread.join();
secondOperationThread.join();
}
public void testAsyncBlockOperationsRace() throws Exception {
// we racily submit operations and a delay, and then ensure that all operations were actually completed
final int operations = scaledRandomIntBetween(1, 64);
final CyclicBarrier barrier = new CyclicBarrier(1 + 1 + operations);
final CountDownLatch operationLatch = new CountDownLatch(1 + operations);
final Set<Integer> values = Collections.newSetFromMap(new ConcurrentHashMap<>());
final List<Thread> threads = new ArrayList<>();
for (int i = 0; i < operations; i++) {
final int value = i;
final Thread thread = new Thread(() -> {
try {
barrier.await();
} catch (final BrokenBarrierException | InterruptedException e) {
throw new RuntimeException(e);
}
permits.acquire(
new ActionListener<Releasable>() {
@Override
public void onResponse(Releasable releasable) {
values.add(value);
releasable.close();
operationLatch.countDown();
}
@Override
public void onFailure(Exception e) {
}
},
ThreadPool.Names.GENERIC,
false);
});
thread.start();
threads.add(thread);
}
final Thread blockingThread = new Thread(() -> {
try {
barrier.await();
} catch (final BrokenBarrierException | InterruptedException e) {
throw new RuntimeException(e);
}
permits.asyncBlockOperations(
30,
TimeUnit.MINUTES,
() -> {
values.add(operations);
operationLatch.countDown();
}, e -> {
throw new RuntimeException(e);
});
});
blockingThread.start();
barrier.await();
operationLatch.await();
for (final Thread thread : threads) {
thread.join();
}
blockingThread.join();
// check that all operations completed
for (int i = 0; i < operations; i++) {
assertTrue(values.contains(i));
}
assertTrue(values.contains(operations));
/*
* The block operation is executed on another thread and the operations can have completed before this thread has returned all the
* permits to the semaphore. We wait here until all generic threads are idle as an indication that all permits have been returned to
* the semaphore.
*/
awaitBusy(() -> {
for (final ThreadPoolStats.Stats stats : threadPool.stats()) {
if (ThreadPool.Names.GENERIC.equals(stats.getName())) {
return stats.getActive() == 0;
}
}
return false;
});
}
public void testActiveOperationsCount() throws ExecutionException, InterruptedException {
PlainActionFuture<Releasable> future1 = new PlainActionFuture<>();
permits.acquire(future1, ThreadPool.Names.GENERIC, true);
@@ -267,4 +473,114 @@ public class IndexShardOperationPermitsTests extends ESTestCase {
future3.get().close();
assertThat(permits.getActiveOperationsCount(), equalTo(0));
}
public void testAsyncBlockOperationsOnFailure() throws InterruptedException {
final AtomicReference<Exception> reference = new AtomicReference<>();
final CountDownLatch onFailureLatch = new CountDownLatch(1);
permits.asyncBlockOperations(
10,
TimeUnit.MINUTES,
() -> {
throw new RuntimeException("simulated");
},
e -> {
reference.set(e);
onFailureLatch.countDown();
});
onFailureLatch.await();
assertThat(reference.get(), instanceOf(RuntimeException.class));
assertThat(reference.get(), hasToString(containsString("simulated")));
}
public void testTimeout() throws BrokenBarrierException, InterruptedException {
final CyclicBarrier barrier = new CyclicBarrier(2);
final CountDownLatch operationExecutingLatch = new CountDownLatch(1);
final CountDownLatch operationLatch = new CountDownLatch(1);
final CountDownLatch operationCompleteLatch = new CountDownLatch(1);
final Thread thread = new Thread(controlledAcquire(barrier, operationExecutingLatch, operationLatch, operationCompleteLatch));
thread.start();
barrier.await();
operationExecutingLatch.await();
{
final TimeoutException e =
expectThrows(TimeoutException.class, () -> permits.blockOperations(1, TimeUnit.MILLISECONDS, () -> {}));
assertThat(e, hasToString(containsString("timeout while blocking operations")));
}
{
final AtomicReference<Exception> reference = new AtomicReference<>();
final CountDownLatch onFailureLatch = new CountDownLatch(1);
permits.asyncBlockOperations(
1,
TimeUnit.MILLISECONDS,
() -> {},
e -> {
reference.set(e);
onFailureLatch.countDown();
});
onFailureLatch.await();
assertThat(reference.get(), hasToString(containsString("timeout while blocking operations")));
}
operationLatch.countDown();
operationCompleteLatch.await();
thread.join();
}
/**
* Returns an operation that acquires a permit and synchronizes in the following manner:
* <ul>
* <li>waits on the {@code barrier} before acquiring a permit</li>
* <li>counts down the {@code operationExecutingLatch} when it acquires the permit</li>
* <li>waits on the {@code operationLatch} before releasing the permit</li>
* <li>counts down the {@code operationCompleteLatch} after releasing the permit</li>
* </ul>
*
* @param barrier the barrier to wait on
* @param operationExecutingLatch the latch to countdown after acquiring the permit
* @param operationLatch the latch to wait on before releasing the permit
* @param operationCompleteLatch the latch to countdown after releasing the permit
* @return a controllable runnable that acquires a permit
*/
private Runnable controlledAcquire(
final CyclicBarrier barrier,
final CountDownLatch operationExecutingLatch,
final CountDownLatch operationLatch,
final CountDownLatch operationCompleteLatch) {
return () -> {
try {
barrier.await();
} catch (final BrokenBarrierException | InterruptedException e) {
throw new RuntimeException(e);
}
permits.acquire(
new ActionListener<Releasable>() {
@Override
public void onResponse(Releasable releasable) {
operationExecutingLatch.countDown();
try {
operationLatch.await();
} catch (final InterruptedException e) {
throw new RuntimeException(e);
}
releasable.close();
operationCompleteLatch.countDown();
}
@Override
public void onFailure(Exception e) {
throw new RuntimeException(e);
}
},
ThreadPool.Names.GENERIC,
false);
};
}
}

IndexShardTests.java

@@ -280,6 +280,114 @@ public class IndexShardTests extends IndexShardTestCase {
}
}
public void testPrimaryPromotionDelaysOperations() throws IOException, BrokenBarrierException, InterruptedException {
final IndexShard indexShard = newStartedShard(false);
final int operations = scaledRandomIntBetween(1, 64);
final CyclicBarrier barrier = new CyclicBarrier(1 + operations);
final CountDownLatch latch = new CountDownLatch(operations);
final CountDownLatch operationLatch = new CountDownLatch(1);
final List<Thread> threads = new ArrayList<>();
for (int i = 0; i < operations; i++) {
final Thread thread = new Thread(() -> {
try {
barrier.await();
} catch (final BrokenBarrierException | InterruptedException e) {
throw new RuntimeException(e);
}
indexShard.acquireReplicaOperationPermit(
indexShard.getPrimaryTerm(),
new ActionListener<Releasable>() {
@Override
public void onResponse(Releasable releasable) {
latch.countDown();
try {
operationLatch.await();
} catch (final InterruptedException e) {
throw new RuntimeException(e);
}
releasable.close();
}
@Override
public void onFailure(Exception e) {
throw new RuntimeException(e);
}
},
ThreadPool.Names.INDEX);
});
thread.start();
threads.add(thread);
}
barrier.await();
latch.await();
// promote the replica
final ShardRouting replicaRouting = indexShard.routingEntry();
final ShardRouting primaryRouting =
TestShardRouting.newShardRouting(
replicaRouting.shardId(),
replicaRouting.currentNodeId(),
null,
true,
ShardRoutingState.STARTED,
replicaRouting.allocationId());
indexShard.updateRoutingEntry(primaryRouting);
indexShard.updatePrimaryTerm(indexShard.getPrimaryTerm() + 1);
final int delayedOperations = scaledRandomIntBetween(1, 64);
final CyclicBarrier delayedOperationsBarrier = new CyclicBarrier(1 + delayedOperations);
final CountDownLatch delayedOperationsLatch = new CountDownLatch(delayedOperations);
final AtomicLong counter = new AtomicLong();
final List<Thread> delayedThreads = new ArrayList<>();
for (int i = 0; i < delayedOperations; i++) {
final Thread thread = new Thread(() -> {
try {
delayedOperationsBarrier.await();
} catch (final BrokenBarrierException | InterruptedException e) {
throw new RuntimeException(e);
}
indexShard.acquirePrimaryOperationPermit(
new ActionListener<Releasable>() {
@Override
public void onResponse(Releasable releasable) {
counter.incrementAndGet();
releasable.close();
delayedOperationsLatch.countDown();
}
@Override
public void onFailure(Exception e) {
throw new RuntimeException(e);
}
},
ThreadPool.Names.INDEX);
});
thread.start();
delayedThreads.add(thread);
}
delayedOperationsBarrier.await();
assertThat(counter.get(), equalTo(0L));
operationLatch.countDown();
for (final Thread thread : threads) {
thread.join();
}
delayedOperationsLatch.await();
assertThat(counter.get(), equalTo((long) delayedOperations));
for (final Thread thread : delayedThreads) {
thread.join();
}
closeShards(indexShard);
}
public void testOperationPermitsOnPrimaryShards() throws InterruptedException, ExecutionException, IOException {
final ShardId shardId = new ShardId("test", "_na_", 0);
final IndexShard indexShard;