diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 5f1f94f72f7..6a790261252 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -131,6 +131,7 @@ import java.util.Map; import java.util.Objects; import java.util.Set; import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; @@ -332,12 +333,14 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl } /** - * notifies the shard of an increase in the primary term + * Notifies the shard of an increase in the primary term. + * + * @param newPrimaryTerm the new primary term */ - public void updatePrimaryTerm(final long newTerm) { + public void updatePrimaryTerm(final long newPrimaryTerm) { assert shardRouting.primary() : "primary term can only be explicitly updated on a primary shard"; synchronized (mutex) { - if (newTerm != primaryTerm) { + if (newPrimaryTerm != primaryTerm) { // Note that due to cluster state batching an initializing primary shard term can failed and re-assigned // in one state causing it's term to be incremented. Note that if both current shard state and new // shard state are initializing, we could replace the current shard and reinitialize it. It is however @@ -354,10 +357,22 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl "a started primary shard should never update its term; " + "shard " + shardRouting + ", " + "current term [" + primaryTerm + "], " - + "new term [" + newTerm + "]"; - assert newTerm > primaryTerm : - "primary terms can only go up; current term [" + primaryTerm + "], new term [" + newTerm + "]"; - primaryTerm = newTerm; + + "new term [" + newPrimaryTerm + "]"; + assert newPrimaryTerm > primaryTerm : + "primary terms can only go up; current term [" + primaryTerm + "], new term [" + newPrimaryTerm + "]"; + /* + * Before this call returns, we are guaranteed that all future operations are delayed and so this happens before we + * increment the primary term. The latch is needed to ensure that we do not unblock operations before the primary term is + * incremented. + */ + final CountDownLatch latch = new CountDownLatch(1); + indexShardOperationPermits.asyncBlockOperations( + 30, + TimeUnit.MINUTES, + latch::await, + e -> failShard("exception during primary term transition", e)); + primaryTerm = newPrimaryTerm; + latch.countDown(); } } } diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexShardOperationPermits.java b/core/src/main/java/org/elasticsearch/index/shard/IndexShardOperationPermits.java index fea26168efa..83a372dd453 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/IndexShardOperationPermits.java +++ b/core/src/main/java/org/elasticsearch/index/shard/IndexShardOperationPermits.java @@ -20,12 +20,14 @@ package org.elasticsearch.index.shard; import org.apache.logging.log4j.Logger; +import org.elasticsearch.Assertions; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.ContextPreservingActionListener; import org.elasticsearch.action.support.ThreadedActionListener; import org.elasticsearch.common.CheckedRunnable; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.lease.Releasable; +import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.ThreadContext.StoredContext; import org.elasticsearch.threadpool.ThreadPool; @@ -36,20 +38,35 @@ import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Consumer; import java.util.function.Supplier; +/** + * Tracks shard operation permits. Each operation on the shard obtains a permit. When we need to block operations (e.g., to transition + * between terms) we immediately delay all operations to a queue, obtain all available permits, and wait for outstanding operations to drain + * and return their permits. Delayed operations will acquire permits and be completed after the operation that blocked all operations has + * completed. + */ final class IndexShardOperationPermits implements Closeable { + private final ShardId shardId; private final Logger logger; private final ThreadPool threadPool; private static final int TOTAL_PERMITS = Integer.MAX_VALUE; - // fair semaphore to ensure that blockOperations() does not starve under thread contention - final Semaphore semaphore = new Semaphore(TOTAL_PERMITS, true); - @Nullable private List> delayedOperations; // operations that are delayed + final Semaphore semaphore = new Semaphore(TOTAL_PERMITS, true); // fair to ensure a blocking thread is not starved + private final List> delayedOperations = new ArrayList<>(); // operations that are delayed private volatile boolean closed; + private boolean delayed; // does not need to be volatile as all accesses are done under a lock on this - IndexShardOperationPermits(ShardId shardId, Logger logger, ThreadPool threadPool) { + /** + * Construct operation permits for the specified shards. + * + * @param shardId the shard + * @param logger the logger for the shard + * @param threadPool the thread pool (used to execute delayed operations) + */ + IndexShardOperationPermits(final ShardId shardId, final Logger logger, final ThreadPool threadPool) { this.shardId = shardId; this.logger = logger; this.threadPool = threadPool; @@ -61,99 +78,170 @@ final class IndexShardOperationPermits implements Closeable { } /** - * Wait for in-flight operations to finish and executes onBlocked under the guarantee that no new operations are started. Queues - * operations that are occurring in the meanwhile and runs them once onBlocked has executed. + * Wait for in-flight operations to finish and executes {@code onBlocked} under the guarantee that no new operations are started. Queues + * operations that are occurring in the meanwhile and runs them once {@code onBlocked} has executed. * - * @param timeout the maximum time to wait for the in-flight operations block - * @param timeUnit the time unit of the {@code timeout} argument + * @param timeout the maximum time to wait for the in-flight operations block + * @param timeUnit the time unit of the {@code timeout} argument * @param onBlocked the action to run once the block has been acquired - * @throws InterruptedException if calling thread is interrupted - * @throws TimeoutException if timed out waiting for in-flight operations to finish + * @param the type of checked exception thrown by {@code onBlocked} + * @throws InterruptedException if calling thread is interrupted + * @throws TimeoutException if timed out waiting for in-flight operations to finish * @throws IndexShardClosedException if operation permit has been closed */ - public void blockOperations(long timeout, TimeUnit timeUnit, CheckedRunnable onBlocked) throws - InterruptedException, TimeoutException, E { + void blockOperations( + final long timeout, + final TimeUnit timeUnit, + final CheckedRunnable onBlocked) throws InterruptedException, TimeoutException, E { if (closed) { throw new IndexShardClosedException(shardId); } + delayOperations(); try { - if (semaphore.tryAcquire(TOTAL_PERMITS, timeout, timeUnit)) { - assert semaphore.availablePermits() == 0; - try { - onBlocked.run(); - } finally { - semaphore.release(TOTAL_PERMITS); - } - } else { - throw new TimeoutException("timed out during blockOperations"); - } + doBlockOperations(timeout, timeUnit, onBlocked); } finally { - final List> queuedActions; + releaseDelayedOperations(); + } + } + + /** + * Immediately delays operations and on another thread waits for in-flight operations to finish and then executes {@code onBlocked} + * under the guarantee that no new operations are started. Delayed operations are run after {@code onBlocked} has executed. After + * operations are delayed and the blocking is forked to another thread, returns to the caller. If a failure occurs while blocking + * operations or executing {@code onBlocked} then the {@code onFailure} handler will be invoked. + * + * @param timeout the maximum time to wait for the in-flight operations block + * @param timeUnit the time unit of the {@code timeout} argument + * @param onBlocked the action to run once the block has been acquired + * @param onFailure the action to run if a failure occurs while blocking operations + * @param the type of checked exception thrown by {@code onBlocked} (not thrown on the calling thread) + */ + void asyncBlockOperations( + final long timeout, final TimeUnit timeUnit, final CheckedRunnable onBlocked, final Consumer onFailure) { + delayOperations(); + threadPool.executor(ThreadPool.Names.GENERIC).execute(new AbstractRunnable() { + @Override + public void onFailure(final Exception e) { + onFailure.accept(e); + } + + @Override + protected void doRun() throws Exception { + doBlockOperations(timeout, timeUnit, onBlocked); + } + + @Override + public void onAfter() { + releaseDelayedOperations(); + } + }); + } + + private void delayOperations() { + synchronized (this) { + if (delayed) { + throw new IllegalStateException("operations are already delayed"); + } else { + assert delayedOperations.isEmpty(); + delayed = true; + } + } + } + + private void doBlockOperations( + final long timeout, + final TimeUnit timeUnit, + final CheckedRunnable onBlocked) throws InterruptedException, TimeoutException, E { + if (Assertions.ENABLED) { + // since delayed is not volatile, we have to synchronize even here for visibility synchronized (this) { - queuedActions = delayedOperations; - delayedOperations = null; + assert delayed; } - if (queuedActions != null) { - // Try acquiring permits on fresh thread (for two reasons): - // - blockOperations can be called on recovery thread which can be expected to be interrupted when recovery is cancelled. - // Interruptions are bad here as permit acquisition will throw an InterruptedException which will be swallowed by - // ThreadedActionListener if the queue of the thread pool on which it submits is full. - // - if permit is acquired and queue of the thread pool which the ThreadedActionListener uses is full, the onFailure - // handler is executed on the calling thread. This should not be the recovery thread as it would delay the recovery. - threadPool.executor(ThreadPool.Names.GENERIC).execute(() -> { - for (ActionListener queuedAction : queuedActions) { - acquire(queuedAction, null, false); - } - }); + } + if (semaphore.tryAcquire(TOTAL_PERMITS, timeout, timeUnit)) { + assert semaphore.availablePermits() == 0; + try { + onBlocked.run(); + } finally { + semaphore.release(TOTAL_PERMITS); } + } else { + throw new TimeoutException("timeout while blocking operations"); + } + } + + private void releaseDelayedOperations() { + final List> queuedActions; + synchronized (this) { + assert delayed; + queuedActions = new ArrayList<>(delayedOperations); + delayedOperations.clear(); + delayed = false; + } + if (!queuedActions.isEmpty()) { + /* + * Try acquiring permits on fresh thread (for two reasons): + * - blockOperations can be called on a recovery thread which can be expected to be interrupted when recovery is cancelled; + * interruptions are bad here as permit acquisition will throw an interrupted exception which will be swallowed by + * the threaded action listener if the queue of the thread pool on which it submits is full + * - if a permit is acquired and the queue of the thread pool which the the threaded action listener uses is full, the + * onFailure handler is executed on the calling thread; this should not be the recovery thread as it would delay the + * recovery + */ + threadPool.executor(ThreadPool.Names.GENERIC).execute(() -> { + for (ActionListener queuedAction : queuedActions) { + acquire(queuedAction, null, false); + } + }); } } /** * Acquires a permit whenever permit acquisition is not blocked. If the permit is directly available, the provided * {@link ActionListener} will be called on the calling thread. During calls of - * {@link #blockOperations(long, TimeUnit, CheckedRunnable)}, permit acquisition can be delayed. The provided ActionListener will - * then be called using the provided executor once operations are no longer blocked. + * {@link #blockOperations(long, TimeUnit, CheckedRunnable)}, permit acquisition can be delayed. The provided {@link ActionListener} + * will then be called using the provided executor once operations are no longer blocked. * * @param onAcquired {@link ActionListener} that is invoked once acquisition is successful or failed * @param executorOnDelay executor to use for delayed call * @param forceExecution whether the runnable should force its execution in case it gets rejected */ - public void acquire(ActionListener onAcquired, String executorOnDelay, boolean forceExecution) { + public void acquire(final ActionListener onAcquired, final String executorOnDelay, final boolean forceExecution) { if (closed) { onAcquired.onFailure(new IndexShardClosedException(shardId)); return; } - Releasable releasable; + final Releasable releasable; try { synchronized (this) { - releasable = tryAcquire(); - if (releasable == null) { - // blockOperations is executing, this operation will be retried by blockOperations once it finishes - if (delayedOperations == null) { - delayedOperations = new ArrayList<>(); - } + if (delayed) { final Supplier contextSupplier = threadPool.getThreadContext().newRestorableContext(false); if (executorOnDelay != null) { delayedOperations.add( - new ThreadedActionListener<>(logger, threadPool, executorOnDelay, - new ContextPreservingActionListener<>(contextSupplier, onAcquired), forceExecution)); + new ThreadedActionListener<>(logger, threadPool, executorOnDelay, + new ContextPreservingActionListener<>(contextSupplier, onAcquired), forceExecution)); } else { delayedOperations.add(new ContextPreservingActionListener<>(contextSupplier, onAcquired)); } return; + } else { + releasable = tryAcquire(); + assert releasable != null; } } - } catch (InterruptedException e) { + } catch (final InterruptedException e) { onAcquired.onFailure(e); return; } + // execute this outside the synchronized block! onAcquired.onResponse(releasable); } - @Nullable private Releasable tryAcquire() throws InterruptedException { - if (semaphore.tryAcquire(1, 0, TimeUnit.SECONDS)) { // the untimed tryAcquire methods do not honor the fairness setting - AtomicBoolean closed = new AtomicBoolean(); + @Nullable + private Releasable tryAcquire() throws InterruptedException { + assert Thread.holdsLock(this); + if (semaphore.tryAcquire(1, 0, TimeUnit.SECONDS)) { // the un-timed tryAcquire methods do not honor the fairness setting + final AtomicBoolean closed = new AtomicBoolean(); return () -> { if (closed.compareAndSet(false, true)) { semaphore.release(1); @@ -163,13 +251,23 @@ final class IndexShardOperationPermits implements Closeable { return null; } - public int getActiveOperationsCount() { + /** + * Obtain the active operation count, or zero if all permits are held (even if there are outstanding operations in flight). + * + * @return the active operation count, or zero when all permits ar eheld + */ + int getActiveOperationsCount() { int availablePermits = semaphore.availablePermits(); if (availablePermits == 0) { - // when blockOperations is holding all permits + /* + * This occurs when either doBlockOperations is holding all the permits or there are outstanding operations in flight and the + * remainder of the permits are held by doBlockOperations. We do not distinguish between these two cases and simply say that + * the active operations count is zero. + */ return 0; } else { return TOTAL_PERMITS - availablePermits; } } + } diff --git a/core/src/test/java/org/elasticsearch/index/shard/IndexShardOperationPermitsTests.java b/core/src/test/java/org/elasticsearch/index/shard/IndexShardOperationPermitsTests.java index 18a250a4282..ec22f9d862b 100644 --- a/core/src/test/java/org/elasticsearch/index/shard/IndexShardOperationPermitsTests.java +++ b/core/src/test/java/org/elasticsearch/index/shard/IndexShardOperationPermitsTests.java @@ -25,20 +25,30 @@ import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.threadpool.ThreadPoolStats; import org.junit.After; import org.junit.AfterClass; import org.junit.Before; import org.junit.BeforeClass; import java.util.ArrayList; +import java.util.Collections; import java.util.List; +import java.util.Set; +import java.util.concurrent.BrokenBarrierException; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.CyclicBarrier; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; +import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasToString; import static org.hamcrest.Matchers.instanceOf; public class IndexShardOperationPermitsTests extends ESTestCase { @@ -143,7 +153,7 @@ public class IndexShardOperationPermitsTests extends ESTestCase { public void testOperationsDelayedIfBlock() throws ExecutionException, InterruptedException, TimeoutException { PlainActionFuture future = new PlainActionFuture<>(); - try (Releasable releasable = blockAndWait()) { + try (Releasable ignored = blockAndWait()) { permits.acquire(future, ThreadPool.Names.GENERIC, true); assertFalse(future.isDone()); } @@ -184,7 +194,7 @@ public class IndexShardOperationPermitsTests extends ESTestCase { } }; - try (Releasable releasable = blockAndWait()) { + try (Releasable ignored = blockAndWait()) { // we preserve the thread context here so that we have a different context in the call to acquire than the context present // when the releasable is closed try (ThreadContext.StoredContext ignore = context.newStoredContext(false)) { @@ -238,6 +248,202 @@ public class IndexShardOperationPermitsTests extends ESTestCase { }; } + public void testAsyncBlockOperationsOperationWhileBlocked() throws InterruptedException { + final CountDownLatch blockAcquired = new CountDownLatch(1); + final CountDownLatch releaseBlock = new CountDownLatch(1); + final AtomicBoolean blocked = new AtomicBoolean(); + permits.asyncBlockOperations( + 30, + TimeUnit.MINUTES, + () -> { + blocked.set(true); + blockAcquired.countDown(); + releaseBlock.await(); + }, + e -> { + throw new RuntimeException(e); + }); + blockAcquired.await(); + assertTrue(blocked.get()); + + // an operation that is submitted while there is a delay in place should be delayed + final CountDownLatch delayedOperation = new CountDownLatch(1); + final AtomicBoolean delayed = new AtomicBoolean(); + final Thread thread = new Thread(() -> + permits.acquire( + new ActionListener() { + @Override + public void onResponse(Releasable releasable) { + delayed.set(true); + releasable.close(); + delayedOperation.countDown(); + } + + @Override + public void onFailure(Exception e) { + + } + }, + ThreadPool.Names.GENERIC, + false)); + thread.start(); + assertFalse(delayed.get()); + releaseBlock.countDown(); + delayedOperation.await(); + assertTrue(delayed.get()); + thread.join(); + } + + public void testAsyncBlockOperationsOperationBeforeBlocked() throws InterruptedException, BrokenBarrierException { + final CyclicBarrier barrier = new CyclicBarrier(2); + final CountDownLatch operationExecutingLatch = new CountDownLatch(1); + final CountDownLatch firstOperationLatch = new CountDownLatch(1); + final CountDownLatch firstOperationCompleteLatch = new CountDownLatch(1); + final Thread firstOperationThread = + new Thread(controlledAcquire(barrier, operationExecutingLatch, firstOperationLatch, firstOperationCompleteLatch)); + firstOperationThread.start(); + + barrier.await(); + + operationExecutingLatch.await(); + + // now we will delay operations while the first operation is still executing (because it is latched) + final CountDownLatch blockedLatch = new CountDownLatch(1); + final AtomicBoolean onBlocked = new AtomicBoolean(); + permits.asyncBlockOperations( + 30, + TimeUnit.MINUTES, + () -> { + onBlocked.set(true); + blockedLatch.countDown(); + }, e -> { + throw new RuntimeException(e); + }); + + assertFalse(onBlocked.get()); + + // if we submit another operation, it should be delayed + final CountDownLatch secondOperationExecuting = new CountDownLatch(1); + final CountDownLatch secondOperationComplete = new CountDownLatch(1); + final AtomicBoolean secondOperation = new AtomicBoolean(); + final Thread secondOperationThread = new Thread(() -> { + secondOperationExecuting.countDown(); + permits.acquire( + new ActionListener() { + @Override + public void onResponse(Releasable releasable) { + secondOperation.set(true); + releasable.close(); + secondOperationComplete.countDown(); + } + + @Override + public void onFailure(Exception e) { + throw new RuntimeException(e); + } + }, + ThreadPool.Names.GENERIC, + false); + }); + secondOperationThread.start(); + + secondOperationExecuting.await(); + assertFalse(secondOperation.get()); + + firstOperationLatch.countDown(); + firstOperationCompleteLatch.await(); + blockedLatch.await(); + assertTrue(onBlocked.get()); + + secondOperationComplete.await(); + assertTrue(secondOperation.get()); + + firstOperationThread.join(); + secondOperationThread.join(); + } + + public void testAsyncBlockOperationsRace() throws Exception { + // we racily submit operations and a delay, and then ensure that all operations were actually completed + final int operations = scaledRandomIntBetween(1, 64); + final CyclicBarrier barrier = new CyclicBarrier(1 + 1 + operations); + final CountDownLatch operationLatch = new CountDownLatch(1 + operations); + final Set values = Collections.newSetFromMap(new ConcurrentHashMap<>()); + final List threads = new ArrayList<>(); + for (int i = 0; i < operations; i++) { + final int value = i; + final Thread thread = new Thread(() -> { + try { + barrier.await(); + } catch (final BrokenBarrierException | InterruptedException e) { + throw new RuntimeException(e); + } + permits.acquire( + new ActionListener() { + @Override + public void onResponse(Releasable releasable) { + values.add(value); + releasable.close(); + operationLatch.countDown(); + } + + @Override + public void onFailure(Exception e) { + + } + }, + ThreadPool.Names.GENERIC, + false); + }); + thread.start(); + threads.add(thread); + } + + final Thread blockingThread = new Thread(() -> { + try { + barrier.await(); + } catch (final BrokenBarrierException | InterruptedException e) { + throw new RuntimeException(e); + } + permits.asyncBlockOperations( + 30, + TimeUnit.MINUTES, + () -> { + values.add(operations); + operationLatch.countDown(); + }, e -> { + throw new RuntimeException(e); + }); + }); + blockingThread.start(); + + barrier.await(); + + operationLatch.await(); + for (final Thread thread : threads) { + thread.join(); + } + blockingThread.join(); + + // check that all operations completed + for (int i = 0; i < operations; i++) { + assertTrue(values.contains(i)); + } + assertTrue(values.contains(operations)); + /* + * The block operation is executed on another thread and the operations can have completed before this thread has returned all the + * permits to the semaphore. We wait here until all generic threads are idle as an indication that all permits have been returned to + * the semaphore. + */ + awaitBusy(() -> { + for (final ThreadPoolStats.Stats stats : threadPool.stats()) { + if (ThreadPool.Names.GENERIC.equals(stats.getName())) { + return stats.getActive() == 0; + } + } + return false; + }); + } + public void testActiveOperationsCount() throws ExecutionException, InterruptedException { PlainActionFuture future1 = new PlainActionFuture<>(); permits.acquire(future1, ThreadPool.Names.GENERIC, true); @@ -267,4 +473,114 @@ public class IndexShardOperationPermitsTests extends ESTestCase { future3.get().close(); assertThat(permits.getActiveOperationsCount(), equalTo(0)); } + + public void testAsyncBlockOperationsOnFailure() throws InterruptedException { + final AtomicReference reference = new AtomicReference<>(); + final CountDownLatch onFailureLatch = new CountDownLatch(1); + permits.asyncBlockOperations( + 10, + TimeUnit.MINUTES, + () -> { + throw new RuntimeException("simulated"); + }, + e -> { + reference.set(e); + onFailureLatch.countDown(); + }); + onFailureLatch.await(); + assertThat(reference.get(), instanceOf(RuntimeException.class)); + assertThat(reference.get(), hasToString(containsString("simulated"))); + } + + public void testTimeout() throws BrokenBarrierException, InterruptedException { + final CyclicBarrier barrier = new CyclicBarrier(2); + final CountDownLatch operationExecutingLatch = new CountDownLatch(1); + final CountDownLatch operationLatch = new CountDownLatch(1); + final CountDownLatch operationCompleteLatch = new CountDownLatch(1); + + final Thread thread = new Thread(controlledAcquire(barrier, operationExecutingLatch, operationLatch, operationCompleteLatch)); + thread.start(); + + barrier.await(); + + operationExecutingLatch.await(); + + { + final TimeoutException e = + expectThrows(TimeoutException.class, () -> permits.blockOperations(1, TimeUnit.MILLISECONDS, () -> {})); + assertThat(e, hasToString(containsString("timeout while blocking operations"))); + } + + { + final AtomicReference reference = new AtomicReference<>(); + final CountDownLatch onFailureLatch = new CountDownLatch(1); + permits.asyncBlockOperations( + 1, + TimeUnit.MILLISECONDS, + () -> {}, + e -> { + reference.set(e); + onFailureLatch.countDown(); + }); + onFailureLatch.await(); + assertThat(reference.get(), hasToString(containsString("timeout while blocking operations"))); + } + + operationLatch.countDown(); + + operationCompleteLatch.await(); + + thread.join(); + } + + /** + * Returns an operation that acquires a permit and synchronizes in the following manner: + *
    + *
  • waits on the {@code barrier} before acquiring a permit
  • + *
  • counts down the {@code operationExecutingLatch} when it acquires the permit
  • + *
  • waits on the {@code operationLatch} before releasing the permit
  • + *
  • counts down the {@code operationCompleteLatch} after releasing the permit
  • + *
+ * + * @param barrier the barrier to wait on + * @param operationExecutingLatch the latch to countdown after acquiring the permit + * @param operationLatch the latch to wait on before releasing the permit + * @param operationCompleteLatch the latch to countdown after releasing the permit + * @return a controllable runnable that acquires a permit + */ + private Runnable controlledAcquire( + final CyclicBarrier barrier, + final CountDownLatch operationExecutingLatch, + final CountDownLatch operationLatch, + final CountDownLatch operationCompleteLatch) { + return () -> { + try { + barrier.await(); + } catch (final BrokenBarrierException | InterruptedException e) { + throw new RuntimeException(e); + } + permits.acquire( + new ActionListener() { + @Override + public void onResponse(Releasable releasable) { + operationExecutingLatch.countDown(); + try { + operationLatch.await(); + } catch (final InterruptedException e) { + throw new RuntimeException(e); + } + releasable.close(); + operationCompleteLatch.countDown(); + } + + @Override + public void onFailure(Exception e) { + throw new RuntimeException(e); + } + }, + ThreadPool.Names.GENERIC, + false); + }; + } + } diff --git a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index 6dce3dab3a9..e7aa3c61b4e 100644 --- a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -280,6 +280,114 @@ public class IndexShardTests extends IndexShardTestCase { } } + public void testPrimaryPromotionDelaysOperations() throws IOException, BrokenBarrierException, InterruptedException { + final IndexShard indexShard = newStartedShard(false); + + final int operations = scaledRandomIntBetween(1, 64); + final CyclicBarrier barrier = new CyclicBarrier(1 + operations); + final CountDownLatch latch = new CountDownLatch(operations); + final CountDownLatch operationLatch = new CountDownLatch(1); + final List threads = new ArrayList<>(); + for (int i = 0; i < operations; i++) { + final Thread thread = new Thread(() -> { + try { + barrier.await(); + } catch (final BrokenBarrierException | InterruptedException e) { + throw new RuntimeException(e); + } + indexShard.acquireReplicaOperationPermit( + indexShard.getPrimaryTerm(), + new ActionListener() { + @Override + public void onResponse(Releasable releasable) { + latch.countDown(); + try { + operationLatch.await(); + } catch (final InterruptedException e) { + throw new RuntimeException(e); + } + releasable.close(); + } + + @Override + public void onFailure(Exception e) { + throw new RuntimeException(e); + } + }, + ThreadPool.Names.INDEX); + }); + thread.start(); + threads.add(thread); + } + + barrier.await(); + latch.await(); + + // promote the replica + final ShardRouting replicaRouting = indexShard.routingEntry(); + final ShardRouting primaryRouting = + TestShardRouting.newShardRouting( + replicaRouting.shardId(), + replicaRouting.currentNodeId(), + null, + true, + ShardRoutingState.STARTED, + replicaRouting.allocationId()); + indexShard.updateRoutingEntry(primaryRouting); + indexShard.updatePrimaryTerm(indexShard.getPrimaryTerm() + 1); + + final int delayedOperations = scaledRandomIntBetween(1, 64); + final CyclicBarrier delayedOperationsBarrier = new CyclicBarrier(1 + delayedOperations); + final CountDownLatch delayedOperationsLatch = new CountDownLatch(delayedOperations); + final AtomicLong counter = new AtomicLong(); + final List delayedThreads = new ArrayList<>(); + for (int i = 0; i < delayedOperations; i++) { + final Thread thread = new Thread(() -> { + try { + delayedOperationsBarrier.await(); + } catch (final BrokenBarrierException | InterruptedException e) { + throw new RuntimeException(e); + } + indexShard.acquirePrimaryOperationPermit( + new ActionListener() { + @Override + public void onResponse(Releasable releasable) { + counter.incrementAndGet(); + releasable.close(); + delayedOperationsLatch.countDown(); + } + + @Override + public void onFailure(Exception e) { + throw new RuntimeException(e); + } + }, + ThreadPool.Names.INDEX); + }); + thread.start(); + delayedThreads.add(thread); + } + + delayedOperationsBarrier.await(); + + assertThat(counter.get(), equalTo(0L)); + + operationLatch.countDown(); + for (final Thread thread : threads) { + thread.join(); + } + + delayedOperationsLatch.await(); + + assertThat(counter.get(), equalTo((long) delayedOperations)); + + for (final Thread thread : delayedThreads) { + thread.join(); + } + + closeShards(indexShard); + } + public void testOperationPermitsOnPrimaryShards() throws InterruptedException, ExecutionException, IOException { final ShardId shardId = new ShardId("test", "_na_", 0); final IndexShard indexShard;