Introduce shard history retention leases (#37167)

This commit is the first in a series which will culminate with
fully-functional shard history retention leases.

Shard history retention leases are aimed at preventing shard history
consumers from having to fallback to expensive file copy operations if
shard history is not available from a certain point. These consumers
include following indices in cross-cluster replication, and local shard
recoveries. A future consumer will be the changes API.

Further, index lifecycle management requires coordinating with some of
these consumers otherwise it could remove the source before all
consumers have finished reading all operations. The notion of shard
history retention leases that we are introducing here will also be used
to address this problem.

Shard history retention leases are a property of the replication group
managed under the authority of the primary. A shard history retention
lease is a combination of an identifier, a retaining sequence number, a
timestamp indicating when the lease was acquired or renewed, and a
string indicating the source of the lease. Being leases they have a
limited lifespan that will expire if not renewed. The idea of these
leases is that all operations above the minimum of all retaining
sequence numbers will be retained during merges (which would otherwise
clear away operations that are soft deleted). These leases will be
periodically persisted to Lucene and restored during recovery, and
broadcast to replicas under certain circumstances.

This commit is merely putting the basics in place. This first commit
only introduces the concept and integrates their use with the soft
delete retention policy. We add some tests to demonstrate the basic
management is correct, and that the soft delete policy is correctly
influenced by the existence of any retention leases. We make no effort
in this commit to implement any of the following:
 - timestamps
 - expiration
 - persistence to and recovery from Lucene
 - handoff during primary relocation
 - sharing retention leases with replicas
 - exposing leases in shard-level statistics
 - integration with cross-cluster replication

These will occur individually in follow-up commits.
This commit is contained in:
Jason Tedor 2019-01-07 07:43:57 -08:00 committed by GitHub
parent a7c3d5842a
commit c0f8c89172
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 520 additions and 74 deletions

View File

@ -35,6 +35,7 @@ import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.codec.CodecService; import org.elasticsearch.index.codec.CodecService;
import org.elasticsearch.index.mapper.ParsedDocument; import org.elasticsearch.index.mapper.ParsedDocument;
import org.elasticsearch.index.seqno.RetentionLease;
import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.store.Store; import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.translog.TranslogConfig; import org.elasticsearch.index.translog.TranslogConfig;
@ -42,8 +43,11 @@ import org.elasticsearch.indices.IndexingMemoryController;
import org.elasticsearch.indices.breaker.CircuitBreakerService; import org.elasticsearch.indices.breaker.CircuitBreakerService;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import java.util.Collection;
import java.util.List; import java.util.List;
import java.util.Objects;
import java.util.function.LongSupplier; import java.util.function.LongSupplier;
import java.util.function.Supplier;
/* /*
* Holds all the configuration that is used to create an {@link Engine}. * Holds all the configuration that is used to create an {@link Engine}.
@ -77,6 +81,18 @@ public final class EngineConfig {
@Nullable @Nullable
private final CircuitBreakerService circuitBreakerService; private final CircuitBreakerService circuitBreakerService;
private final LongSupplier globalCheckpointSupplier; private final LongSupplier globalCheckpointSupplier;
private final Supplier<Collection<RetentionLease>> retentionLeasesSupplier;
/**
* A supplier of the outstanding retention leases. This is used during merged operations to determine which operations that have been
* soft deleted should be retained.
*
* @return a supplier of outstanding retention leases
*/
public Supplier<Collection<RetentionLease>> retentionLeasesSupplier() {
return retentionLeasesSupplier;
}
private final LongSupplier primaryTermSupplier; private final LongSupplier primaryTermSupplier;
private final TombstoneDocSupplier tombstoneDocSupplier; private final TombstoneDocSupplier tombstoneDocSupplier;
@ -125,7 +141,9 @@ public final class EngineConfig {
List<ReferenceManager.RefreshListener> externalRefreshListener, List<ReferenceManager.RefreshListener> externalRefreshListener,
List<ReferenceManager.RefreshListener> internalRefreshListener, Sort indexSort, List<ReferenceManager.RefreshListener> internalRefreshListener, Sort indexSort,
CircuitBreakerService circuitBreakerService, LongSupplier globalCheckpointSupplier, CircuitBreakerService circuitBreakerService, LongSupplier globalCheckpointSupplier,
LongSupplier primaryTermSupplier, TombstoneDocSupplier tombstoneDocSupplier) { Supplier<Collection<RetentionLease>> retentionLeasesSupplier,
LongSupplier primaryTermSupplier,
TombstoneDocSupplier tombstoneDocSupplier) {
this.shardId = shardId; this.shardId = shardId;
this.allocationId = allocationId; this.allocationId = allocationId;
this.indexSettings = indexSettings; this.indexSettings = indexSettings;
@ -161,6 +179,7 @@ public final class EngineConfig {
this.indexSort = indexSort; this.indexSort = indexSort;
this.circuitBreakerService = circuitBreakerService; this.circuitBreakerService = circuitBreakerService;
this.globalCheckpointSupplier = globalCheckpointSupplier; this.globalCheckpointSupplier = globalCheckpointSupplier;
this.retentionLeasesSupplier = Objects.requireNonNull(retentionLeasesSupplier);
this.primaryTermSupplier = primaryTermSupplier; this.primaryTermSupplier = primaryTermSupplier;
this.tombstoneDocSupplier = tombstoneDocSupplier; this.tombstoneDocSupplier = tombstoneDocSupplier;
} }

View File

@ -276,8 +276,11 @@ public class InternalEngine extends Engine {
} else { } else {
lastMinRetainedSeqNo = Long.parseLong(commitUserData.get(SequenceNumbers.MAX_SEQ_NO)) + 1; lastMinRetainedSeqNo = Long.parseLong(commitUserData.get(SequenceNumbers.MAX_SEQ_NO)) + 1;
} }
return new SoftDeletesPolicy(translog::getLastSyncedGlobalCheckpoint, lastMinRetainedSeqNo, return new SoftDeletesPolicy(
engineConfig.getIndexSettings().getSoftDeleteRetentionOperations()); translog::getLastSyncedGlobalCheckpoint,
lastMinRetainedSeqNo,
engineConfig.getIndexSettings().getSoftDeleteRetentionOperations(),
engineConfig.retentionLeasesSupplier());
} }
/** /**

View File

@ -23,11 +23,15 @@ import org.apache.lucene.document.LongPoint;
import org.apache.lucene.search.Query; import org.apache.lucene.search.Query;
import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.index.mapper.SeqNoFieldMapper; import org.elasticsearch.index.mapper.SeqNoFieldMapper;
import org.elasticsearch.index.seqno.RetentionLease;
import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.translog.Translog; import org.elasticsearch.index.translog.Translog;
import java.util.Collection;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.LongSupplier; import java.util.function.LongSupplier;
import java.util.function.Supplier;
/** /**
* A policy that controls how many soft-deleted documents should be retained for peer-recovery and querying history changes purpose. * A policy that controls how many soft-deleted documents should be retained for peer-recovery and querying history changes purpose.
@ -41,11 +45,18 @@ final class SoftDeletesPolicy {
private long retentionOperations; private long retentionOperations;
// The min seq_no value that is retained - ops after this seq# should exist in the Lucene index. // The min seq_no value that is retained - ops after this seq# should exist in the Lucene index.
private long minRetainedSeqNo; private long minRetainedSeqNo;
// provides the retention leases used to calculate the minimum sequence number to retain
private final Supplier<Collection<RetentionLease>> retentionLeasesSupplier;
SoftDeletesPolicy(LongSupplier globalCheckpointSupplier, long minRetainedSeqNo, long retentionOperations) { SoftDeletesPolicy(
final LongSupplier globalCheckpointSupplier,
final long minRetainedSeqNo,
final long retentionOperations,
final Supplier<Collection<RetentionLease>> retentionLeasesSupplier) {
this.globalCheckpointSupplier = globalCheckpointSupplier; this.globalCheckpointSupplier = globalCheckpointSupplier;
this.retentionOperations = retentionOperations; this.retentionOperations = retentionOperations;
this.minRetainedSeqNo = minRetainedSeqNo; this.minRetainedSeqNo = minRetainedSeqNo;
this.retentionLeasesSupplier = Objects.requireNonNull(retentionLeasesSupplier);
this.localCheckpointOfSafeCommit = SequenceNumbers.NO_OPS_PERFORMED; this.localCheckpointOfSafeCommit = SequenceNumbers.NO_OPS_PERFORMED;
this.retentionLockCount = 0; this.retentionLockCount = 0;
} }
@ -97,14 +108,35 @@ final class SoftDeletesPolicy {
synchronized long getMinRetainedSeqNo() { synchronized long getMinRetainedSeqNo() {
// Do not advance if the retention lock is held // Do not advance if the retention lock is held
if (retentionLockCount == 0) { if (retentionLockCount == 0) {
// This policy retains operations for two purposes: peer-recovery and querying changes history. /*
// - Peer-recovery is driven by the local checkpoint of the safe commit. In peer-recovery, the primary transfers a safe commit, * This policy retains operations for two purposes: peer-recovery and querying changes history.
// then sends ops after the local checkpoint of that commit. This requires keeping all ops after localCheckpointOfSafeCommit; * - Peer-recovery is driven by the local checkpoint of the safe commit. In peer-recovery, the primary transfers a safe commit,
// - Changes APIs are driven the combination of the global checkpoint and retention ops. Here we prefer using the global * then sends operations after the local checkpoint of that commit. This requires keeping all ops after
// checkpoint instead of max_seqno because only operations up to the global checkpoint are exposed in the the changes APIs. * localCheckpointOfSafeCommit.
final long minSeqNoForQueryingChanges = globalCheckpointSupplier.getAsLong() - retentionOperations; * - Changes APIs are driven by a combination of the global checkpoint, retention operations, and retention leases. Here we
* prefer using the global checkpoint instead of the maximum sequence number because only operations up to the global
* checkpoint are exposed in the the changes APIs.
*/
// calculate the minimum sequence number to retain based on retention leases
final long minimumRetainingSequenceNumber = retentionLeasesSupplier
.get()
.stream()
.mapToLong(RetentionLease::retainingSequenceNumber)
.min()
.orElse(Long.MAX_VALUE);
/*
* The minimum sequence number to retain is the minimum of the minimum based on retention leases, and the number of operations
* below the global checkpoint to retain (index.soft_deletes.retention.operations).
*/
final long minSeqNoForQueryingChanges =
Math.min(globalCheckpointSupplier.getAsLong() - retentionOperations, minimumRetainingSequenceNumber);
final long minSeqNoToRetain = Math.min(minSeqNoForQueryingChanges, localCheckpointOfSafeCommit) + 1; final long minSeqNoToRetain = Math.min(minSeqNoForQueryingChanges, localCheckpointOfSafeCommit) + 1;
// This can go backward as the retentionOperations value can be changed in settings.
/*
* We take the maximum as minSeqNoToRetain can go backward as the retention operations value can be changed in settings, or from
* the addition of leases with a retaining sequence number lower than previous retaining sequence numbers.
*/
minRetainedSeqNo = Math.max(minRetainedSeqNo, minSeqNoToRetain); minRetainedSeqNo = Math.max(minRetainedSeqNo, minSeqNoToRetain);
} }
return minRetainedSeqNo; return minRetainedSeqNo;
@ -117,4 +149,5 @@ final class SoftDeletesPolicy {
Query getRetentionQuery() { Query getRetentionQuery() {
return LongPoint.newRangeQuery(SeqNoFieldMapper.NAME, getMinRetainedSeqNo(), Long.MAX_VALUE); return LongPoint.newRangeQuery(SeqNoFieldMapper.NAME, getMinRetainedSeqNo(), Long.MAX_VALUE);
} }
} }

