diff --git a/server/src/main/java/org/elasticsearch/index/engine/EngineConfig.java b/server/src/main/java/org/elasticsearch/index/engine/EngineConfig.java index f95ba96d343..1cc92319b5e 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/EngineConfig.java +++ b/server/src/main/java/org/elasticsearch/index/engine/EngineConfig.java @@ -35,6 +35,7 @@ import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.codec.CodecService; import org.elasticsearch.index.mapper.ParsedDocument; +import org.elasticsearch.index.seqno.RetentionLease; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.store.Store; import org.elasticsearch.index.translog.TranslogConfig; @@ -42,8 +43,11 @@ import org.elasticsearch.indices.IndexingMemoryController; import org.elasticsearch.indices.breaker.CircuitBreakerService; import org.elasticsearch.threadpool.ThreadPool; +import java.util.Collection; import java.util.List; +import java.util.Objects; import java.util.function.LongSupplier; +import java.util.function.Supplier; /* * Holds all the configuration that is used to create an {@link Engine}. @@ -77,6 +81,18 @@ public final class EngineConfig { @Nullable private final CircuitBreakerService circuitBreakerService; private final LongSupplier globalCheckpointSupplier; + private final Supplier> retentionLeasesSupplier; + + /** + * A supplier of the outstanding retention leases. This is used during merged operations to determine which operations that have been + * soft deleted should be retained. + * + * @return a supplier of outstanding retention leases + */ + public Supplier> retentionLeasesSupplier() { + return retentionLeasesSupplier; + } + private final LongSupplier primaryTermSupplier; private final TombstoneDocSupplier tombstoneDocSupplier; @@ -125,7 +141,9 @@ public final class EngineConfig { List externalRefreshListener, List internalRefreshListener, Sort indexSort, CircuitBreakerService circuitBreakerService, LongSupplier globalCheckpointSupplier, - LongSupplier primaryTermSupplier, TombstoneDocSupplier tombstoneDocSupplier) { + Supplier> retentionLeasesSupplier, + LongSupplier primaryTermSupplier, + TombstoneDocSupplier tombstoneDocSupplier) { this.shardId = shardId; this.allocationId = allocationId; this.indexSettings = indexSettings; @@ -161,6 +179,7 @@ public final class EngineConfig { this.indexSort = indexSort; this.circuitBreakerService = circuitBreakerService; this.globalCheckpointSupplier = globalCheckpointSupplier; + this.retentionLeasesSupplier = Objects.requireNonNull(retentionLeasesSupplier); this.primaryTermSupplier = primaryTermSupplier; this.tombstoneDocSupplier = tombstoneDocSupplier; } diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index a295fbf3336..d0e55fc13ee 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -276,8 +276,11 @@ public class InternalEngine extends Engine { } else { lastMinRetainedSeqNo = Long.parseLong(commitUserData.get(SequenceNumbers.MAX_SEQ_NO)) + 1; } - return new SoftDeletesPolicy(translog::getLastSyncedGlobalCheckpoint, lastMinRetainedSeqNo, - engineConfig.getIndexSettings().getSoftDeleteRetentionOperations()); + return new SoftDeletesPolicy( + translog::getLastSyncedGlobalCheckpoint, + lastMinRetainedSeqNo, + engineConfig.getIndexSettings().getSoftDeleteRetentionOperations(), + engineConfig.retentionLeasesSupplier()); } /** diff --git a/server/src/main/java/org/elasticsearch/index/engine/SoftDeletesPolicy.java b/server/src/main/java/org/elasticsearch/index/engine/SoftDeletesPolicy.java index af2ded8c466..c957902d8df 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/SoftDeletesPolicy.java +++ b/server/src/main/java/org/elasticsearch/index/engine/SoftDeletesPolicy.java @@ -23,11 +23,15 @@ import org.apache.lucene.document.LongPoint; import org.apache.lucene.search.Query; import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.index.mapper.SeqNoFieldMapper; +import org.elasticsearch.index.seqno.RetentionLease; import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.index.translog.Translog; +import java.util.Collection; +import java.util.Objects; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.LongSupplier; +import java.util.function.Supplier; /** * A policy that controls how many soft-deleted documents should be retained for peer-recovery and querying history changes purpose. @@ -41,11 +45,18 @@ final class SoftDeletesPolicy { private long retentionOperations; // The min seq_no value that is retained - ops after this seq# should exist in the Lucene index. private long minRetainedSeqNo; + // provides the retention leases used to calculate the minimum sequence number to retain + private final Supplier> retentionLeasesSupplier; - SoftDeletesPolicy(LongSupplier globalCheckpointSupplier, long minRetainedSeqNo, long retentionOperations) { + SoftDeletesPolicy( + final LongSupplier globalCheckpointSupplier, + final long minRetainedSeqNo, + final long retentionOperations, + final Supplier> retentionLeasesSupplier) { this.globalCheckpointSupplier = globalCheckpointSupplier; this.retentionOperations = retentionOperations; this.minRetainedSeqNo = minRetainedSeqNo; + this.retentionLeasesSupplier = Objects.requireNonNull(retentionLeasesSupplier); this.localCheckpointOfSafeCommit = SequenceNumbers.NO_OPS_PERFORMED; this.retentionLockCount = 0; } @@ -97,14 +108,35 @@ final class SoftDeletesPolicy { synchronized long getMinRetainedSeqNo() { // Do not advance if the retention lock is held if (retentionLockCount == 0) { - // This policy retains operations for two purposes: peer-recovery and querying changes history. - // - Peer-recovery is driven by the local checkpoint of the safe commit. In peer-recovery, the primary transfers a safe commit, - // then sends ops after the local checkpoint of that commit. This requires keeping all ops after localCheckpointOfSafeCommit; - // - Changes APIs are driven the combination of the global checkpoint and retention ops. Here we prefer using the global - // checkpoint instead of max_seqno because only operations up to the global checkpoint are exposed in the the changes APIs. - final long minSeqNoForQueryingChanges = globalCheckpointSupplier.getAsLong() - retentionOperations; + /* + * This policy retains operations for two purposes: peer-recovery and querying changes history. + * - Peer-recovery is driven by the local checkpoint of the safe commit. In peer-recovery, the primary transfers a safe commit, + * then sends operations after the local checkpoint of that commit. This requires keeping all ops after + * localCheckpointOfSafeCommit. + * - Changes APIs are driven by a combination of the global checkpoint, retention operations, and retention leases. Here we + * prefer using the global checkpoint instead of the maximum sequence number because only operations up to the global + * checkpoint are exposed in the the changes APIs. + */ + + // calculate the minimum sequence number to retain based on retention leases + final long minimumRetainingSequenceNumber = retentionLeasesSupplier + .get() + .stream() + .mapToLong(RetentionLease::retainingSequenceNumber) + .min() + .orElse(Long.MAX_VALUE); + /* + * The minimum sequence number to retain is the minimum of the minimum based on retention leases, and the number of operations + * below the global checkpoint to retain (index.soft_deletes.retention.operations). + */ + final long minSeqNoForQueryingChanges = + Math.min(globalCheckpointSupplier.getAsLong() - retentionOperations, minimumRetainingSequenceNumber); final long minSeqNoToRetain = Math.min(minSeqNoForQueryingChanges, localCheckpointOfSafeCommit) + 1; - // This can go backward as the retentionOperations value can be changed in settings. + + /* + * We take the maximum as minSeqNoToRetain can go backward as the retention operations value can be changed in settings, or from + * the addition of leases with a retaining sequence number lower than previous retaining sequence numbers. + */ minRetainedSeqNo = Math.max(minRetainedSeqNo, minSeqNoToRetain); } return minRetainedSeqNo; @@ -117,4 +149,5 @@ final class SoftDeletesPolicy { Query getRetentionQuery() { return LongPoint.newRangeQuery(SeqNoFieldMapper.NAME, getMinRetainedSeqNo(), Long.MAX_VALUE); } + } diff --git a/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java b/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java index b406621e978..7168bb772dd 100644 --- a/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java +++ b/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java @@ -35,7 +35,9 @@ import org.elasticsearch.index.shard.ReplicationGroup; import org.elasticsearch.index.shard.ShardId; import java.io.IOException; +import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.Map; @@ -146,6 +148,29 @@ public class ReplicationTracker extends AbstractIndexShardComponent implements L */ volatile ReplicationGroup replicationGroup; + private final Map retentionLeases = new HashMap<>(); + + /** + * Get all retention leases tracker on this shard. An unmodifiable copy of the retention leases is returned. + * + * @return the retention leases + */ + public synchronized Collection getRetentionLeases() { + return Collections.unmodifiableCollection(new ArrayList<>(retentionLeases.values())); + } + + /** + * Adds a new or updates an existing retention lease. + * + * @param id the identifier of the retention lease + * @param retainingSequenceNumber the retaining sequence number + * @param source the source of the retention lease + */ + public synchronized void addOrUpdateRetentionLease(final String id, final long retainingSequenceNumber, final String source) { + assert primaryMode; + retentionLeases.put(id, new RetentionLease(id, retainingSequenceNumber, source)); + } + public static class CheckpointState implements Writeable { /** diff --git a/server/src/main/java/org/elasticsearch/index/seqno/RetentionLease.java b/server/src/main/java/org/elasticsearch/index/seqno/RetentionLease.java new file mode 100644 index 00000000000..68e73219dc8 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/index/seqno/RetentionLease.java @@ -0,0 +1,86 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.index.seqno; + +/** + * A "shard history retention lease" (or "retention lease" for short) is conceptually a marker containing a retaining sequence number such + * that all operations with sequence number at least that retaining sequence number will be retained during merge operations (which could + * otherwise merge away operations that have been soft deleted). Each retention lease contains a unique identifier, the retaining sequence + * number, and the source of the retention lease (e.g., "ccr"). + */ +public class RetentionLease { + + private final String id; + + /** + * The identifier for this retention lease. This identifier should be unique per lease and is set during construction by the caller. + * + * @return the identifier + */ + public String id() { + return id; + } + + private final long retainingSequenceNumber; + + /** + * The retaining sequence number of this retention lease. The retaining sequence number is the minimum sequence number that this + * retention lease wants to retain during merge operations. The retaining sequence number is set during construction by the caller. + * + * @return the retaining sequence number + */ + public long retainingSequenceNumber() { + return retainingSequenceNumber; + } + + private final String source; + + /** + * The source of this retention lease. The source is set during construction by the caller. + * + * @return the source + */ + public String source() { + return source; + } + + /** + * Constructs a new retention lease. + * + * @param id the identifier of the retention lease + * @param retainingSequenceNumber the retaining sequence number + * @param source the source of the retention lease + */ + public RetentionLease(final String id, final long retainingSequenceNumber, final String source) { + this.id = id; + this.retainingSequenceNumber = retainingSequenceNumber; + this.source = source; + } + + @Override + public String toString() { + return "ShardHistoryRetentionLease{" + + "id='" + id + '\'' + + ", retainingSequenceNumber=" + retainingSequenceNumber + + ", source='" + source + '\'' + + '}'; + } + +} diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 8b0db9543e3..ae4c3e0c0bc 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -1864,6 +1864,20 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl this.globalCheckpointListeners.add(waitingForGlobalCheckpoint, listener, timeout); } + + /** + * Adds a new or updates an existing retention lease. + * + * @param id the identifier of the retention lease + * @param retainingSequenceNumber the retaining sequence number + * @param source the source of the retention lease + */ + void addOrUpdateRetentionLease(final String id, final long retainingSequenceNumber, final String source) { + assert assertPrimaryMode(); + verifyNotClosed(); + replicationTracker.addOrUpdateRetentionLease(id, retainingSequenceNumber, source); + } + /** * Waits for all operations up to the provided sequence number to complete. * @@ -2310,13 +2324,14 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl private EngineConfig newEngineConfig() { Sort indexSort = indexSortSupplier.get(); return new EngineConfig(shardId, shardRouting.allocationId().getId(), - threadPool, indexSettings, warmer, store, indexSettings.getMergePolicy(), - mapperService.indexAnalyzer(), similarityService.similarity(mapperService), codecService, shardEventListener, - indexCache.query(), cachingPolicy, translogConfig, - IndexingMemoryController.SHARD_INACTIVE_TIME_SETTING.get(indexSettings.getSettings()), - Collections.singletonList(refreshListeners), - Collections.singletonList(new RefreshMetricUpdater(refreshMetric)), - indexSort, circuitBreakerService, replicationTracker, () -> operationPrimaryTerm, tombstoneDocSupplier()); + threadPool, indexSettings, warmer, store, indexSettings.getMergePolicy(), + mapperService.indexAnalyzer(), similarityService.similarity(mapperService), codecService, shardEventListener, + indexCache.query(), cachingPolicy, translogConfig, + IndexingMemoryController.SHARD_INACTIVE_TIME_SETTING.get(indexSettings.getSettings()), + Collections.singletonList(refreshListeners), + Collections.singletonList(new RefreshMetricUpdater(refreshMetric)), + indexSort, circuitBreakerService, replicationTracker, replicationTracker::getRetentionLeases, + () -> operationPrimaryTerm, tombstoneDocSupplier()); } /** diff --git a/server/src/test/java/org/elasticsearch/index/engine/CombinedDeletionPolicyTests.java b/server/src/test/java/org/elasticsearch/index/engine/CombinedDeletionPolicyTests.java index 6546eaa1244..ff33ce19d48 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/CombinedDeletionPolicyTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/CombinedDeletionPolicyTests.java @@ -30,6 +30,7 @@ import org.elasticsearch.test.ESTestCase; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -52,7 +53,8 @@ public class CombinedDeletionPolicyTests extends ESTestCase { public void testKeepCommitsAfterGlobalCheckpoint() throws Exception { final AtomicLong globalCheckpoint = new AtomicLong(); final int extraRetainedOps = between(0, 100); - final SoftDeletesPolicy softDeletesPolicy = new SoftDeletesPolicy(globalCheckpoint::get, -1, extraRetainedOps); + final SoftDeletesPolicy softDeletesPolicy = + new SoftDeletesPolicy(globalCheckpoint::get, -1, extraRetainedOps, Collections::emptyList); TranslogDeletionPolicy translogPolicy = createTranslogDeletionPolicy(); CombinedDeletionPolicy indexPolicy = new CombinedDeletionPolicy(logger, translogPolicy, softDeletesPolicy, globalCheckpoint::get); @@ -96,7 +98,8 @@ public class CombinedDeletionPolicyTests extends ESTestCase { public void testAcquireIndexCommit() throws Exception { final AtomicLong globalCheckpoint = new AtomicLong(); final int extraRetainedOps = between(0, 100); - final SoftDeletesPolicy softDeletesPolicy = new SoftDeletesPolicy(globalCheckpoint::get, -1, extraRetainedOps); + final SoftDeletesPolicy softDeletesPolicy = + new SoftDeletesPolicy(globalCheckpoint::get, -1, extraRetainedOps, Collections::emptyList); final UUID translogUUID = UUID.randomUUID(); TranslogDeletionPolicy translogPolicy = createTranslogDeletionPolicy(); CombinedDeletionPolicy indexPolicy = new CombinedDeletionPolicy(logger, translogPolicy, softDeletesPolicy, globalCheckpoint::get); @@ -176,7 +179,7 @@ public class CombinedDeletionPolicyTests extends ESTestCase { public void testLegacyIndex() throws Exception { final AtomicLong globalCheckpoint = new AtomicLong(); - final SoftDeletesPolicy softDeletesPolicy = new SoftDeletesPolicy(globalCheckpoint::get, -1, 0); + final SoftDeletesPolicy softDeletesPolicy = new SoftDeletesPolicy(globalCheckpoint::get, -1, 0, Collections::emptyList); final UUID translogUUID = UUID.randomUUID(); TranslogDeletionPolicy translogPolicy = createTranslogDeletionPolicy(); @@ -211,7 +214,7 @@ public class CombinedDeletionPolicyTests extends ESTestCase { public void testDeleteInvalidCommits() throws Exception { final AtomicLong globalCheckpoint = new AtomicLong(randomNonNegativeLong()); - final SoftDeletesPolicy softDeletesPolicy = new SoftDeletesPolicy(globalCheckpoint::get, -1, 0); + final SoftDeletesPolicy softDeletesPolicy = new SoftDeletesPolicy(globalCheckpoint::get, -1, 0, Collections::emptyList); TranslogDeletionPolicy translogPolicy = createTranslogDeletionPolicy(); CombinedDeletionPolicy indexPolicy = new CombinedDeletionPolicy(logger, translogPolicy, softDeletesPolicy, globalCheckpoint::get); @@ -245,7 +248,7 @@ public class CombinedDeletionPolicyTests extends ESTestCase { public void testCheckUnreferencedCommits() throws Exception { final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.UNASSIGNED_SEQ_NO); - final SoftDeletesPolicy softDeletesPolicy = new SoftDeletesPolicy(globalCheckpoint::get, -1, 0); + final SoftDeletesPolicy softDeletesPolicy = new SoftDeletesPolicy(globalCheckpoint::get, -1, 0, Collections::emptyList); final UUID translogUUID = UUID.randomUUID(); final TranslogDeletionPolicy translogPolicy = createTranslogDeletionPolicy(); CombinedDeletionPolicy indexPolicy = new CombinedDeletionPolicy(logger, translogPolicy, softDeletesPolicy, globalCheckpoint::get); diff --git a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index 31995f1f7f2..82e9869a1cd 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -3011,7 +3011,8 @@ public class InternalEngineTests extends EngineTestCase { new CodecService(null, logger), config.getEventListener(), IndexSearcher.getDefaultQueryCache(), IndexSearcher.getDefaultQueryCachingPolicy(), translogConfig, TimeValue.timeValueMinutes(5), config.getExternalRefreshListener(), config.getInternalRefreshListener(), null, - new NoneCircuitBreakerService(), () -> UNASSIGNED_SEQ_NO, primaryTerm::get, tombstoneDocSupplier()); + new NoneCircuitBreakerService(), () -> UNASSIGNED_SEQ_NO, Collections::emptySet, primaryTerm::get, + tombstoneDocSupplier()); expectThrows(EngineCreationFailureException.class, () -> new InternalEngine(brokenConfig)); engine = createEngine(store, primaryTranslogDir); // and recover again! diff --git a/server/src/test/java/org/elasticsearch/index/engine/SoftDeletesPolicyTests.java b/server/src/test/java/org/elasticsearch/index/engine/SoftDeletesPolicyTests.java index f3590100382..17ad6750cf5 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/SoftDeletesPolicyTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/SoftDeletesPolicyTests.java @@ -19,25 +19,48 @@ package org.elasticsearch.index.engine; +import org.apache.lucene.document.LongPoint; +import org.apache.lucene.search.PointRangeQuery; +import org.apache.lucene.search.Query; import org.elasticsearch.common.lease.Releasable; +import org.elasticsearch.index.seqno.RetentionLease; import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.test.ESTestCase; import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashSet; import java.util.List; +import java.util.Set; import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Supplier; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.instanceOf; public class SoftDeletesPolicyTests extends ESTestCase { + /** * Makes sure we won't advance the retained seq# if the retention lock is held */ public void testSoftDeletesRetentionLock() { long retainedOps = between(0, 10000); AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); + final AtomicLong[] retainingSequenceNumbers = new AtomicLong[randomIntBetween(0, 8)]; + for (int i = 0; i < retainingSequenceNumbers.length; i++) { + retainingSequenceNumbers[i] = new AtomicLong(SequenceNumbers.UNASSIGNED_SEQ_NO); + } + final Supplier> retentionLeasesSupplier = + () -> { + final Set leases = new HashSet<>(retainingSequenceNumbers.length); + for (int i = 0; i < retainingSequenceNumbers.length; i++) { + leases.add(new RetentionLease(Integer.toString(i), retainingSequenceNumbers[i].get(), "test")); + } + return leases; + }; long safeCommitCheckpoint = globalCheckpoint.get(); - SoftDeletesPolicy policy = new SoftDeletesPolicy(globalCheckpoint::get, between(1, 10000), retainedOps); + SoftDeletesPolicy policy = new SoftDeletesPolicy(globalCheckpoint::get, between(1, 10000), retainedOps, retentionLeasesSupplier); long minRetainedSeqNo = policy.getMinRetainedSeqNo(); List locks = new ArrayList<>(); int iters = scaledRandomIntBetween(10, 1000); @@ -47,6 +70,9 @@ public class SoftDeletesPolicyTests extends ESTestCase { } // Advances the global checkpoint and the local checkpoint of a safe commit globalCheckpoint.addAndGet(between(0, 1000)); + for (final AtomicLong retainingSequenceNumber : retainingSequenceNumbers) { + retainingSequenceNumber.set(randomLongBetween(retainingSequenceNumber.get(), globalCheckpoint.get())); + } safeCommitCheckpoint = randomLongBetween(safeCommitCheckpoint, globalCheckpoint.get()); policy.setLocalCheckpointOfSafeCommit(safeCommitCheckpoint); if (rarely()) { @@ -58,18 +84,36 @@ public class SoftDeletesPolicyTests extends ESTestCase { locks.removeAll(releasingLocks); releasingLocks.forEach(Releasable::close); - // We only expose the seqno to the merge policy if the retention lock is not held. - policy.getRetentionQuery(); + // getting the query has side effects, updating the internal state of the policy + final Query query = policy.getRetentionQuery(); + assertThat(query, instanceOf(PointRangeQuery.class)); + final PointRangeQuery retentionQuery = (PointRangeQuery) query; + + // we only expose the minimum sequence number to the merge policy if the retention lock is not held if (locks.isEmpty()) { - long retainedSeqNo = Math.min(safeCommitCheckpoint, globalCheckpoint.get() - retainedOps) + 1; + final long minimumRetainingSequenceNumber = Arrays.stream(retainingSequenceNumbers) + .mapToLong(AtomicLong::get) + .min() + .orElse(Long.MAX_VALUE); + long retainedSeqNo = + Math.min(safeCommitCheckpoint, Math.min(minimumRetainingSequenceNumber, globalCheckpoint.get() - retainedOps)) + 1; minRetainedSeqNo = Math.max(minRetainedSeqNo, retainedSeqNo); } + assertThat(retentionQuery.getNumDims(), equalTo(1)); + assertThat(LongPoint.decodeDimension(retentionQuery.getLowerPoint(), 0), equalTo(minRetainedSeqNo)); + assertThat(LongPoint.decodeDimension(retentionQuery.getUpperPoint(), 0), equalTo(Long.MAX_VALUE)); assertThat(policy.getMinRetainedSeqNo(), equalTo(minRetainedSeqNo)); } locks.forEach(Releasable::close); - long retainedSeqNo = Math.min(safeCommitCheckpoint, globalCheckpoint.get() - retainedOps) + 1; + final long minimumRetainingSequenceNumber = Arrays.stream(retainingSequenceNumbers) + .mapToLong(AtomicLong::get) + .min() + .orElse(Long.MAX_VALUE); + long retainedSeqNo = + Math.min(safeCommitCheckpoint, Math.min(minimumRetainingSequenceNumber, globalCheckpoint.get() - retainedOps)) + 1; minRetainedSeqNo = Math.max(minRetainedSeqNo, retainedSeqNo); assertThat(policy.getMinRetainedSeqNo(), equalTo(minRetainedSeqNo)); } + } diff --git a/server/src/test/java/org/elasticsearch/index/seqno/ReplicationTrackerRetentionLeaseTests.java b/server/src/test/java/org/elasticsearch/index/seqno/ReplicationTrackerRetentionLeaseTests.java new file mode 100644 index 00000000000..83dbb3194ae --- /dev/null +++ b/server/src/test/java/org/elasticsearch/index/seqno/ReplicationTrackerRetentionLeaseTests.java @@ -0,0 +1,86 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.index.seqno; + +import org.elasticsearch.cluster.routing.AllocationId; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.test.IndexSettingsModule; + +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasItem; +import static org.hamcrest.Matchers.hasSize; + +public class ReplicationTrackerRetentionLeaseTests extends ReplicationTrackerTestCase { + + public void testAddOrUpdateRetentionLease() { + final AllocationId id = AllocationId.newInitializing(); + final ReplicationTracker replicationTracker = new ReplicationTracker( + new ShardId("test", "_na", 0), + id.getId(), + IndexSettingsModule.newIndexSettings("test", Settings.EMPTY), + UNASSIGNED_SEQ_NO, + value -> {}); + replicationTracker.updateFromMaster( + randomNonNegativeLong(), + Collections.singleton(id.getId()), + routingTable(Collections.emptySet(), id), + Collections.emptySet()); + replicationTracker.activatePrimaryMode(SequenceNumbers.NO_OPS_PERFORMED); + final int length = randomIntBetween(0, 8); + final long[] minimumRetainingSequenceNumbers = new long[length]; + for (int i = 0; i < length; i++) { + minimumRetainingSequenceNumbers[i] = randomLongBetween(SequenceNumbers.NO_OPS_PERFORMED, Long.MAX_VALUE); + replicationTracker.addOrUpdateRetentionLease(Integer.toString(i), minimumRetainingSequenceNumbers[i], "test-" + i); + assertRetentionLeases(replicationTracker, i + 1, minimumRetainingSequenceNumbers); + } + + for (int i = 0; i < length; i++) { + minimumRetainingSequenceNumbers[i] = randomLongBetween(minimumRetainingSequenceNumbers[i], Long.MAX_VALUE); + replicationTracker.addOrUpdateRetentionLease(Integer.toString(i), minimumRetainingSequenceNumbers[i], "test-" + i); + assertRetentionLeases(replicationTracker, length, minimumRetainingSequenceNumbers); + } + + } + + private void assertRetentionLeases( + final ReplicationTracker replicationTracker, final int size, final long[] minimumRetainingSequenceNumbers) { + final Collection retentionLeases = replicationTracker.getRetentionLeases(); + final Map idToRetentionLease = new HashMap<>(); + for (final RetentionLease retentionLease : retentionLeases) { + idToRetentionLease.put(retentionLease.id(), retentionLease); + } + + assertThat(idToRetentionLease.entrySet(), hasSize(size)); + for (int i = 0; i < size; i++) { + assertThat(idToRetentionLease.keySet(), hasItem(Integer.toString(i))); + final RetentionLease retentionLease = idToRetentionLease.get(Integer.toString(i)); + assertThat(retentionLease.retainingSequenceNumber(), equalTo(minimumRetainingSequenceNumbers[i])); + assertThat(retentionLease.source(), equalTo("test-" + i)); + } + } + +} diff --git a/server/src/test/java/org/elasticsearch/index/seqno/ReplicationTrackerTestCase.java b/server/src/test/java/org/elasticsearch/index/seqno/ReplicationTrackerTestCase.java new file mode 100644 index 00000000000..62e43af7d0e --- /dev/null +++ b/server/src/test/java/org/elasticsearch/index/seqno/ReplicationTrackerTestCase.java @@ -0,0 +1,69 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.index.seqno; + +import org.elasticsearch.cluster.routing.AllocationId; +import org.elasticsearch.cluster.routing.IndexShardRoutingTable; +import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.cluster.routing.ShardRoutingState; +import org.elasticsearch.cluster.routing.TestShardRouting; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.test.IndexSettingsModule; + +import java.util.Set; +import java.util.function.LongConsumer; + +import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO; + +public abstract class ReplicationTrackerTestCase extends ESTestCase { + + ReplicationTracker newTracker(final AllocationId allocationId, final LongConsumer updatedGlobalCheckpoint) { + return new ReplicationTracker( + new ShardId("test", "_na_", 0), + allocationId.getId(), + IndexSettingsModule.newIndexSettings("test", Settings.EMPTY), + UNASSIGNED_SEQ_NO, + updatedGlobalCheckpoint); + } + + static IndexShardRoutingTable routingTable(final Set initializingIds, final AllocationId primaryId) { + final ShardId shardId = new ShardId("test", "_na_", 0); + final ShardRouting primaryShard = + TestShardRouting.newShardRouting(shardId, randomAlphaOfLength(10), null, true, ShardRoutingState.STARTED, primaryId); + return routingTable(initializingIds, primaryShard); + } + + static IndexShardRoutingTable routingTable(final Set initializingIds, final ShardRouting primaryShard) { + assert !initializingIds.contains(primaryShard.allocationId()); + final ShardId shardId = new ShardId("test", "_na_", 0); + final IndexShardRoutingTable.Builder builder = new IndexShardRoutingTable.Builder(shardId); + for (final AllocationId initializingId : initializingIds) { + builder.addShard(TestShardRouting.newShardRouting( + shardId, randomAlphaOfLength(10), null, false, ShardRoutingState.INITIALIZING, initializingId)); + } + + builder.addShard(primaryShard); + + return builder.build(); + } + +} diff --git a/server/src/test/java/org/elasticsearch/index/seqno/ReplicationTrackerTests.java b/server/src/test/java/org/elasticsearch/index/seqno/ReplicationTrackerTests.java index 0aed64d05fc..e3fad3182a7 100644 --- a/server/src/test/java/org/elasticsearch/index/seqno/ReplicationTrackerTests.java +++ b/server/src/test/java/org/elasticsearch/index/seqno/ReplicationTrackerTests.java @@ -32,7 +32,6 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.set.Sets; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.shard.ShardId; -import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.IndexSettingsModule; import java.io.IOException; @@ -61,7 +60,7 @@ import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.not; -public class ReplicationTrackerTests extends ESTestCase { +public class ReplicationTrackerTests extends ReplicationTrackerTestCase { public void testEmptyShards() { final ReplicationTracker tracker = newTracker(AllocationId.newInitializing()); @@ -76,27 +75,6 @@ public class ReplicationTrackerTests extends ESTestCase { return allocations; } - private static IndexShardRoutingTable routingTable(final Set initializingIds, final AllocationId primaryId) { - final ShardId shardId = new ShardId("test", "_na_", 0); - final ShardRouting primaryShard = - TestShardRouting.newShardRouting(shardId, randomAlphaOfLength(10), null, true, ShardRoutingState.STARTED, primaryId); - return routingTable(initializingIds, primaryShard); - } - - private static IndexShardRoutingTable routingTable(final Set initializingIds, final ShardRouting primaryShard) { - assert !initializingIds.contains(primaryShard.allocationId()); - ShardId shardId = new ShardId("test", "_na_", 0); - IndexShardRoutingTable.Builder builder = new IndexShardRoutingTable.Builder(shardId); - for (AllocationId initializingId : initializingIds) { - builder.addShard(TestShardRouting.newShardRouting( - shardId, randomAlphaOfLength(10), null, false, ShardRoutingState.INITIALIZING, initializingId)); - } - - builder.addShard(primaryShard); - - return builder.build(); - } - private static Set ids(Set allocationIds) { return allocationIds.stream().map(AllocationId::getId).collect(Collectors.toSet()); } @@ -428,12 +406,7 @@ public class ReplicationTrackerTests extends ESTestCase { private AtomicLong updatedGlobalCheckpoint = new AtomicLong(UNASSIGNED_SEQ_NO); private ReplicationTracker newTracker(final AllocationId allocationId) { - return new ReplicationTracker( - new ShardId("test", "_na_", 0), - allocationId.getId(), - IndexSettingsModule.newIndexSettings("test", Settings.EMPTY), - UNASSIGNED_SEQ_NO, - updatedGlobalCheckpoint::set); + return newTracker(allocationId, updatedGlobalCheckpoint::set); } public void testWaitForAllocationIdToBeInSyncCanBeInterrupted() throws BrokenBarrierException, InterruptedException { @@ -709,10 +682,11 @@ public class ReplicationTrackerTests extends ESTestCase { FakeClusterState clusterState = initialState(); final AllocationId primaryAllocationId = clusterState.routingTable.primaryShard().allocationId(); final LongConsumer onUpdate = updatedGlobalCheckpoint -> {}; + final long globalCheckpoint = UNASSIGNED_SEQ_NO; ReplicationTracker oldPrimary = - new ReplicationTracker(shardId, primaryAllocationId.getId(), indexSettings, UNASSIGNED_SEQ_NO, onUpdate); + new ReplicationTracker(shardId, primaryAllocationId.getId(), indexSettings, globalCheckpoint, onUpdate); ReplicationTracker newPrimary = - new ReplicationTracker(shardId, primaryAllocationId.getRelocationId(), indexSettings, UNASSIGNED_SEQ_NO, onUpdate); + new ReplicationTracker(shardId, primaryAllocationId.getRelocationId(), indexSettings, globalCheckpoint, onUpdate); Set allocationIds = new HashSet<>(Arrays.asList(oldPrimary.shardAllocationId, newPrimary.shardAllocationId)); diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardRetentionLeaseTests.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardRetentionLeaseTests.java new file mode 100644 index 00000000000..1214ac0ec9f --- /dev/null +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardRetentionLeaseTests.java @@ -0,0 +1,75 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.index.shard; + +import org.elasticsearch.index.seqno.RetentionLease; +import org.elasticsearch.index.seqno.SequenceNumbers; + +import java.io.IOException; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; + +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasItem; +import static org.hamcrest.Matchers.hasSize; + +public class IndexShardRetentionLeaseTests extends IndexShardTestCase { + + public void testAddOrUpdateRetentionLease() throws IOException { + final IndexShard indexShard = newStartedShard(true); + try { + final int length = randomIntBetween(0, 8); + final long[] minimumRetainingSequenceNumbers = new long[length]; + for (int i = 0; i < length; i++) { + minimumRetainingSequenceNumbers[i] = randomLongBetween(SequenceNumbers.NO_OPS_PERFORMED, Long.MAX_VALUE); + indexShard.addOrUpdateRetentionLease(Integer.toString(i), minimumRetainingSequenceNumbers[i], "test-" + i); + assertRetentionLeases(indexShard, i + 1, minimumRetainingSequenceNumbers); + } + + for (int i = 0; i < length; i++) { + minimumRetainingSequenceNumbers[i] = randomLongBetween(minimumRetainingSequenceNumbers[i], Long.MAX_VALUE); + indexShard.addOrUpdateRetentionLease(Integer.toString(i), minimumRetainingSequenceNumbers[i], "test-" + i); + assertRetentionLeases(indexShard, length, minimumRetainingSequenceNumbers); + } + } finally { + closeShards(indexShard); + } + + } + + private void assertRetentionLeases( + final IndexShard indexShard, final int size, final long[] minimumRetainingSequenceNumbers) { + final Collection retentionLeases = indexShard.getEngine().config().retentionLeasesSupplier().get(); + final Map idToRetentionLease = new HashMap<>(); + for (final RetentionLease retentionLease : retentionLeases) { + idToRetentionLease.put(retentionLease.id(), retentionLease); + } + + assertThat(idToRetentionLease.entrySet(), hasSize(size)); + for (int i = 0; i < size; i++) { + assertThat(idToRetentionLease.keySet(), hasItem(Integer.toString(i))); + final RetentionLease retentionLease = idToRetentionLease.get(Integer.toString(i)); + assertThat(retentionLease.retainingSequenceNumber(), equalTo(minimumRetainingSequenceNumbers[i])); + assertThat(retentionLease.source(), equalTo("test-" + i)); + } + } + +} diff --git a/server/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java b/server/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java index cb061e3d5af..b52a3dbf17c 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java @@ -125,8 +125,8 @@ public class RefreshListenersTests extends ESTestCase { indexSettings, null, store, newMergePolicy(), iwc.getAnalyzer(), iwc.getSimilarity(), new CodecService(null, logger), eventListener, IndexSearcher.getDefaultQueryCache(), IndexSearcher.getDefaultQueryCachingPolicy(), translogConfig, TimeValue.timeValueMinutes(5), Collections.singletonList(listeners), Collections.emptyList(), null, - new NoneCircuitBreakerService(), () -> SequenceNumbers.NO_OPS_PERFORMED, () -> primaryTerm, - EngineTestCase.tombstoneDocSupplier()); + new NoneCircuitBreakerService(), () -> SequenceNumbers.NO_OPS_PERFORMED, Collections::emptySet, + () -> primaryTerm, EngineTestCase.tombstoneDocSupplier()); engine = new InternalEngine(config); engine.initializeMaxSeqNoOfUpdatesOrDeletes(); engine.recoverFromTranslog((e, s) -> 0, Long.MAX_VALUE); diff --git a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java index f07c5e1b21f..3786cc9591d 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java @@ -84,6 +84,7 @@ import org.elasticsearch.index.mapper.Uid; import org.elasticsearch.index.mapper.VersionFieldMapper; import org.elasticsearch.index.seqno.LocalCheckpointTracker; import org.elasticsearch.index.seqno.ReplicationTracker; +import org.elasticsearch.index.seqno.RetentionLease; import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.store.Store; @@ -104,6 +105,7 @@ import java.nio.charset.Charset; import java.nio.file.Path; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.Comparator; import java.util.HashMap; @@ -115,6 +117,7 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.function.BiFunction; import java.util.function.Function; import java.util.function.LongSupplier; +import java.util.function.Supplier; import java.util.function.ToLongBiFunction; import java.util.stream.Collectors; @@ -224,7 +227,8 @@ public abstract class EngineTestCase extends ESTestCase { new CodecService(null, logger), config.getEventListener(), config.getQueryCache(), config.getQueryCachingPolicy(), config.getTranslogConfig(), config.getFlushMergesAfter(), config.getExternalRefreshListener(), Collections.emptyList(), config.getIndexSort(), - config.getCircuitBreakerService(), globalCheckpointSupplier, config.getPrimaryTermSupplier(), tombstoneDocSupplier()); + config.getCircuitBreakerService(), globalCheckpointSupplier, config.retentionLeasesSupplier(), + config.getPrimaryTermSupplier(), tombstoneDocSupplier()); } public EngineConfig copy(EngineConfig config, Analyzer analyzer) { @@ -233,8 +237,8 @@ public abstract class EngineTestCase extends ESTestCase { new CodecService(null, logger), config.getEventListener(), config.getQueryCache(), config.getQueryCachingPolicy(), config.getTranslogConfig(), config.getFlushMergesAfter(), config.getExternalRefreshListener(), Collections.emptyList(), config.getIndexSort(), - config.getCircuitBreakerService(), config.getGlobalCheckpointSupplier(), config.getPrimaryTermSupplier(), - config.getTombstoneDocSupplier()); + config.getCircuitBreakerService(), config.getGlobalCheckpointSupplier(), config.retentionLeasesSupplier(), + config.getPrimaryTermSupplier(), config.getTombstoneDocSupplier()); } public EngineConfig copy(EngineConfig config, MergePolicy mergePolicy) { @@ -243,8 +247,8 @@ public abstract class EngineTestCase extends ESTestCase { new CodecService(null, logger), config.getEventListener(), config.getQueryCache(), config.getQueryCachingPolicy(), config.getTranslogConfig(), config.getFlushMergesAfter(), config.getExternalRefreshListener(), Collections.emptyList(), config.getIndexSort(), - config.getCircuitBreakerService(), config.getGlobalCheckpointSupplier(), config.getPrimaryTermSupplier(), - config.getTombstoneDocSupplier()); + config.getCircuitBreakerService(), config.getGlobalCheckpointSupplier(), config.retentionLeasesSupplier(), + config.getPrimaryTermSupplier(), config.getTombstoneDocSupplier()); } @Override @@ -581,7 +585,8 @@ public abstract class EngineTestCase extends ESTestCase { public EngineConfig config(IndexSettings indexSettings, Store store, Path translogPath, MergePolicy mergePolicy, ReferenceManager.RefreshListener externalRefreshListener, ReferenceManager.RefreshListener internalRefreshListener, - Sort indexSort, LongSupplier globalCheckpointSupplier, CircuitBreakerService breakerService) { + Sort indexSort, @Nullable final LongSupplier maybeGlobalCheckpointSupplier, + CircuitBreakerService breakerService) { IndexWriterConfig iwc = newIndexWriterConfig(); TranslogConfig translogConfig = new TranslogConfig(shardId, translogPath, indexSettings, BigArrays.NON_RECYCLING_INSTANCE); Engine.EventListener listener = new Engine.EventListener() { @@ -594,14 +599,22 @@ public abstract class EngineTestCase extends ESTestCase { externalRefreshListener == null ? emptyList() : Collections.singletonList(externalRefreshListener); final List intRefreshListenerList = internalRefreshListener == null ? emptyList() : Collections.singletonList(internalRefreshListener); + final LongSupplier globalCheckpointSupplier; + final Supplier> retentionLeasesSupplier; + if (maybeGlobalCheckpointSupplier == null) { + final ReplicationTracker replicationTracker = + new ReplicationTracker(shardId, allocationId.getId(), indexSettings, SequenceNumbers.NO_OPS_PERFORMED, update -> {}); + globalCheckpointSupplier = replicationTracker; + retentionLeasesSupplier = replicationTracker::getRetentionLeases; + } else { + globalCheckpointSupplier = maybeGlobalCheckpointSupplier; + retentionLeasesSupplier = Collections::emptySet; + } EngineConfig config = new EngineConfig(shardId, allocationId.getId(), threadPool, indexSettings, null, store, mergePolicy, iwc.getAnalyzer(), iwc.getSimilarity(), new CodecService(null, logger), listener, IndexSearcher.getDefaultQueryCache(), IndexSearcher.getDefaultQueryCachingPolicy(), translogConfig, TimeValue.timeValueMinutes(5), extRefreshListenerList, intRefreshListenerList, indexSort, - breakerService, - globalCheckpointSupplier == null ? - new ReplicationTracker(shardId, allocationId.getId(), indexSettings, SequenceNumbers.NO_OPS_PERFORMED, update -> {}) : - globalCheckpointSupplier, primaryTerm::get, tombstoneDocSupplier()); + breakerService, globalCheckpointSupplier, retentionLeasesSupplier, primaryTerm::get, tombstoneDocSupplier()); return config; } diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java index 99998342b11..bccc5fed836 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java @@ -270,9 +270,9 @@ public class FollowingEngineTests extends ESTestCase { null, new NoneCircuitBreakerService(), globalCheckpoint::longValue, + Collections::emptyList, () -> primaryTerm.get(), - EngineTestCase.tombstoneDocSupplier() - ); + EngineTestCase.tombstoneDocSupplier()); } private static Store createStore(