Add run under primary permit method (#37440)

This commit adds a simple method for executing a runnable against a
shard under a primary permit. Today there is only a single caller for
this method, but there are two upcoming use-cases for which having
this method will help keep the code simpler.
This commit is contained in:
Jason Tedor 2019-01-14 21:54:42 -05:00 committed by GitHub
parent e11a32eda8
commit 43bfdd32ee
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 105 additions and 19 deletions

View File

@ -26,13 +26,11 @@ import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.util.Accountable;
import org.elasticsearch.Assertions;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.settings.Settings;
@ -790,23 +788,18 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust
continue;
case STARTED:
try {
shard.acquirePrimaryOperationPermit(
ActionListener.wrap(
releasable -> {
try (Releasable ignored = releasable) {
shard.maybeSyncGlobalCheckpoint("background");
}
},
e -> {
if (!(e instanceof AlreadyClosedException || e instanceof IndexShardClosedException)) {
logger.info(
new ParameterizedMessage(
"{} failed to execute background global checkpoint sync",
shard.shardId()),
e);
}
}),
ThreadPool.Names.SAME, "background global checkpoint sync");
shard.runUnderPrimaryPermit(
() -> shard.maybeSyncGlobalCheckpoint("background"),
e -> {
if (e instanceof AlreadyClosedException == false
&& e instanceof IndexShardClosedException == false) {
logger.warn(
new ParameterizedMessage(
"{} failed to execute background global checkpoint sync", shard.shardId()), e);
}
},
ThreadPool.Names.SAME,
"background global checkpoint sync");
} catch (final AlreadyClosedException | IndexShardClosedException e) {
// the shard was closed concurrently, continue
}

View File

@ -2395,6 +2395,34 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
}
}
/**
 * Executes the given runnable while holding a primary operation permit. This is a convenience wrapper around
 * {@link #acquirePrimaryOperationPermit(ActionListener, String, Object)}: the listener handed to that method runs the runnable
 * inside a try-with-resources block that closes the acquired releasable afterwards, and uses the supplied failure callback as its
 * failure handler.
 *
 * @param runnable        the runnable to execute under permit
 * @param onFailure       the callback on failure
 * @param executorOnDelay the executor to execute the runnable on if permit acquisition is blocked
 * @param debugInfo       debug info
 */
public void runUnderPrimaryPermit(
        final Runnable runnable,
        final Consumer<Exception> onFailure,
        final String executorOnDelay,
        final Object debugInfo) {
    verifyNotClosed();
    assert shardRouting.primary() : "runUnderPrimaryPermit should only be called on primary shard but was " + shardRouting;
    acquirePrimaryOperationPermit(
            ActionListener.wrap(
                    permit -> {
                        // closing the releasable hands the permit back once the runnable has finished
                        try (Releasable ignored = permit) {
                            runnable.run();
                        }
                    },
                    onFailure),
            executorOnDelay,
            debugInfo);
}
private <E extends Exception> void bumpPrimaryTerm(final long newPrimaryTerm,
final CheckedRunnable<E> onBlocked,
@Nullable ActionListener<Releasable> combineWithAction) {

View File

@ -147,6 +147,7 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BrokenBarrierException;
@ -331,6 +332,70 @@ public class IndexShardTests extends IndexShardTestCase {
randomNonNegativeLong(), null, TimeValue.timeValueSeconds(30L)));
}
/**
 * Checks that the runnable handed to {@code runUnderPrimaryPermit} observes exactly one active operation while it runs, and that
 * the active operation count is zero both before the call and after it returns.
 */
public void testRunUnderPrimaryPermitRunsUnderPrimaryPermit() throws IOException {
    final IndexShard primary = newStartedShard(true);
    try {
        // no operation is active before the call
        assertThat(primary.getActiveOperationsCount(), equalTo(0));
        primary.runUnderPrimaryPermit(
            () -> {
                // the permit held on behalf of the runnable is the single active operation
                assertThat(primary.getActiveOperationsCount(), equalTo(1));
            },
            failure -> fail(failure.toString()),
            ThreadPool.Names.SAME,
            "test");
        // the count is back to zero once the call has returned
        assertThat(primary.getActiveOperationsCount(), equalTo(0));
    } finally {
        closeShards(primary);
    }
}
/**
 * Checks that an exception thrown by the runnable handed to {@code runUnderPrimaryPermit} is delivered to the failure callback
 * with its type and message intact.
 */
public void testRunUnderPrimaryPermitOnFailure() throws IOException {
    final IndexShard primary = newStartedShard(true);
    final AtomicBoolean failureCallbackInvoked = new AtomicBoolean();
    try {
        primary.runUnderPrimaryPermit(
            () -> {
                throw new RuntimeException("failure");
            },
            failure -> {
                assertThat(failure, instanceOf(RuntimeException.class));
                assertThat(failure.getMessage(), equalTo("failure"));
                failureCallbackInvoked.set(true);
            },
            ThreadPool.Names.SAME,
            "test");
        // the callback must have run by the time the call returns
        assertTrue(failureCallbackInvoked.get());
    } finally {
        closeShards(primary);
    }
}
/**
 * Checks that a runnable handed to {@code runUnderPrimaryPermit} while all primary operation permits are held is executed, after
 * the blocking permit is released, on a thread whose name contains the expected thread pool name.
 *
 * The name of the executing thread (or the failure passed to the failure callback) is handed back to the test thread through a
 * future and asserted there. Asserting on the executing thread itself is avoided on purpose: a failing assertion there would
 * never signal the waiting test thread, so the test would block instead of reporting the failure.
 */
public void testRunUnderPrimaryPermitDelaysToExecutorWhenBlocked() throws Exception {
    final IndexShard indexShard = newStartedShard(true);
    try {
        // hold all primary operation permits so that the acquisition below cannot complete immediately
        final PlainActionFuture<Releasable> onAcquired = new PlainActionFuture<>();
        indexShard.acquireAllPrimaryOperationsPermits(onAcquired, new TimeValue(Long.MAX_VALUE, TimeUnit.NANOSECONDS));
        final Releasable permit = onAcquired.actionGet();
        final String executorOnDelay =
            randomFrom(ThreadPool.Names.FLUSH, ThreadPool.Names.GENERIC, ThreadPool.Names.MANAGEMENT, ThreadPool.Names.SAME);
        // when SAME is requested the test expects the runnable to end up on a "generic" thread
        final String expectedThreadPoolName =
            executorOnDelay.equals(ThreadPool.Names.SAME) ? "generic" : executorOnDelay.toLowerCase(Locale.ROOT);
        final PlainActionFuture<String> executingThreadName = new PlainActionFuture<>();
        indexShard.runUnderPrimaryPermit(
            () -> executingThreadName.onResponse(Thread.currentThread().getName()),
            executingThreadName::onFailure,
            executorOnDelay,
            "test");
        permit.close();
        // waits for the runnable (or the failure callback) and asserts on the test thread
        assertThat(executingThreadName.actionGet(), containsString(expectedThreadPoolName));
        // we could race and assert on the count before the permit is returned
        assertBusy(() -> assertThat(indexShard.getActiveOperationsCount(), equalTo(0)));
    } finally {
        closeShards(indexShard);
    }
}
public void testRejectOperationPermitWithHigherTermWhenNotStarted() throws IOException {
IndexShard indexShard = newShard(false);
expectThrows(IndexShardNotStartedException.class, () ->