View File

@ -35,7 +35,9 @@ import org.elasticsearch.index.shard.ReplicationGroup;
import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.ShardId;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
import java.util.Map; import java.util.Map;
@ -146,6 +148,29 @@ public class ReplicationTracker extends AbstractIndexShardComponent implements L
*/ */
volatile ReplicationGroup replicationGroup; volatile ReplicationGroup replicationGroup;
private final Map<String, RetentionLease> retentionLeases = new HashMap<>();
/**
* Get all retention leases tracker on this shard. An unmodifiable copy of the retention leases is returned.
*
* @return the retention leases
*/
public synchronized Collection<RetentionLease> getRetentionLeases() {
return Collections.unmodifiableCollection(new ArrayList<>(retentionLeases.values()));
}
/**
* Adds a new or updates an existing retention lease.
*
* @param id the identifier of the retention lease
* @param retainingSequenceNumber the retaining sequence number
* @param source the source of the retention lease
*/
public synchronized void addOrUpdateRetentionLease(final String id, final long retainingSequenceNumber, final String source) {
assert primaryMode;
retentionLeases.put(id, new RetentionLease(id, retainingSequenceNumber, source));
}
public static class CheckpointState implements Writeable { public static class CheckpointState implements Writeable {
/** /**

View File

@ -0,0 +1,86 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.seqno;
/**
* A "shard history retention lease" (or "retention lease" for short) is conceptually a marker containing a retaining sequence number such
* that all operations with sequence number at least that retaining sequence number will be retained during merge operations (which could
* otherwise merge away operations that have been soft deleted). Each retention lease contains a unique identifier, the retaining sequence
* number, and the source of the retention lease (e.g., "ccr").
*/
public class RetentionLease {
private final String id;
/**
* The identifier for this retention lease. This identifier should be unique per lease and is set during construction by the caller.
*
* @return the identifier
*/
public String id() {
return id;
}
private final long retainingSequenceNumber;
/**
* The retaining sequence number of this retention lease. The retaining sequence number is the minimum sequence number that this
* retention lease wants to retain during merge operations. The retaining sequence number is set during construction by the caller.
*
* @return the retaining sequence number
*/
public long retainingSequenceNumber() {
return retainingSequenceNumber;
}
private final String source;
/**
* The source of this retention lease. The source is set during construction by the caller.
*
* @return the source
*/
public String source() {
return source;
}
/**
* Constructs a new retention lease.
*
* @param id the identifier of the retention lease
* @param retainingSequenceNumber the retaining sequence number
* @param source the source of the retention lease
*/
public RetentionLease(final String id, final long retainingSequenceNumber, final String source) {
this.id = id;
this.retainingSequenceNumber = retainingSequenceNumber;
this.source = source;
}
@Override
public String toString() {
return "ShardHistoryRetentionLease{" +
"id='" + id + '\'' +
", retainingSequenceNumber=" + retainingSequenceNumber +
", source='" + source + '\'' +
'}';
}
}

View File

@ -1864,6 +1864,20 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
this.globalCheckpointListeners.add(waitingForGlobalCheckpoint, listener, timeout); this.globalCheckpointListeners.add(waitingForGlobalCheckpoint, listener, timeout);
} }
/**
* Adds a new or updates an existing retention lease.
*
* @param id the identifier of the retention lease
* @param retainingSequenceNumber the retaining sequence number
* @param source the source of the retention lease
*/
void addOrUpdateRetentionLease(final String id, final long retainingSequenceNumber, final String source) {
assert assertPrimaryMode();
verifyNotClosed();
replicationTracker.addOrUpdateRetentionLease(id, retainingSequenceNumber, source);
}
/** /**
* Waits for all operations up to the provided sequence number to complete. * Waits for all operations up to the provided sequence number to complete.
* *
@ -2310,13 +2324,14 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
private EngineConfig newEngineConfig() { private EngineConfig newEngineConfig() {
Sort indexSort = indexSortSupplier.get(); Sort indexSort = indexSortSupplier.get();
return new EngineConfig(shardId, shardRouting.allocationId().getId(), return new EngineConfig(shardId, shardRouting.allocationId().getId(),
threadPool, indexSettings, warmer, store, indexSettings.getMergePolicy(), threadPool, indexSettings, warmer, store, indexSettings.getMergePolicy(),
mapperService.indexAnalyzer(), similarityService.similarity(mapperService), codecService, shardEventListener, mapperService.indexAnalyzer(), similarityService.similarity(mapperService), codecService, shardEventListener,
indexCache.query(), cachingPolicy, translogConfig, indexCache.query(), cachingPolicy, translogConfig,
IndexingMemoryController.SHARD_INACTIVE_TIME_SETTING.get(indexSettings.getSettings()), IndexingMemoryController.SHARD_INACTIVE_TIME_SETTING.get(indexSettings.getSettings()),
Collections.singletonList(refreshListeners), Collections.singletonList(refreshListeners),
Collections.singletonList(new RefreshMetricUpdater(refreshMetric)), Collections.singletonList(new RefreshMetricUpdater(refreshMetric)),
indexSort, circuitBreakerService, replicationTracker, () -> operationPrimaryTerm, tombstoneDocSupplier()); indexSort, circuitBreakerService, replicationTracker, replicationTracker::getRetentionLeases,
() -> operationPrimaryTerm, tombstoneDocSupplier());
} }
/** /**

View File

@ -30,6 +30,7 @@ import org.elasticsearch.test.ESTestCase;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
@ -52,7 +53,8 @@ public class CombinedDeletionPolicyTests extends ESTestCase {
public void testKeepCommitsAfterGlobalCheckpoint() throws Exception { public void testKeepCommitsAfterGlobalCheckpoint() throws Exception {
final AtomicLong globalCheckpoint = new AtomicLong(); final AtomicLong globalCheckpoint = new AtomicLong();
final int extraRetainedOps = between(0, 100); final int extraRetainedOps = between(0, 100);
final SoftDeletesPolicy softDeletesPolicy = new SoftDeletesPolicy(globalCheckpoint::get, -1, extraRetainedOps); final SoftDeletesPolicy softDeletesPolicy =
new SoftDeletesPolicy(globalCheckpoint::get, -1, extraRetainedOps, Collections::emptyList);
TranslogDeletionPolicy translogPolicy = createTranslogDeletionPolicy(); TranslogDeletionPolicy translogPolicy = createTranslogDeletionPolicy();
CombinedDeletionPolicy indexPolicy = new CombinedDeletionPolicy(logger, translogPolicy, softDeletesPolicy, globalCheckpoint::get); CombinedDeletionPolicy indexPolicy = new CombinedDeletionPolicy(logger, translogPolicy, softDeletesPolicy, globalCheckpoint::get);
@ -96,7 +98,8 @@ public class CombinedDeletionPolicyTests extends ESTestCase {
public void testAcquireIndexCommit() throws Exception { public void testAcquireIndexCommit() throws Exception {
final AtomicLong globalCheckpoint = new AtomicLong(); final AtomicLong globalCheckpoint = new AtomicLong();
final int extraRetainedOps = between(0, 100); final int extraRetainedOps = between(0, 100);
final SoftDeletesPolicy softDeletesPolicy = new SoftDeletesPolicy(globalCheckpoint::get, -1, extraRetainedOps); final SoftDeletesPolicy softDeletesPolicy =
new SoftDeletesPolicy(globalCheckpoint::get, -1, extraRetainedOps, Collections::emptyList);
final UUID translogUUID = UUID.randomUUID(); final UUID translogUUID = UUID.randomUUID();
TranslogDeletionPolicy translogPolicy = createTranslogDeletionPolicy(); TranslogDeletionPolicy translogPolicy = createTranslogDeletionPolicy();
CombinedDeletionPolicy indexPolicy = new CombinedDeletionPolicy(logger, translogPolicy, softDeletesPolicy, globalCheckpoint::get); CombinedDeletionPolicy indexPolicy = new CombinedDeletionPolicy(logger, translogPolicy, softDeletesPolicy, globalCheckpoint::get);
@ -176,7 +179,7 @@ public class CombinedDeletionPolicyTests extends ESTestCase {
public void testLegacyIndex() throws Exception { public void testLegacyIndex() throws Exception {
final AtomicLong globalCheckpoint = new AtomicLong(); final AtomicLong globalCheckpoint = new AtomicLong();
final SoftDeletesPolicy softDeletesPolicy = new SoftDeletesPolicy(globalCheckpoint::get, -1, 0); final SoftDeletesPolicy softDeletesPolicy = new SoftDeletesPolicy(globalCheckpoint::get, -1, 0, Collections::emptyList);
final UUID translogUUID = UUID.randomUUID(); final UUID translogUUID = UUID.randomUUID();
TranslogDeletionPolicy translogPolicy = createTranslogDeletionPolicy(); TranslogDeletionPolicy translogPolicy = createTranslogDeletionPolicy();
@ -211,7 +214,7 @@ public class CombinedDeletionPolicyTests extends ESTestCase {
public void testDeleteInvalidCommits() throws Exception { public void testDeleteInvalidCommits() throws Exception {
final AtomicLong globalCheckpoint = new AtomicLong(randomNonNegativeLong()); final AtomicLong globalCheckpoint = new AtomicLong(randomNonNegativeLong());
final SoftDeletesPolicy softDeletesPolicy = new SoftDeletesPolicy(globalCheckpoint::get, -1, 0); final SoftDeletesPolicy softDeletesPolicy = new SoftDeletesPolicy(globalCheckpoint::get, -1, 0, Collections::emptyList);
TranslogDeletionPolicy translogPolicy = createTranslogDeletionPolicy(); TranslogDeletionPolicy translogPolicy = createTranslogDeletionPolicy();
CombinedDeletionPolicy indexPolicy = new CombinedDeletionPolicy(logger, translogPolicy, softDeletesPolicy, globalCheckpoint::get); CombinedDeletionPolicy indexPolicy = new CombinedDeletionPolicy(logger, translogPolicy, softDeletesPolicy, globalCheckpoint::get);
@ -245,7 +248,7 @@ public class CombinedDeletionPolicyTests extends ESTestCase {
public void testCheckUnreferencedCommits() throws Exception { public void testCheckUnreferencedCommits() throws Exception {
final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.UNASSIGNED_SEQ_NO); final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.UNASSIGNED_SEQ_NO);
final SoftDeletesPolicy softDeletesPolicy = new SoftDeletesPolicy(globalCheckpoint::get, -1, 0); final SoftDeletesPolicy softDeletesPolicy = new SoftDeletesPolicy(globalCheckpoint::get, -1, 0, Collections::emptyList);
final UUID translogUUID = UUID.randomUUID(); final UUID translogUUID = UUID.randomUUID();
final TranslogDeletionPolicy translogPolicy = createTranslogDeletionPolicy(); final TranslogDeletionPolicy translogPolicy = createTranslogDeletionPolicy();
CombinedDeletionPolicy indexPolicy = new CombinedDeletionPolicy(logger, translogPolicy, softDeletesPolicy, globalCheckpoint::get); CombinedDeletionPolicy indexPolicy = new CombinedDeletionPolicy(logger, translogPolicy, softDeletesPolicy, globalCheckpoint::get);

View File

@ -3011,7 +3011,8 @@ public class InternalEngineTests extends EngineTestCase {
new CodecService(null, logger), config.getEventListener(), IndexSearcher.getDefaultQueryCache(), new CodecService(null, logger), config.getEventListener(), IndexSearcher.getDefaultQueryCache(),
IndexSearcher.getDefaultQueryCachingPolicy(), translogConfig, TimeValue.timeValueMinutes(5), IndexSearcher.getDefaultQueryCachingPolicy(), translogConfig, TimeValue.timeValueMinutes(5),
config.getExternalRefreshListener(), config.getInternalRefreshListener(), null, config.getExternalRefreshListener(), config.getInternalRefreshListener(), null,
new NoneCircuitBreakerService(), () -> UNASSIGNED_SEQ_NO, primaryTerm::get, tombstoneDocSupplier()); new NoneCircuitBreakerService(), () -> UNASSIGNED_SEQ_NO, Collections::emptySet, primaryTerm::get,
tombstoneDocSupplier());
expectThrows(EngineCreationFailureException.class, () -> new InternalEngine(brokenConfig)); expectThrows(EngineCreationFailureException.class, () -> new InternalEngine(brokenConfig));
engine = createEngine(store, primaryTranslogDir); // and recover again! engine = createEngine(store, primaryTranslogDir); // and recover again!

View File

@ -19,25 +19,48 @@
package org.elasticsearch.index.engine; package org.elasticsearch.index.engine;
import org.apache.lucene.document.LongPoint;
import org.apache.lucene.search.PointRangeQuery;
import org.apache.lucene.search.Query;
import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.index.seqno.RetentionLease;
import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.ESTestCase;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;
import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.instanceOf;
public class SoftDeletesPolicyTests extends ESTestCase { public class SoftDeletesPolicyTests extends ESTestCase {
/** /**
* Makes sure we won't advance the retained seq# if the retention lock is held * Makes sure we won't advance the retained seq# if the retention lock is held
*/ */
public void testSoftDeletesRetentionLock() { public void testSoftDeletesRetentionLock() {
long retainedOps = between(0, 10000); long retainedOps = between(0, 10000);
AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);
final AtomicLong[] retainingSequenceNumbers = new AtomicLong[randomIntBetween(0, 8)];
for (int i = 0; i < retainingSequenceNumbers.length; i++) {
retainingSequenceNumbers[i] = new AtomicLong(SequenceNumbers.UNASSIGNED_SEQ_NO);
}
final Supplier<Collection<RetentionLease>> retentionLeasesSupplier =
() -> {
final Set<RetentionLease> leases = new HashSet<>(retainingSequenceNumbers.length);
for (int i = 0; i < retainingSequenceNumbers.length; i++) {
leases.add(new RetentionLease(Integer.toString(i), retainingSequenceNumbers[i].get(), "test"));
}
return leases;
};
long safeCommitCheckpoint = globalCheckpoint.get(); long safeCommitCheckpoint = globalCheckpoint.get();
SoftDeletesPolicy policy = new SoftDeletesPolicy(globalCheckpoint::get, between(1, 10000), retainedOps); SoftDeletesPolicy policy = new SoftDeletesPolicy(globalCheckpoint::get, between(1, 10000), retainedOps, retentionLeasesSupplier);
long minRetainedSeqNo = policy.getMinRetainedSeqNo(); long minRetainedSeqNo = policy.getMinRetainedSeqNo();
List<Releasable> locks = new ArrayList<>(); List<Releasable> locks = new ArrayList<>();
int iters = scaledRandomIntBetween(10, 1000); int iters = scaledRandomIntBetween(10, 1000);
@ -47,6 +70,9 @@ public class SoftDeletesPolicyTests extends ESTestCase {
} }
// Advances the global checkpoint and the local checkpoint of a safe commit // Advances the global checkpoint and the local checkpoint of a safe commit
globalCheckpoint.addAndGet(between(0, 1000)); globalCheckpoint.addAndGet(between(0, 1000));
for (final AtomicLong retainingSequenceNumber : retainingSequenceNumbers) {
retainingSequenceNumber.set(randomLongBetween(retainingSequenceNumber.get(), globalCheckpoint.get()));
}
safeCommitCheckpoint = randomLongBetween(safeCommitCheckpoint, globalCheckpoint.get()); safeCommitCheckpoint = randomLongBetween(safeCommitCheckpoint, globalCheckpoint.get());
policy.setLocalCheckpointOfSafeCommit(safeCommitCheckpoint); policy.setLocalCheckpointOfSafeCommit(safeCommitCheckpoint);
if (rarely()) { if (rarely()) {
@ -58,18 +84,36 @@ public class SoftDeletesPolicyTests extends ESTestCase {
locks.removeAll(releasingLocks); locks.removeAll(releasingLocks);
releasingLocks.forEach(Releasable::close); releasingLocks.forEach(Releasable::close);
// We only expose the seqno to the merge policy if the retention lock is not held. // getting the query has side effects, updating the internal state of the policy
policy.getRetentionQuery(); final Query query = policy.getRetentionQuery();
assertThat(query, instanceOf(PointRangeQuery.class));
final PointRangeQuery retentionQuery = (PointRangeQuery) query;
// we only expose the minimum sequence number to the merge policy if the retention lock is not held
if (locks.isEmpty()) { if (locks.isEmpty()) {
long retainedSeqNo = Math.min(safeCommitCheckpoint, globalCheckpoint.get() - retainedOps) + 1; final long minimumRetainingSequenceNumber = Arrays.stream(retainingSequenceNumbers)
.mapToLong(AtomicLong::get)
.min()
.orElse(Long.MAX_VALUE);
long retainedSeqNo =
Math.min(safeCommitCheckpoint, Math.min(minimumRetainingSequenceNumber, globalCheckpoint.get() - retainedOps)) + 1;
minRetainedSeqNo = Math.max(minRetainedSeqNo, retainedSeqNo); minRetainedSeqNo = Math.max(minRetainedSeqNo, retainedSeqNo);
} }
assertThat(retentionQuery.getNumDims(), equalTo(1));
assertThat(LongPoint.decodeDimension(retentionQuery.getLowerPoint(), 0), equalTo(minRetainedSeqNo));
assertThat(LongPoint.decodeDimension(retentionQuery.getUpperPoint(), 0), equalTo(Long.MAX_VALUE));
assertThat(policy.getMinRetainedSeqNo(), equalTo(minRetainedSeqNo)); assertThat(policy.getMinRetainedSeqNo(), equalTo(minRetainedSeqNo));
} }
locks.forEach(Releasable::close); locks.forEach(Releasable::close);
long retainedSeqNo = Math.min(safeCommitCheckpoint, globalCheckpoint.get() - retainedOps) + 1; final long minimumRetainingSequenceNumber = Arrays.stream(retainingSequenceNumbers)
.mapToLong(AtomicLong::get)
.min()
.orElse(Long.MAX_VALUE);
long retainedSeqNo =
Math.min(safeCommitCheckpoint, Math.min(minimumRetainingSequenceNumber, globalCheckpoint.get() - retainedOps)) + 1;
minRetainedSeqNo = Math.max(minRetainedSeqNo, retainedSeqNo); minRetainedSeqNo = Math.max(minRetainedSeqNo, retainedSeqNo);
assertThat(policy.getMinRetainedSeqNo(), equalTo(minRetainedSeqNo)); assertThat(policy.getMinRetainedSeqNo(), equalTo(minRetainedSeqNo));
} }
} }

