Combine the execution of an exclusive replica operation with primary term update (#36116)
This commit changes how an operation which requires all index shard operations permits is executed when a primary term update is required: the operation and the update are combined so that the operation is executed after the primary term update under the same blocking operation. Closes #35850 Co-authored-by: Yannick Welsch <yannick@welsch.lu>
This commit is contained in:
parent
89fae42833
commit
5d684ca473
|
@ -63,6 +63,7 @@ import org.elasticsearch.common.unit.TimeValue;
|
|||
import org.elasticsearch.common.util.BigArrays;
|
||||
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
|
||||
import org.elasticsearch.common.util.concurrent.AsyncIOProcessor;
|
||||
import org.elasticsearch.common.util.concurrent.RunOnce;
|
||||
import org.elasticsearch.common.xcontent.XContentHelper;
|
||||
import org.elasticsearch.core.internal.io.IOUtils;
|
||||
import org.elasticsearch.index.Index;
|
||||
|
@ -548,7 +549,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
|
|||
} catch (final AlreadyClosedException e) {
|
||||
// okay, the index was deleted
|
||||
}
|
||||
});
|
||||
}, null);
|
||||
}
|
||||
}
|
||||
// set this last, once we finished updating all internal state.
|
||||
|
@ -2316,14 +2317,26 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
|
|||
indexShardOperationPermits.asyncBlockOperations(onPermitAcquired, timeout.duration(), timeout.timeUnit());
|
||||
}
|
||||
|
||||
private <E extends Exception> void bumpPrimaryTerm(final long newPrimaryTerm, final CheckedRunnable<E> onBlocked) {
|
||||
private <E extends Exception> void bumpPrimaryTerm(final long newPrimaryTerm,
|
||||
final CheckedRunnable<E> onBlocked,
|
||||
@Nullable ActionListener<Releasable> combineWithAction) {
|
||||
assert Thread.holdsLock(mutex);
|
||||
assert newPrimaryTerm > pendingPrimaryTerm;
|
||||
assert newPrimaryTerm > pendingPrimaryTerm || (newPrimaryTerm >= pendingPrimaryTerm && combineWithAction != null);
|
||||
assert operationPrimaryTerm <= pendingPrimaryTerm;
|
||||
final CountDownLatch termUpdated = new CountDownLatch(1);
|
||||
indexShardOperationPermits.asyncBlockOperations(new ActionListener<Releasable>() {
|
||||
@Override
|
||||
public void onFailure(final Exception e) {
|
||||
try {
|
||||
innerFail(e);
|
||||
} finally {
|
||||
if (combineWithAction != null) {
|
||||
combineWithAction.onFailure(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void innerFail(final Exception e) {
|
||||
try {
|
||||
failShard("exception during primary term transition", e);
|
||||
} catch (AlreadyClosedException ace) {
|
||||
|
@ -2333,7 +2346,8 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
|
|||
|
||||
@Override
|
||||
public void onResponse(final Releasable releasable) {
|
||||
try (Releasable ignored = releasable) {
|
||||
final RunOnce releaseOnce = new RunOnce(releasable::close);
|
||||
try {
|
||||
assert operationPrimaryTerm <= pendingPrimaryTerm;
|
||||
termUpdated.await();
|
||||
// indexShardOperationPermits doesn't guarantee that async submissions are executed
|
||||
|
@ -2343,7 +2357,17 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
|
|||
onBlocked.run();
|
||||
}
|
||||
} catch (final Exception e) {
|
||||
onFailure(e);
|
||||
if (combineWithAction == null) {
|
||||
// otherwise leave it to combineWithAction to release the permit
|
||||
releaseOnce.run();
|
||||
}
|
||||
innerFail(e);
|
||||
} finally {
|
||||
if (combineWithAction != null) {
|
||||
combineWithAction.onResponse(releasable);
|
||||
} else {
|
||||
releaseOnce.run();
|
||||
}
|
||||
}
|
||||
}
|
||||
}, 30, TimeUnit.MINUTES);
|
||||
|
@ -2371,7 +2395,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
|
|||
public void acquireReplicaOperationPermit(final long opPrimaryTerm, final long globalCheckpoint, final long maxSeqNoOfUpdatesOrDeletes,
|
||||
final ActionListener<Releasable> onPermitAcquired, final String executorOnDelay,
|
||||
final Object debugInfo) {
|
||||
innerAcquireReplicaOperationPermit(opPrimaryTerm, globalCheckpoint, maxSeqNoOfUpdatesOrDeletes, onPermitAcquired,
|
||||
innerAcquireReplicaOperationPermit(opPrimaryTerm, globalCheckpoint, maxSeqNoOfUpdatesOrDeletes, onPermitAcquired, false,
|
||||
(listener) -> indexShardOperationPermits.acquire(listener, executorOnDelay, true, debugInfo));
|
||||
}
|
||||
|
||||
|
@ -2393,7 +2417,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
|
|||
final long maxSeqNoOfUpdatesOrDeletes,
|
||||
final ActionListener<Releasable> onPermitAcquired,
|
||||
final TimeValue timeout) {
|
||||
innerAcquireReplicaOperationPermit(opPrimaryTerm, globalCheckpoint, maxSeqNoOfUpdatesOrDeletes, onPermitAcquired,
|
||||
innerAcquireReplicaOperationPermit(opPrimaryTerm, globalCheckpoint, maxSeqNoOfUpdatesOrDeletes, onPermitAcquired, true,
|
||||
(listener) -> indexShardOperationPermits.asyncBlockOperations(listener, timeout.duration(), timeout.timeUnit()));
|
||||
}
|
||||
|
||||
|
@ -2401,41 +2425,16 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
|
|||
final long globalCheckpoint,
|
||||
final long maxSeqNoOfUpdatesOrDeletes,
|
||||
final ActionListener<Releasable> onPermitAcquired,
|
||||
final Consumer<ActionListener<Releasable>> consumer) {
|
||||
final boolean allowCombineOperationWithPrimaryTermUpdate,
|
||||
final Consumer<ActionListener<Releasable>> operationExecutor) {
|
||||
verifyNotClosed();
|
||||
if (opPrimaryTerm > pendingPrimaryTerm) {
|
||||
synchronized (mutex) {
|
||||
if (opPrimaryTerm > pendingPrimaryTerm) {
|
||||
final IndexShardState shardState = state();
|
||||
// only roll translog and update primary term if shard has made it past recovery
|
||||
// Having a new primary term here means that the old primary failed and that there is a new primary, which again
|
||||
// means that the master will fail this shard as all initializing shards are failed when a primary is selected
|
||||
// We abort early here to prevent an ongoing recovery from the failed primary to mess with the global / local checkpoint
|
||||
if (shardState != IndexShardState.POST_RECOVERY &&
|
||||
shardState != IndexShardState.STARTED) {
|
||||
throw new IndexShardNotStartedException(shardId, shardState);
|
||||
}
|
||||
|
||||
if (opPrimaryTerm > pendingPrimaryTerm) {
|
||||
bumpPrimaryTerm(opPrimaryTerm, () -> {
|
||||
updateGlobalCheckpointOnReplica(globalCheckpoint, "primary term transition");
|
||||
final long currentGlobalCheckpoint = getGlobalCheckpoint();
|
||||
final long maxSeqNo = seqNoStats().getMaxSeqNo();
|
||||
logger.info("detected new primary with primary term [{}], global checkpoint [{}], max_seq_no [{}]",
|
||||
opPrimaryTerm, currentGlobalCheckpoint, maxSeqNo);
|
||||
if (currentGlobalCheckpoint < maxSeqNo) {
|
||||
resetEngineToGlobalCheckpoint();
|
||||
} else {
|
||||
getEngine().rollTranslogGeneration();
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
assert opPrimaryTerm <= pendingPrimaryTerm
|
||||
: "operation primary term [" + opPrimaryTerm + "] should be at most [" + pendingPrimaryTerm + "]";
|
||||
consumer.accept(new ActionListener<Releasable>() {
|
||||
// This listener is used for the execution of the operation. If the operation requires all the permits for its
|
||||
// execution and the primary term must be updated first, we can combine the operation execution with the
|
||||
// primary term update. Since indexShardOperationPermits doesn't guarantee that async submissions are executed
|
||||
// in the order submitted, combining both operations ensure that the term is updated before the operation is
|
||||
// executed. It also has the side effect of acquiring all the permits one time instead of two.
|
||||
final ActionListener<Releasable> operationListener = new ActionListener<Releasable>() {
|
||||
@Override
|
||||
public void onResponse(final Releasable releasable) {
|
||||
if (opPrimaryTerm < operationPrimaryTerm) {
|
||||
|
@ -2465,7 +2464,48 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
|
|||
public void onFailure(final Exception e) {
|
||||
onPermitAcquired.onFailure(e);
|
||||
}
|
||||
});
|
||||
};
|
||||
|
||||
if (requirePrimaryTermUpdate(opPrimaryTerm, allowCombineOperationWithPrimaryTermUpdate)) {
|
||||
synchronized (mutex) {
|
||||
if (requirePrimaryTermUpdate(opPrimaryTerm, allowCombineOperationWithPrimaryTermUpdate)) {
|
||||
final IndexShardState shardState = state();
|
||||
// only roll translog and update primary term if shard has made it past recovery
|
||||
// Having a new primary term here means that the old primary failed and that there is a new primary, which again
|
||||
// means that the master will fail this shard as all initializing shards are failed when a primary is selected
|
||||
// We abort early here to prevent an ongoing recovery from the failed primary to mess with the global / local checkpoint
|
||||
if (shardState != IndexShardState.POST_RECOVERY &&
|
||||
shardState != IndexShardState.STARTED) {
|
||||
throw new IndexShardNotStartedException(shardId, shardState);
|
||||
}
|
||||
|
||||
bumpPrimaryTerm(opPrimaryTerm, () -> {
|
||||
updateGlobalCheckpointOnReplica(globalCheckpoint, "primary term transition");
|
||||
final long currentGlobalCheckpoint = getGlobalCheckpoint();
|
||||
final long maxSeqNo = seqNoStats().getMaxSeqNo();
|
||||
logger.info("detected new primary with primary term [{}], global checkpoint [{}], max_seq_no [{}]",
|
||||
opPrimaryTerm, currentGlobalCheckpoint, maxSeqNo);
|
||||
if (currentGlobalCheckpoint < maxSeqNo) {
|
||||
resetEngineToGlobalCheckpoint();
|
||||
} else {
|
||||
getEngine().rollTranslogGeneration();
|
||||
}
|
||||
}, allowCombineOperationWithPrimaryTermUpdate ? operationListener : null);
|
||||
|
||||
if (allowCombineOperationWithPrimaryTermUpdate) {
|
||||
logger.debug("operation execution has been combined with primary term update");
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
assert opPrimaryTerm <= pendingPrimaryTerm
|
||||
: "operation primary term [" + opPrimaryTerm + "] should be at most [" + pendingPrimaryTerm + "]";
|
||||
operationExecutor.accept(operationListener);
|
||||
}
|
||||
|
||||
private boolean requirePrimaryTermUpdate(final long opPrimaryTerm, final boolean allPermits) {
|
||||
return (opPrimaryTerm > pendingPrimaryTerm) || (allPermits && opPrimaryTerm > operationPrimaryTerm);
|
||||
}
|
||||
|
||||
public int getActiveOperationsCount() {
|
||||
|
|
|
@ -733,7 +733,6 @@ public class IndexShardTests extends IndexShardTestCase {
|
|||
return fut.get();
|
||||
}
|
||||
|
||||
@AwaitsFix(bugUrl="https://github.com/elastic/elasticsearch/issues/35850")
|
||||
public void testOperationPermitOnReplicaShards() throws Exception {
|
||||
final ShardId shardId = new ShardId("test", "_na_", 0);
|
||||
final IndexShard indexShard;
|
||||
|
@ -1024,7 +1023,6 @@ public class IndexShardTests extends IndexShardTestCase {
|
|||
closeShards(replicaShard, primaryShard);
|
||||
}
|
||||
|
||||
@AwaitsFix(bugUrl="https://github.com/elastic/elasticsearch/issues/35850")
|
||||
public void testRestoreLocalHistoryFromTranslogOnPromotion() throws IOException, InterruptedException {
|
||||
final IndexShard indexShard = newStartedShard(false);
|
||||
final int operations = 1024 - scaledRandomIntBetween(0, 1024);
|
||||
|
@ -1089,7 +1087,6 @@ public class IndexShardTests extends IndexShardTestCase {
|
|||
closeShard(indexShard, false);
|
||||
}
|
||||
|
||||
@AwaitsFix(bugUrl="https://github.com/elastic/elasticsearch/issues/35850")
|
||||
public void testRollbackReplicaEngineOnPromotion() throws IOException, InterruptedException {
|
||||
final IndexShard indexShard = newStartedShard(false);
|
||||
|
||||
|
@ -3600,6 +3597,67 @@ public class IndexShardTests extends IndexShardTestCase {
|
|||
closeShard(shard, false);
|
||||
}
|
||||
|
||||
public void testConcurrentAcquireAllReplicaOperationsPermitsWithPrimaryTermUpdate() throws Exception {
|
||||
final IndexShard replica = newStartedShard(false);
|
||||
indexOnReplicaWithGaps(replica, between(0, 1000), Math.toIntExact(replica.getLocalCheckpoint()));
|
||||
|
||||
final int nbTermUpdates = randomIntBetween(1, 5);
|
||||
|
||||
for (int i = 0; i < nbTermUpdates; i++) {
|
||||
long opPrimaryTerm = replica.getOperationPrimaryTerm() + 1;
|
||||
final long globalCheckpoint = replica.getGlobalCheckpoint();
|
||||
final long maxSeqNoOfUpdatesOrDeletes = replica.getMaxSeqNoOfUpdatesOrDeletes();
|
||||
|
||||
final int operations = scaledRandomIntBetween(5, 32);
|
||||
final CyclicBarrier barrier = new CyclicBarrier(1 + operations);
|
||||
final CountDownLatch latch = new CountDownLatch(operations);
|
||||
|
||||
final Thread[] threads = new Thread[operations];
|
||||
for (int j = 0; j < operations; j++) {
|
||||
threads[j] = new Thread(() -> {
|
||||
try {
|
||||
barrier.await();
|
||||
} catch (final BrokenBarrierException | InterruptedException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
replica.acquireAllReplicaOperationsPermits(
|
||||
opPrimaryTerm,
|
||||
globalCheckpoint,
|
||||
maxSeqNoOfUpdatesOrDeletes,
|
||||
new ActionListener<Releasable>() {
|
||||
@Override
|
||||
public void onResponse(final Releasable releasable) {
|
||||
try (Releasable ignored = releasable) {
|
||||
assertThat(replica.getPendingPrimaryTerm(), greaterThanOrEqualTo(opPrimaryTerm));
|
||||
assertThat(replica.getOperationPrimaryTerm(), equalTo(opPrimaryTerm));
|
||||
} finally {
|
||||
latch.countDown();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(final Exception e) {
|
||||
try {
|
||||
throw new RuntimeException(e);
|
||||
} finally {
|
||||
latch.countDown();
|
||||
}
|
||||
}
|
||||
}, TimeValue.timeValueMinutes(30L));
|
||||
});
|
||||
threads[j].start();
|
||||
}
|
||||
barrier.await();
|
||||
latch.await();
|
||||
|
||||
for (Thread thread : threads) {
|
||||
thread.join();
|
||||
}
|
||||
}
|
||||
|
||||
closeShard(replica, false);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Settings threadPoolSettings() {
|
||||
return Settings.builder().put(super.threadPoolSettings()).put("thread_pool.estimated_time_interval", "5ms").build();
|
||||
|
|
Loading…
Reference in New Issue