diff --git a/server/src/main/java/org/elasticsearch/index/IndexService.java b/server/src/main/java/org/elasticsearch/index/IndexService.java
index 6c01e61586d..3eaad1eee54 100644
--- a/server/src/main/java/org/elasticsearch/index/IndexService.java
+++ b/server/src/main/java/org/elasticsearch/index/IndexService.java
@@ -26,13 +26,11 @@ import org.apache.lucene.store.AlreadyClosedException;
 import org.apache.lucene.util.Accountable;
 import org.elasticsearch.Assertions;
 import org.elasticsearch.Version;
-import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.client.Client;
 import org.elasticsearch.cluster.metadata.IndexMetaData;
 import org.elasticsearch.cluster.routing.ShardRouting;
 import org.elasticsearch.common.Nullable;
 import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
-import org.elasticsearch.common.lease.Releasable;
 import org.elasticsearch.common.settings.Setting;
 import org.elasticsearch.common.settings.Setting.Property;
 import org.elasticsearch.common.settings.Settings;
@@ -790,23 +788,18 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust
                         continue;
                     case STARTED:
                         try {
-                            shard.acquirePrimaryOperationPermit(
-                                    ActionListener.wrap(
-                                            releasable -> {
-                                                try (Releasable ignored = releasable) {
-                                                    shard.maybeSyncGlobalCheckpoint("background");
-                                                }
-                                            },
-                                            e -> {
-                                                if (!(e instanceof AlreadyClosedException || e instanceof IndexShardClosedException)) {
-                                                    logger.info(
-                                                            new ParameterizedMessage(
-                                                                    "{} failed to execute background global checkpoint sync",
-                                                                    shard.shardId()),
-                                                            e);
-                                                }
-                                            }),
-                                    ThreadPool.Names.SAME, "background global checkpoint sync");
+                            shard.runUnderPrimaryPermit(
+                                    () -> shard.maybeSyncGlobalCheckpoint("background"),
+                                    e -> {
+                                        if (e instanceof AlreadyClosedException == false
+                                                && e instanceof IndexShardClosedException == false) {
+                                            logger.warn(
+                                                    new ParameterizedMessage(
+                                                            "{} failed to execute background global checkpoint sync", shard.shardId()), e);
+                                        }
+                                    },
+                                    ThreadPool.Names.SAME,
+                                    "background global checkpoint sync");
                         } catch (final AlreadyClosedException | IndexShardClosedException e) {
                             // the shard was closed concurrently, continue
                         }
diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java
index c76c845328b..8a10f4021d2 100644
--- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java
+++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java
@@ -2395,6 +2395,34 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
         }
     }
 
+    /**
+     * Runs the specified runnable under a primary operation permit, and otherwise calls back the specified failure callback. This
+     * method is a convenience for {@link #acquirePrimaryOperationPermit(ActionListener, String, Object)} where the listener equates
+     * to try-with-resources closing the releasable after executing the runnable on successfully acquiring the permit, and otherwise
+     * calling back the failure callback.
+     *
+     * @param runnable the runnable to execute under permit
+     * @param onFailure the callback invoked if the permit cannot be acquired or the runnable throws
+     * @param executorOnDelay the executor to execute the runnable on if permit acquisition is blocked
+     * @param debugInfo debug info
+     */
+    public void runUnderPrimaryPermit(
+            final Runnable runnable,
+            final Consumer<Exception> onFailure,
+            final String executorOnDelay,
+            final Object debugInfo) {
+        verifyNotClosed();
+        assert shardRouting.primary() : "runUnderPrimaryPermit should only be called on primary shard but was " + shardRouting;
+        final ActionListener<Releasable> onPermitAcquired = ActionListener.wrap(
+                releasable -> {
+                    try (Releasable ignore = releasable) {
+                        runnable.run();
+                    }
+                },
+                onFailure);
+        acquirePrimaryOperationPermit(onPermitAcquired, executorOnDelay, debugInfo);
+    }
+
     private void bumpPrimaryTerm(final long newPrimaryTerm,
                                  final CheckedRunnable onBlocked,
                                  @Nullable ActionListener combineWithAction) {
diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java
index 5c7dd089534..9bb98da6e96 100644
--- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java
+++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java
@@ -147,6 +147,7 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Locale;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.BrokenBarrierException;
@@ -331,6 +332,70 @@ public class IndexShardTests extends IndexShardTestCase {
             randomNonNegativeLong(), null, TimeValue.timeValueSeconds(30L)));
     }
 
+    public void testRunUnderPrimaryPermitRunsUnderPrimaryPermit() throws IOException {
+        final IndexShard indexShard = newStartedShard(true);
+        try {
+            assertThat(indexShard.getActiveOperationsCount(), equalTo(0));
+            indexShard.runUnderPrimaryPermit(
+                    () -> assertThat(indexShard.getActiveOperationsCount(), equalTo(1)),
+                    e -> fail(e.toString()),
+                    ThreadPool.Names.SAME,
+                    "test");
+            assertThat(indexShard.getActiveOperationsCount(), equalTo(0));
+        } finally {
+            closeShards(indexShard);
+        }
+    }
+
+    public void testRunUnderPrimaryPermitOnFailure() throws IOException {
+        final IndexShard indexShard = newStartedShard(true);
+        final AtomicBoolean invoked = new AtomicBoolean();
+        try {
+            indexShard.runUnderPrimaryPermit(
+                    () -> {
+                        throw new RuntimeException("failure");
+                    },
+                    e -> {
+                        assertThat(e, instanceOf(RuntimeException.class));
+                        assertThat(e.getMessage(), equalTo("failure"));
+                        invoked.set(true);
+                    },
+                    ThreadPool.Names.SAME,
+                    "test");
+            assertTrue(invoked.get());
+        } finally {
+            closeShards(indexShard);
+        }
+    }
+
+    public void testRunUnderPrimaryPermitDelaysToExecutorWhenBlocked() throws Exception {
+        final IndexShard indexShard = newStartedShard(true);
+        try {
+            final PlainActionFuture<Releasable> onAcquired = new PlainActionFuture<>();
+            indexShard.acquireAllPrimaryOperationsPermits(onAcquired, new TimeValue(Long.MAX_VALUE, TimeUnit.NANOSECONDS));
+            final Releasable permit = onAcquired.actionGet();
+            final CountDownLatch latch = new CountDownLatch(1);
+            final String executorOnDelay =
+                    randomFrom(ThreadPool.Names.FLUSH, ThreadPool.Names.GENERIC, ThreadPool.Names.MANAGEMENT, ThreadPool.Names.SAME);
+            indexShard.runUnderPrimaryPermit(
+                    () -> {
+                        final String expectedThreadPoolName =
+                                executorOnDelay.equals(ThreadPool.Names.SAME) ? "generic" : executorOnDelay.toLowerCase(Locale.ROOT);
+                        assertThat(Thread.currentThread().getName(), containsString(expectedThreadPoolName));
+                        latch.countDown();
+                    },
+                    e -> fail(e.toString()),
+                    executorOnDelay,
+                    "test");
+            permit.close();
+            latch.await();
+            // we could race and assert on the count before the permit is returned
+            assertBusy(() -> assertThat(indexShard.getActiveOperationsCount(), equalTo(0)));
+        } finally {
+            closeShards(indexShard);
+        }
+    }
+
     public void testRejectOperationPermitWithHigherTermWhenNotStarted() throws IOException {
         IndexShard indexShard = newShard(false);
         expectThrows(IndexShardNotStartedException.class, () ->