Handle lower retaining seqno retention lease error (#46420)

We renew the CCR retention lease at a fixed interval, therefore it's
possible to have more than one in-flight renewal requests at the same
time. If requests arrive out of order, then the assertion is violated.

Closes #46416
Closes #46013
This commit is contained in:
Nhat Nguyen 2019-09-12 16:20:27 -04:00
parent 0bc8acaf5b
commit cabff5a7cd
7 changed files with 104 additions and 19 deletions

View File

@ -1027,7 +1027,12 @@ public class ElasticsearchException extends RuntimeException implements ToXConte
final Class<? extends ElasticsearchException> exceptionClass;
final CheckedFunction<StreamInput, ? extends ElasticsearchException, IOException> constructor;

View File

@ -383,22 +383,24 @@ public class ReplicationTracker extends AbstractIndexShardComponent implements L
* @param retainingSequenceNumber the retaining sequence number
* @param source the source of the retention lease
* @return the renewed retention lease
* @throws RetentionLeaseNotFoundException if the specified retention lease does not exist
* @throws RetentionLeaseNotFoundException if the specified retention lease does not exist
* @throws RetentionLeaseInvalidRetainingSeqNoException if the new retaining sequence number is lower than
* the retaining sequence number of the current retention lease.
public synchronized RetentionLease renewRetentionLease(final String id, final long retainingSequenceNumber, final String source) {
assert primaryMode;
if (retentionLeases.contains(id) == false) {
final RetentionLease existingRetentionLease = retentionLeases.get(id);
if (existingRetentionLease == null) {
throw new RetentionLeaseNotFoundException(id);
if (retainingSequenceNumber < existingRetentionLease.retainingSequenceNumber()) {
assert PEER_RECOVERY_RETENTION_LEASE_SOURCE.equals(source) == false :
"renewing peer recovery retention lease [" + existingRetentionLease + "]" +
" with a lower retaining sequence number [" + retainingSequenceNumber + "]";
throw new RetentionLeaseInvalidRetainingSeqNoException(id, source, retainingSequenceNumber, existingRetentionLease);
final RetentionLease retentionLease =
new RetentionLease(id, retainingSequenceNumber, currentTimeMillisSupplier.getAsLong(), source);
final RetentionLease existingRetentionLease = retentionLeases.get(id);
assert existingRetentionLease != null;
assert existingRetentionLease.retainingSequenceNumber() <= retentionLease.retainingSequenceNumber() :
"retention lease renewal for [" + id + "]"
+ " from [" + source + "]"
+ " renewed a lower retaining sequence number [" + retentionLease.retainingSequenceNumber() + "]"
+ " than the current lease retaining sequence number [" + existingRetentionLease.retainingSequenceNumber() + "]";
new RetentionLease(id, retainingSequenceNumber, currentTimeMillisSupplier.getAsLong(), source);
retentionLeases = new RetentionLeases(
retentionLeases.version() + 1,

View File

@ -0,0 +1,39 @@
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
package org.elasticsearch.index.seqno;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.common.io.stream.StreamInput;
import java.io.IOException;
public class RetentionLeaseInvalidRetainingSeqNoException extends ElasticsearchException {
RetentionLeaseInvalidRetainingSeqNoException(String retentionLeaseId, String source, long retainingSequenceNumber,
RetentionLease existingRetentionLease) {
super("the current retention lease with [" + retentionLeaseId + "]" +
" is retaining a higher sequence number [" + existingRetentionLease.retainingSequenceNumber() + "]" +
" than the new retaining sequence number [" + retainingSequenceNumber + "] from [" + source + "]");
public RetentionLeaseInvalidRetainingSeqNoException(StreamInput in) throws IOException {

View File

@ -62,6 +62,7 @@ import org.elasticsearch.index.Index;
import org.elasticsearch.index.engine.RecoveryEngineException;
import org.elasticsearch.index.query.QueryShardException;
import org.elasticsearch.index.seqno.RetentionLeaseAlreadyExistsException;
import org.elasticsearch.index.seqno.RetentionLeaseInvalidRetainingSeqNoException;
import org.elasticsearch.index.seqno.RetentionLeaseNotFoundException;
import org.elasticsearch.index.shard.IllegalIndexShardStateException;
import org.elasticsearch.index.shard.IndexShardState;
@ -818,6 +819,7 @@ public class ExceptionSerializationTests extends ESTestCase {
ids.put(153, RetentionLeaseAlreadyExistsException.class);
ids.put(154, RetentionLeaseNotFoundException.class);
ids.put(155, ShardNotInPrimaryModeException.class);
ids.put(156, RetentionLeaseInvalidRetainingSeqNoException.class);
Map<Class<? extends ElasticsearchException>, Integer> reverse = new HashMap<>();
for (Map.Entry<Integer, Class<? extends ElasticsearchException>> entry : ids.entrySet()) {

View File

@ -719,6 +719,36 @@ public class ReplicationTrackerRetentionLeaseTests extends ReplicationTrackerTes
assertThat(replicationTracker.loadRetentionLeases(path), equalTo(replicationTracker.getRetentionLeases()));
public void testRenewLeaseWithLowerRetainingSequenceNumber() throws Exception {
final AllocationId allocationId = AllocationId.newInitializing();
long primaryTerm = randomLongBetween(1, Long.MAX_VALUE);
final ReplicationTracker replicationTracker = new ReplicationTracker(
new ShardId("test", "_na", 0),
IndexSettingsModule.newIndexSettings("test", Settings.EMPTY),
value -> {},
() -> 0L,
(leases, listener) -> {},
routingTable(Collections.emptySet(), allocationId));
final String id = randomAlphaOfLength(8);
final long retainingSequenceNumber = randomNonNegativeLong();
final String source = randomAlphaOfLength(8);
replicationTracker.addRetentionLease(id, retainingSequenceNumber, source, ActionListener.wrap(() -> {}));
final long lowerRetainingSequenceNumber = randomLongBetween(SequenceNumbers.NO_OPS_PERFORMED, retainingSequenceNumber - 1);
final RetentionLeaseInvalidRetainingSeqNoException e = expectThrows(RetentionLeaseInvalidRetainingSeqNoException.class,
() -> replicationTracker.renewRetentionLease(id, lowerRetainingSequenceNumber, source));
assertThat(e, hasToString(containsString("the current retention lease with [" + id + "]" +
" is retaining a higher sequence number [" + retainingSequenceNumber + "]" +
" than the new retaining sequence number [" + lowerRetainingSequenceNumber + "] from [" + source + "]")));
private void assertRetentionLeases(
final ReplicationTracker replicationTracker,
final int size,

View File

@ -45,6 +45,7 @@ import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.engine.CommitStats;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.seqno.RetentionLeaseActions;
import org.elasticsearch.index.seqno.RetentionLeaseInvalidRetainingSeqNoException;
import org.elasticsearch.index.seqno.RetentionLeaseNotFoundException;
import org.elasticsearch.index.seqno.SeqNoStats;
import org.elasticsearch.index.shard.ShardId;
@ -470,11 +471,13 @@ public class ShardFollowTasksExecutor extends PersistentTasksExecutor<ShardFollo
private void logRetentionLeaseFailure(final String retentionLeaseId, final Throwable cause) {
assert cause instanceof ElasticsearchSecurityException == false : cause;
logger.warn(new ParameterizedMessage(
"{} background management of retention lease [{}] failed while following",
if (cause instanceof RetentionLeaseInvalidRetainingSeqNoException == false) {
logger.warn(new ParameterizedMessage(
"{} background management of retention lease [{}] failed while following",

View File

@ -45,6 +45,7 @@ import org.elasticsearch.index.engine.EngineException;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.seqno.LocalCheckpointTracker;
import org.elasticsearch.index.seqno.RetentionLeaseAlreadyExistsException;
import org.elasticsearch.index.seqno.RetentionLeaseInvalidRetainingSeqNoException;
import org.elasticsearch.index.seqno.RetentionLeaseNotFoundException;
import org.elasticsearch.index.shard.IndexShardRecoveryException;
import org.elasticsearch.index.shard.ShardId;
@ -336,10 +337,13 @@ public class CcrRepository extends AbstractLifecycleComponent implements Reposit
r -> {},
e -> {
assert e instanceof ElasticsearchSecurityException == false : e;
logger.warn(new ParameterizedMessage(
"{} background renewal of retention lease [{}] failed during restore", shardId,
retentionLeaseId), e);
final Throwable cause = ExceptionsHelper.unwrapCause(e);
assert cause instanceof ElasticsearchSecurityException == false : cause;
if (cause instanceof RetentionLeaseInvalidRetainingSeqNoException == false) {
logger.warn(new ParameterizedMessage(
"{} background renewal of retention lease [{}] failed during restore", shardId,
retentionLeaseId), cause);