View File

@ -0,0 +1,86 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.seqno;
import org.elasticsearch.cluster.routing.AllocationId;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.test.IndexSettingsModule;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasItem;
import static org.hamcrest.Matchers.hasSize;
public class ReplicationTrackerRetentionLeaseTests extends ReplicationTrackerTestCase {
public void testAddOrUpdateRetentionLease() {
final AllocationId id = AllocationId.newInitializing();
final ReplicationTracker replicationTracker = new ReplicationTracker(
new ShardId("test", "_na", 0),
id.getId(),
IndexSettingsModule.newIndexSettings("test", Settings.EMPTY),
UNASSIGNED_SEQ_NO,
value -> {});
replicationTracker.updateFromMaster(
randomNonNegativeLong(),
Collections.singleton(id.getId()),
routingTable(Collections.emptySet(), id),
Collections.emptySet());
replicationTracker.activatePrimaryMode(SequenceNumbers.NO_OPS_PERFORMED);
final int length = randomIntBetween(0, 8);
final long[] minimumRetainingSequenceNumbers = new long[length];
for (int i = 0; i < length; i++) {
minimumRetainingSequenceNumbers[i] = randomLongBetween(SequenceNumbers.NO_OPS_PERFORMED, Long.MAX_VALUE);
replicationTracker.addOrUpdateRetentionLease(Integer.toString(i), minimumRetainingSequenceNumbers[i], "test-" + i);
assertRetentionLeases(replicationTracker, i + 1, minimumRetainingSequenceNumbers);
}
for (int i = 0; i < length; i++) {
minimumRetainingSequenceNumbers[i] = randomLongBetween(minimumRetainingSequenceNumbers[i], Long.MAX_VALUE);
replicationTracker.addOrUpdateRetentionLease(Integer.toString(i), minimumRetainingSequenceNumbers[i], "test-" + i);
assertRetentionLeases(replicationTracker, length, minimumRetainingSequenceNumbers);
}
}
private void assertRetentionLeases(
final ReplicationTracker replicationTracker, final int size, final long[] minimumRetainingSequenceNumbers) {
final Collection<RetentionLease> retentionLeases = replicationTracker.getRetentionLeases();
final Map<String, RetentionLease> idToRetentionLease = new HashMap<>();
for (final RetentionLease retentionLease : retentionLeases) {
idToRetentionLease.put(retentionLease.id(), retentionLease);
}
assertThat(idToRetentionLease.entrySet(), hasSize(size));
for (int i = 0; i < size; i++) {
assertThat(idToRetentionLease.keySet(), hasItem(Integer.toString(i)));
final RetentionLease retentionLease = idToRetentionLease.get(Integer.toString(i));
assertThat(retentionLease.retainingSequenceNumber(), equalTo(minimumRetainingSequenceNumbers[i]));
assertThat(retentionLease.source(), equalTo("test-" + i));
}
}
}

View File

@ -0,0 +1,69 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.seqno;
import org.elasticsearch.cluster.routing.AllocationId;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.cluster.routing.TestShardRouting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.IndexSettingsModule;
import java.util.Set;
import java.util.function.LongConsumer;
import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO;
public abstract class ReplicationTrackerTestCase extends ESTestCase {
ReplicationTracker newTracker(final AllocationId allocationId, final LongConsumer updatedGlobalCheckpoint) {
return new ReplicationTracker(
new ShardId("test", "_na_", 0),
allocationId.getId(),
IndexSettingsModule.newIndexSettings("test", Settings.EMPTY),
UNASSIGNED_SEQ_NO,
updatedGlobalCheckpoint);
}
static IndexShardRoutingTable routingTable(final Set<AllocationId> initializingIds, final AllocationId primaryId) {
final ShardId shardId = new ShardId("test", "_na_", 0);
final ShardRouting primaryShard =
TestShardRouting.newShardRouting(shardId, randomAlphaOfLength(10), null, true, ShardRoutingState.STARTED, primaryId);
return routingTable(initializingIds, primaryShard);
}
static IndexShardRoutingTable routingTable(final Set<AllocationId> initializingIds, final ShardRouting primaryShard) {
assert !initializingIds.contains(primaryShard.allocationId());
final ShardId shardId = new ShardId("test", "_na_", 0);
final IndexShardRoutingTable.Builder builder = new IndexShardRoutingTable.Builder(shardId);
for (final AllocationId initializingId : initializingIds) {
builder.addShard(TestShardRouting.newShardRouting(
shardId, randomAlphaOfLength(10), null, false, ShardRoutingState.INITIALIZING, initializingId));
}
builder.addShard(primaryShard);
return builder.build();
}
}

View File

@ -32,7 +32,6 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.set.Sets; import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.IndexSettingsModule; import org.elasticsearch.test.IndexSettingsModule;
import java.io.IOException; import java.io.IOException;
@ -61,7 +60,7 @@ import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.not; import static org.hamcrest.Matchers.not;
public class ReplicationTrackerTests extends ESTestCase { public class ReplicationTrackerTests extends ReplicationTrackerTestCase {
public void testEmptyShards() { public void testEmptyShards() {
final ReplicationTracker tracker = newTracker(AllocationId.newInitializing()); final ReplicationTracker tracker = newTracker(AllocationId.newInitializing());
@ -76,27 +75,6 @@ public class ReplicationTrackerTests extends ESTestCase {
return allocations; return allocations;
} }
private static IndexShardRoutingTable routingTable(final Set<AllocationId> initializingIds, final AllocationId primaryId) {
final ShardId shardId = new ShardId("test", "_na_", 0);
final ShardRouting primaryShard =
TestShardRouting.newShardRouting(shardId, randomAlphaOfLength(10), null, true, ShardRoutingState.STARTED, primaryId);
return routingTable(initializingIds, primaryShard);
}
private static IndexShardRoutingTable routingTable(final Set<AllocationId> initializingIds, final ShardRouting primaryShard) {
assert !initializingIds.contains(primaryShard.allocationId());
ShardId shardId = new ShardId("test", "_na_", 0);
IndexShardRoutingTable.Builder builder = new IndexShardRoutingTable.Builder(shardId);
for (AllocationId initializingId : initializingIds) {
builder.addShard(TestShardRouting.newShardRouting(
shardId, randomAlphaOfLength(10), null, false, ShardRoutingState.INITIALIZING, initializingId));
}
builder.addShard(primaryShard);
return builder.build();
}
private static Set<String> ids(Set<AllocationId> allocationIds) { private static Set<String> ids(Set<AllocationId> allocationIds) {
return allocationIds.stream().map(AllocationId::getId).collect(Collectors.toSet()); return allocationIds.stream().map(AllocationId::getId).collect(Collectors.toSet());
} }
@ -428,12 +406,7 @@ public class ReplicationTrackerTests extends ESTestCase {
private AtomicLong updatedGlobalCheckpoint = new AtomicLong(UNASSIGNED_SEQ_NO); private AtomicLong updatedGlobalCheckpoint = new AtomicLong(UNASSIGNED_SEQ_NO);
private ReplicationTracker newTracker(final AllocationId allocationId) { private ReplicationTracker newTracker(final AllocationId allocationId) {
return new ReplicationTracker( return newTracker(allocationId, updatedGlobalCheckpoint::set);
new ShardId("test", "_na_", 0),
allocationId.getId(),
IndexSettingsModule.newIndexSettings("test", Settings.EMPTY),
UNASSIGNED_SEQ_NO,
updatedGlobalCheckpoint::set);
} }
public void testWaitForAllocationIdToBeInSyncCanBeInterrupted() throws BrokenBarrierException, InterruptedException { public void testWaitForAllocationIdToBeInSyncCanBeInterrupted() throws BrokenBarrierException, InterruptedException {
@ -709,10 +682,11 @@ public class ReplicationTrackerTests extends ESTestCase {
FakeClusterState clusterState = initialState(); FakeClusterState clusterState = initialState();
final AllocationId primaryAllocationId = clusterState.routingTable.primaryShard().allocationId(); final AllocationId primaryAllocationId = clusterState.routingTable.primaryShard().allocationId();
final LongConsumer onUpdate = updatedGlobalCheckpoint -> {}; final LongConsumer onUpdate = updatedGlobalCheckpoint -> {};
final long globalCheckpoint = UNASSIGNED_SEQ_NO;
ReplicationTracker oldPrimary = ReplicationTracker oldPrimary =
new ReplicationTracker(shardId, primaryAllocationId.getId(), indexSettings, UNASSIGNED_SEQ_NO, onUpdate); new ReplicationTracker(shardId, primaryAllocationId.getId(), indexSettings, globalCheckpoint, onUpdate);
ReplicationTracker newPrimary = ReplicationTracker newPrimary =
new ReplicationTracker(shardId, primaryAllocationId.getRelocationId(), indexSettings, UNASSIGNED_SEQ_NO, onUpdate); new ReplicationTracker(shardId, primaryAllocationId.getRelocationId(), indexSettings, globalCheckpoint, onUpdate);
Set<String> allocationIds = new HashSet<>(Arrays.asList(oldPrimary.shardAllocationId, newPrimary.shardAllocationId)); Set<String> allocationIds = new HashSet<>(Arrays.asList(oldPrimary.shardAllocationId, newPrimary.shardAllocationId));

View File

@ -0,0 +1,75 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.shard;
import org.elasticsearch.index.seqno.RetentionLease;
import org.elasticsearch.index.seqno.SequenceNumbers;
import java.io.IOException;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasItem;
import static org.hamcrest.Matchers.hasSize;
public class IndexShardRetentionLeaseTests extends IndexShardTestCase {
public void testAddOrUpdateRetentionLease() throws IOException {
final IndexShard indexShard = newStartedShard(true);
try {
final int length = randomIntBetween(0, 8);
final long[] minimumRetainingSequenceNumbers = new long[length];
for (int i = 0; i < length; i++) {
minimumRetainingSequenceNumbers[i] = randomLongBetween(SequenceNumbers.NO_OPS_PERFORMED, Long.MAX_VALUE);
indexShard.addOrUpdateRetentionLease(Integer.toString(i), minimumRetainingSequenceNumbers[i], "test-" + i);
assertRetentionLeases(indexShard, i + 1, minimumRetainingSequenceNumbers);
}
for (int i = 0; i < length; i++) {
minimumRetainingSequenceNumbers[i] = randomLongBetween(minimumRetainingSequenceNumbers[i], Long.MAX_VALUE);
indexShard.addOrUpdateRetentionLease(Integer.toString(i), minimumRetainingSequenceNumbers[i], "test-" + i);
assertRetentionLeases(indexShard, length, minimumRetainingSequenceNumbers);
}
} finally {
closeShards(indexShard);
}
}
private void assertRetentionLeases(
final IndexShard indexShard, final int size, final long[] minimumRetainingSequenceNumbers) {
final Collection<RetentionLease> retentionLeases = indexShard.getEngine().config().retentionLeasesSupplier().get();
final Map<String, RetentionLease> idToRetentionLease = new HashMap<>();
for (final RetentionLease retentionLease : retentionLeases) {
idToRetentionLease.put(retentionLease.id(), retentionLease);
}
assertThat(idToRetentionLease.entrySet(), hasSize(size));
for (int i = 0; i < size; i++) {
assertThat(idToRetentionLease.keySet(), hasItem(Integer.toString(i)));
final RetentionLease retentionLease = idToRetentionLease.get(Integer.toString(i));
assertThat(retentionLease.retainingSequenceNumber(), equalTo(minimumRetainingSequenceNumbers[i]));
assertThat(retentionLease.source(), equalTo("test-" + i));
}
}
}

View File

@ -125,8 +125,8 @@ public class RefreshListenersTests extends ESTestCase {
indexSettings, null, store, newMergePolicy(), iwc.getAnalyzer(), iwc.getSimilarity(), new CodecService(null, logger), indexSettings, null, store, newMergePolicy(), iwc.getAnalyzer(), iwc.getSimilarity(), new CodecService(null, logger),
eventListener, IndexSearcher.getDefaultQueryCache(), IndexSearcher.getDefaultQueryCachingPolicy(), translogConfig, eventListener, IndexSearcher.getDefaultQueryCache(), IndexSearcher.getDefaultQueryCachingPolicy(), translogConfig,
TimeValue.timeValueMinutes(5), Collections.singletonList(listeners), Collections.emptyList(), null, TimeValue.timeValueMinutes(5), Collections.singletonList(listeners), Collections.emptyList(), null,
new NoneCircuitBreakerService(), () -> SequenceNumbers.NO_OPS_PERFORMED, () -> primaryTerm, new NoneCircuitBreakerService(), () -> SequenceNumbers.NO_OPS_PERFORMED, Collections::emptySet,
EngineTestCase.tombstoneDocSupplier()); () -> primaryTerm, EngineTestCase.tombstoneDocSupplier());
engine = new InternalEngine(config); engine = new InternalEngine(config);
engine.initializeMaxSeqNoOfUpdatesOrDeletes(); engine.initializeMaxSeqNoOfUpdatesOrDeletes();
engine.recoverFromTranslog((e, s) -> 0, Long.MAX_VALUE); engine.recoverFromTranslog((e, s) -> 0, Long.MAX_VALUE);

View File

@ -84,6 +84,7 @@ import org.elasticsearch.index.mapper.Uid;
import org.elasticsearch.index.mapper.VersionFieldMapper; import org.elasticsearch.index.mapper.VersionFieldMapper;
import org.elasticsearch.index.seqno.LocalCheckpointTracker; import org.elasticsearch.index.seqno.LocalCheckpointTracker;
import org.elasticsearch.index.seqno.ReplicationTracker; import org.elasticsearch.index.seqno.ReplicationTracker;
import org.elasticsearch.index.seqno.RetentionLease;
import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.store.Store; import org.elasticsearch.index.store.Store;
@ -104,6 +105,7 @@ import java.nio.charset.Charset;
import java.nio.file.Path; import java.nio.file.Path;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.Comparator; import java.util.Comparator;
import java.util.HashMap; import java.util.HashMap;
@ -115,6 +117,7 @@ import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiFunction; import java.util.function.BiFunction;
import java.util.function.Function; import java.util.function.Function;
import java.util.function.LongSupplier; import java.util.function.LongSupplier;
import java.util.function.Supplier;
import java.util.function.ToLongBiFunction; import java.util.function.ToLongBiFunction;
import java.util.stream.Collectors; import java.util.stream.Collectors;
@ -224,7 +227,8 @@ public abstract class EngineTestCase extends ESTestCase {
new CodecService(null, logger), config.getEventListener(), config.getQueryCache(), config.getQueryCachingPolicy(), new CodecService(null, logger), config.getEventListener(), config.getQueryCache(), config.getQueryCachingPolicy(),
config.getTranslogConfig(), config.getFlushMergesAfter(), config.getTranslogConfig(), config.getFlushMergesAfter(),
config.getExternalRefreshListener(), Collections.emptyList(), config.getIndexSort(), config.getExternalRefreshListener(), Collections.emptyList(), config.getIndexSort(),
config.getCircuitBreakerService(), globalCheckpointSupplier, config.getPrimaryTermSupplier(), tombstoneDocSupplier()); config.getCircuitBreakerService(), globalCheckpointSupplier, config.retentionLeasesSupplier(),
config.getPrimaryTermSupplier(), tombstoneDocSupplier());
} }
public EngineConfig copy(EngineConfig config, Analyzer analyzer) { public EngineConfig copy(EngineConfig config, Analyzer analyzer) {
@ -233,8 +237,8 @@ public abstract class EngineTestCase extends ESTestCase {
new CodecService(null, logger), config.getEventListener(), config.getQueryCache(), config.getQueryCachingPolicy(), new CodecService(null, logger), config.getEventListener(), config.getQueryCache(), config.getQueryCachingPolicy(),
config.getTranslogConfig(), config.getFlushMergesAfter(), config.getTranslogConfig(), config.getFlushMergesAfter(),
config.getExternalRefreshListener(), Collections.emptyList(), config.getIndexSort(), config.getExternalRefreshListener(), Collections.emptyList(), config.getIndexSort(),
config.getCircuitBreakerService(), config.getGlobalCheckpointSupplier(), config.getPrimaryTermSupplier(), config.getCircuitBreakerService(), config.getGlobalCheckpointSupplier(), config.retentionLeasesSupplier(),
config.getTombstoneDocSupplier()); config.getPrimaryTermSupplier(), config.getTombstoneDocSupplier());
} }
public EngineConfig copy(EngineConfig config, MergePolicy mergePolicy) { public EngineConfig copy(EngineConfig config, MergePolicy mergePolicy) {
@ -243,8 +247,8 @@ public abstract class EngineTestCase extends ESTestCase {
new CodecService(null, logger), config.getEventListener(), config.getQueryCache(), config.getQueryCachingPolicy(), new CodecService(null, logger), config.getEventListener(), config.getQueryCache(), config.getQueryCachingPolicy(),
config.getTranslogConfig(), config.getFlushMergesAfter(), config.getTranslogConfig(), config.getFlushMergesAfter(),
config.getExternalRefreshListener(), Collections.emptyList(), config.getIndexSort(), config.getExternalRefreshListener(), Collections.emptyList(), config.getIndexSort(),
config.getCircuitBreakerService(), config.getGlobalCheckpointSupplier(), config.getPrimaryTermSupplier(), config.getCircuitBreakerService(), config.getGlobalCheckpointSupplier(), config.retentionLeasesSupplier(),
config.getTombstoneDocSupplier()); config.getPrimaryTermSupplier(), config.getTombstoneDocSupplier());
} }
@Override @Override
@ -581,7 +585,8 @@ public abstract class EngineTestCase extends ESTestCase {
public EngineConfig config(IndexSettings indexSettings, Store store, Path translogPath, MergePolicy mergePolicy, public EngineConfig config(IndexSettings indexSettings, Store store, Path translogPath, MergePolicy mergePolicy,
ReferenceManager.RefreshListener externalRefreshListener, ReferenceManager.RefreshListener externalRefreshListener,
ReferenceManager.RefreshListener internalRefreshListener, ReferenceManager.RefreshListener internalRefreshListener,
Sort indexSort, LongSupplier globalCheckpointSupplier, CircuitBreakerService breakerService) { Sort indexSort, @Nullable final LongSupplier maybeGlobalCheckpointSupplier,
CircuitBreakerService breakerService) {
IndexWriterConfig iwc = newIndexWriterConfig(); IndexWriterConfig iwc = newIndexWriterConfig();
TranslogConfig translogConfig = new TranslogConfig(shardId, translogPath, indexSettings, BigArrays.NON_RECYCLING_INSTANCE); TranslogConfig translogConfig = new TranslogConfig(shardId, translogPath, indexSettings, BigArrays.NON_RECYCLING_INSTANCE);
Engine.EventListener listener = new Engine.EventListener() { Engine.EventListener listener = new Engine.EventListener() {
@ -594,14 +599,22 @@ public abstract class EngineTestCase extends ESTestCase {
externalRefreshListener == null ? emptyList() : Collections.singletonList(externalRefreshListener); externalRefreshListener == null ? emptyList() : Collections.singletonList(externalRefreshListener);
final List<ReferenceManager.RefreshListener> intRefreshListenerList = final List<ReferenceManager.RefreshListener> intRefreshListenerList =
internalRefreshListener == null ? emptyList() : Collections.singletonList(internalRefreshListener); internalRefreshListener == null ? emptyList() : Collections.singletonList(internalRefreshListener);
final LongSupplier globalCheckpointSupplier;
final Supplier<Collection<RetentionLease>> retentionLeasesSupplier;
if (maybeGlobalCheckpointSupplier == null) {
final ReplicationTracker replicationTracker =
new ReplicationTracker(shardId, allocationId.getId(), indexSettings, SequenceNumbers.NO_OPS_PERFORMED, update -> {});
globalCheckpointSupplier = replicationTracker;
retentionLeasesSupplier = replicationTracker::getRetentionLeases;
} else {
globalCheckpointSupplier = maybeGlobalCheckpointSupplier;
retentionLeasesSupplier = Collections::emptySet;
}
EngineConfig config = new EngineConfig(shardId, allocationId.getId(), threadPool, indexSettings, null, store, EngineConfig config = new EngineConfig(shardId, allocationId.getId(), threadPool, indexSettings, null, store,
mergePolicy, iwc.getAnalyzer(), iwc.getSimilarity(), new CodecService(null, logger), listener, mergePolicy, iwc.getAnalyzer(), iwc.getSimilarity(), new CodecService(null, logger), listener,
IndexSearcher.getDefaultQueryCache(), IndexSearcher.getDefaultQueryCachingPolicy(), translogConfig, IndexSearcher.getDefaultQueryCache(), IndexSearcher.getDefaultQueryCachingPolicy(), translogConfig,
TimeValue.timeValueMinutes(5), extRefreshListenerList, intRefreshListenerList, indexSort, TimeValue.timeValueMinutes(5), extRefreshListenerList, intRefreshListenerList, indexSort,
breakerService, breakerService, globalCheckpointSupplier, retentionLeasesSupplier, primaryTerm::get, tombstoneDocSupplier());
globalCheckpointSupplier == null ?
new ReplicationTracker(shardId, allocationId.getId(), indexSettings, SequenceNumbers.NO_OPS_PERFORMED, update -> {}) :
globalCheckpointSupplier, primaryTerm::get, tombstoneDocSupplier());
return config; return config;
} }

View File

@ -270,9 +270,9 @@ public class FollowingEngineTests extends ESTestCase {
null, null,
new NoneCircuitBreakerService(), new NoneCircuitBreakerService(),
globalCheckpoint::longValue, globalCheckpoint::longValue,
Collections::emptyList,
() -> primaryTerm.get(), () -> primaryTerm.get(),
EngineTestCase.tombstoneDocSupplier() EngineTestCase.tombstoneDocSupplier());
);
} }
private static Store createStore( private static Store createStore(