diff --git a/server/src/main/java/org/elasticsearch/index/IndexService.java b/server/src/main/java/org/elasticsearch/index/IndexService.java index 57d8cc0b326..1b1784495e6 100644 --- a/server/src/main/java/org/elasticsearch/index/IndexService.java +++ b/server/src/main/java/org/elasticsearch/index/IndexService.java @@ -121,7 +121,7 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust private volatile AsyncRefreshTask refreshTask; private volatile AsyncTranslogFSync fsyncTask; private volatile AsyncGlobalCheckpointTask globalCheckpointTask; - private volatile AsyncRetentionLeaseBackgroundSyncTask retentionLeaseBackgroundSyncTask; + private volatile AsyncRetentionLeaseSyncTask retentionLeaseSyncTask; // don't convert to Setting<> and register... we only set this in tests and register via a plugin private final String INDEX_TRANSLOG_RETENTION_CHECK_INTERVAL_SETTING = "index.translog.retention.check_interval"; @@ -198,7 +198,7 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust this.refreshTask = new AsyncRefreshTask(this); this.trimTranslogTask = new AsyncTrimTranslogTask(this); this.globalCheckpointTask = new AsyncGlobalCheckpointTask(this); - this.retentionLeaseBackgroundSyncTask = new AsyncRetentionLeaseBackgroundSyncTask(this); + this.retentionLeaseSyncTask = new AsyncRetentionLeaseSyncTask(this); rescheduleFsyncTask(indexSettings.getTranslogDurability()); } @@ -289,7 +289,7 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust fsyncTask, trimTranslogTask, globalCheckpointTask, - retentionLeaseBackgroundSyncTask); + retentionLeaseSyncTask); } } } @@ -788,8 +788,8 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust sync(is -> is.maybeSyncGlobalCheckpoint("background"), "global checkpoint"); } - private void backgroundSyncRetentionLeases() { - sync(IndexShard::backgroundSyncRetentionLeases, "retention lease"); + private void syncRetentionLeases() { + sync(IndexShard::syncRetentionLeases, "retention lease"); } private void sync(final Consumer sync, final String source) { @@ -812,11 +812,11 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust && e instanceof IndexShardClosedException == false) { logger.warn( new ParameterizedMessage( - "{} failed to execute background {} sync", shard.shardId(), source), e); + "{} failed to execute {} sync", shard.shardId(), source), e); } }, ThreadPool.Names.SAME, - "background " + source + " sync"); + source + " sync"); } catch (final AlreadyClosedException | IndexShardClosedException e) { // the shard was closed concurrently, continue } @@ -957,15 +957,15 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust } } - final class AsyncRetentionLeaseBackgroundSyncTask extends BaseAsyncTask { + final class AsyncRetentionLeaseSyncTask extends BaseAsyncTask { - AsyncRetentionLeaseBackgroundSyncTask(final IndexService indexService) { + AsyncRetentionLeaseSyncTask(final IndexService indexService) { super(indexService, RETENTION_LEASE_SYNC_INTERVAL_SETTING.get(indexService.getIndexSettings().getSettings())); } @Override protected void runInternal() { - indexService.backgroundSyncRetentionLeases(); + indexService.syncRetentionLeases(); } @Override @@ -975,7 +975,7 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust @Override public String toString() { - return "retention_lease_background_sync"; + return "retention_lease_sync"; } } diff --git a/server/src/main/java/org/elasticsearch/index/IndexSettings.java b/server/src/main/java/org/elasticsearch/index/IndexSettings.java index 4d9a8f7d37b..97b499f9bd3 100644 --- a/server/src/main/java/org/elasticsearch/index/IndexSettings.java +++ b/server/src/main/java/org/elasticsearch/index/IndexSettings.java @@ -339,6 +339,10 @@ public final class IndexSettings { return retentionLeaseMillis; } + private void setRetentionLeaseMillis(final TimeValue retentionLease) { + this.retentionLeaseMillis = retentionLease.millis(); + } + private volatile boolean warmerEnabled; private volatile int maxResultWindow; private volatile int maxInnerResultWindow; @@ -523,6 +527,7 @@ public final class IndexSettings { scopedSettings.addSettingsUpdateConsumer(DEFAULT_PIPELINE, this::setDefaultPipeline); scopedSettings.addSettingsUpdateConsumer(INDEX_SOFT_DELETES_RETENTION_OPERATIONS_SETTING, this::setSoftDeleteRetentionOperations); scopedSettings.addSettingsUpdateConsumer(INDEX_SEARCH_THROTTLED, this::setSearchThrottled); + scopedSettings.addSettingsUpdateConsumer(INDEX_SOFT_DELETES_RETENTION_LEASE_SETTING, this::setRetentionLeaseMillis); } private void setSearchIdleAfter(TimeValue searchIdleAfter) { this.searchIdleAfter = searchIdleAfter; } diff --git a/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java b/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java index 3b68dfa6add..31f491d24cf 100644 --- a/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java +++ b/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java @@ -28,6 +28,7 @@ import org.elasticsearch.cluster.routing.AllocationId; import org.elasticsearch.cluster.routing.IndexShardRoutingTable; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.common.SuppressForbidden; +import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Writeable; @@ -155,10 +156,10 @@ public class ReplicationTracker extends AbstractIndexShardComponent implements L private final LongSupplier currentTimeMillisSupplier; /** - * A callback when a new retention lease is created or an existing retention lease expires. In practice, this callback invokes the - * retention lease sync action, to sync retention leases to replicas. + * A callback when a new retention lease is created. In practice, this callback invokes the retention lease sync action, to sync + * retention leases to replicas. */ - private final BiConsumer> onSyncRetentionLeases; + private final BiConsumer> onAddRetentionLease; /** * This set contains allocation IDs for which there is a thread actively waiting for the local checkpoint to advance to at least the @@ -177,43 +178,42 @@ public class ReplicationTracker extends AbstractIndexShardComponent implements L private RetentionLeases retentionLeases = RetentionLeases.EMPTY; /** - * Get all non-expired retention leases tracked on this shard. Note that only the primary shard calculates which leases are expired, - * and if any have expired, syncs the retention leases to any replicas. + * Get all retention leases tracked on this shard. * * @return the retention leases */ public RetentionLeases getRetentionLeases() { - final boolean wasPrimaryMode; - final RetentionLeases nonExpiredRetentionLeases; - synchronized (this) { - if (primaryMode) { - // the primary calculates the non-expired retention leases and syncs them to replicas - final long currentTimeMillis = currentTimeMillisSupplier.getAsLong(); - final long retentionLeaseMillis = indexSettings.getRetentionLeaseMillis(); - final Map> partitionByExpiration = retentionLeases - .leases() - .stream() - .collect(Collectors.groupingBy(lease -> currentTimeMillis - lease.timestamp() > retentionLeaseMillis)); - if (partitionByExpiration.get(true) == null) { - // early out as no retention leases have expired - return retentionLeases; - } - final Collection nonExpiredLeases = - partitionByExpiration.get(false) != null ? partitionByExpiration.get(false) : Collections.emptyList(); - retentionLeases = new RetentionLeases(operationPrimaryTerm, retentionLeases.version() + 1, nonExpiredLeases); - } - /* - * At this point, we were either in primary mode and have updated the non-expired retention leases into the tracking map, or - * we were in replica mode and merely need to copy the existing retention leases since a replica does not calculate the - * non-expired retention leases, instead receiving them on syncs from the primary. - */ - wasPrimaryMode = primaryMode; - nonExpiredRetentionLeases = retentionLeases; + return getRetentionLeases(false).v2(); + } + + /** + * If the expire leases parameter is false, gets all retention leases tracked on this shard and otherwise first calculates + * expiration of existing retention leases, and then gets all non-expired retention leases tracked on this shard. Note that only the + * primary shard calculates which leases are expired, and if any have expired, syncs the retention leases to any replicas. If the + * expire leases parameter is true, this replication tracker must be in primary mode. + * + * @return a tuple indicating whether or not any retention leases were expired, and the non-expired retention leases + */ + public synchronized Tuple getRetentionLeases(final boolean expireLeases) { + if (expireLeases == false) { + return Tuple.tuple(false, retentionLeases); } - if (wasPrimaryMode) { - onSyncRetentionLeases.accept(nonExpiredRetentionLeases, ActionListener.wrap(() -> {})); + assert primaryMode; + // the primary calculates the non-expired retention leases and syncs them to replicas + final long currentTimeMillis = currentTimeMillisSupplier.getAsLong(); + final long retentionLeaseMillis = indexSettings.getRetentionLeaseMillis(); + final Map> partitionByExpiration = retentionLeases + .leases() + .stream() + .collect(Collectors.groupingBy(lease -> currentTimeMillis - lease.timestamp() > retentionLeaseMillis)); + if (partitionByExpiration.get(true) == null) { + // early out as no retention leases have expired + return Tuple.tuple(false, retentionLeases); } - return nonExpiredRetentionLeases; + final Collection nonExpiredLeases = + partitionByExpiration.get(false) != null ? partitionByExpiration.get(false) : Collections.emptyList(); + retentionLeases = new RetentionLeases(operationPrimaryTerm, retentionLeases.version() + 1, nonExpiredLeases); + return Tuple.tuple(true, retentionLeases); } /** @@ -246,7 +246,7 @@ public class ReplicationTracker extends AbstractIndexShardComponent implements L Stream.concat(retentionLeases.leases().stream(), Stream.of(retentionLease)).collect(Collectors.toList())); currentRetentionLeases = retentionLeases; } - onSyncRetentionLeases.accept(currentRetentionLeases, listener); + onAddRetentionLease.accept(currentRetentionLeases, listener); return retentionLease; } @@ -563,7 +563,7 @@ public class ReplicationTracker extends AbstractIndexShardComponent implements L * @param indexSettings the index settings * @param operationPrimaryTerm the current primary term * @param globalCheckpoint the last known global checkpoint for this shard, or {@link SequenceNumbers#UNASSIGNED_SEQ_NO} - * @param onSyncRetentionLeases a callback when a new retention lease is created or an existing retention lease expires + * @param onAddRetentionLease a callback when a new retention lease is created or an existing retention lease expires */ public ReplicationTracker( final ShardId shardId, @@ -573,7 +573,7 @@ public class ReplicationTracker extends AbstractIndexShardComponent implements L final long globalCheckpoint, final LongConsumer onGlobalCheckpointUpdated, final LongSupplier currentTimeMillisSupplier, - final BiConsumer> onSyncRetentionLeases) { + final BiConsumer> onAddRetentionLease) { super(shardId, indexSettings); assert globalCheckpoint >= SequenceNumbers.UNASSIGNED_SEQ_NO : "illegal initial global checkpoint: " + globalCheckpoint; this.shardAllocationId = allocationId; @@ -585,7 +585,7 @@ public class ReplicationTracker extends AbstractIndexShardComponent implements L checkpoints.put(allocationId, new CheckpointState(SequenceNumbers.UNASSIGNED_SEQ_NO, globalCheckpoint, false, false)); this.onGlobalCheckpointUpdated = Objects.requireNonNull(onGlobalCheckpointUpdated); this.currentTimeMillisSupplier = Objects.requireNonNull(currentTimeMillisSupplier); - this.onSyncRetentionLeases = Objects.requireNonNull(onSyncRetentionLeases); + this.onAddRetentionLease = Objects.requireNonNull(onAddRetentionLease); this.pendingInSync = new HashSet<>(); this.routingTable = null; this.replicationGroup = null; diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index c3d653e2fde..dfecdc17395 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -1892,13 +1892,26 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl } /** - * Get all non-expired retention leases tracked on this shard. + * Get all retention leases tracked on this shard. * * @return the retention leases */ public RetentionLeases getRetentionLeases() { + return getRetentionLeases(false).v2(); + } + + /** + * If the expire leases parameter is false, gets all retention leases tracked on this shard and otherwise first calculates + * expiration of existing retention leases, and then gets all non-expired retention leases tracked on this shard. Note that only the + * primary shard calculates which leases are expired, and if any have expired, syncs the retention leases to any replicas. If the + * expire leases parameter is true, this replication tracker must be in primary mode. + * + * @return a tuple indicating whether or not any retention leases were expired, and the non-expired retention leases + */ + public Tuple getRetentionLeases(final boolean expireLeases) { + assert expireLeases == false || assertPrimaryMode(); verifyNotClosed(); - return replicationTracker.getRetentionLeases(); + return replicationTracker.getRetentionLeases(expireLeases); } public RetentionLeaseStats getRetentionLeaseStats() { @@ -1956,10 +1969,15 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl /** * Syncs the current retention leases to all replicas. */ - public void backgroundSyncRetentionLeases() { + public void syncRetentionLeases() { assert assertPrimaryMode(); verifyNotClosed(); - retentionLeaseSyncer.backgroundSync(shardId, getRetentionLeases()); + final Tuple retentionLeases = getRetentionLeases(true); + if (retentionLeases.v1()) { + retentionLeaseSyncer.sync(shardId, retentionLeases.v2(), ActionListener.wrap(() -> {})); + } else { + retentionLeaseSyncer.backgroundSync(shardId, retentionLeases.v2()); + } } /** diff --git a/server/src/test/java/org/elasticsearch/index/seqno/ReplicationTrackerRetentionLeaseTests.java b/server/src/test/java/org/elasticsearch/index/seqno/ReplicationTrackerRetentionLeaseTests.java index 520344489ad..bb526a34708 100644 --- a/server/src/test/java/org/elasticsearch/index/seqno/ReplicationTrackerRetentionLeaseTests.java +++ b/server/src/test/java/org/elasticsearch/index/seqno/ReplicationTrackerRetentionLeaseTests.java @@ -37,7 +37,6 @@ import java.util.Map; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; -import java.util.function.LongSupplier; import java.util.stream.Collectors; import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO; @@ -46,7 +45,6 @@ import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.hasItem; import static org.hamcrest.Matchers.hasSize; -import static org.hamcrest.Matchers.lessThanOrEqualTo; public class ReplicationTrackerRetentionLeaseTests extends ReplicationTrackerTestCase { @@ -78,7 +76,7 @@ public class ReplicationTrackerRetentionLeaseTests extends ReplicationTrackerTes minimumRetainingSequenceNumbers[i] = randomLongBetween(SequenceNumbers.NO_OPS_PERFORMED, Long.MAX_VALUE); replicationTracker.addRetentionLease( Integer.toString(i), minimumRetainingSequenceNumbers[i], "test-" + i, ActionListener.wrap(() -> {})); - assertRetentionLeases(replicationTracker, i + 1, minimumRetainingSequenceNumbers, () -> 0L, primaryTerm, 1 + i, true); + assertRetentionLeases(replicationTracker, i + 1, minimumRetainingSequenceNumbers, primaryTerm, 1 + i, true, false); } for (int i = 0; i < length; i++) { @@ -88,7 +86,7 @@ public class ReplicationTrackerRetentionLeaseTests extends ReplicationTrackerTes } minimumRetainingSequenceNumbers[i] = randomLongBetween(minimumRetainingSequenceNumbers[i], Long.MAX_VALUE); replicationTracker.renewRetentionLease(Integer.toString(i), minimumRetainingSequenceNumbers[i], "test-" + i); - assertRetentionLeases(replicationTracker, length, minimumRetainingSequenceNumbers, () -> 0L, primaryTerm, 1 + length + i, true); + assertRetentionLeases(replicationTracker, length, minimumRetainingSequenceNumbers, primaryTerm, 1 + length + i, true, false); } } @@ -193,7 +191,7 @@ public class ReplicationTrackerRetentionLeaseTests extends ReplicationTrackerTes assertThat(retentionLeases.leases(), hasSize(1)); final RetentionLease retentionLease = retentionLeases.leases().iterator().next(); assertThat(retentionLease.timestamp(), equalTo(currentTimeMillis.get())); - assertRetentionLeases(replicationTracker, 1, retainingSequenceNumbers, currentTimeMillis::get, primaryTerm, 1, primaryMode); + assertRetentionLeases(replicationTracker, 1, retainingSequenceNumbers, primaryTerm, 1, primaryMode, false); } // renew the lease @@ -215,108 +213,20 @@ public class ReplicationTrackerRetentionLeaseTests extends ReplicationTrackerTes assertThat(retentionLeases.leases(), hasSize(1)); final RetentionLease retentionLease = retentionLeases.leases().iterator().next(); assertThat(retentionLease.timestamp(), equalTo(currentTimeMillis.get())); - assertRetentionLeases(replicationTracker, 1, retainingSequenceNumbers, currentTimeMillis::get, primaryTerm, 2, primaryMode); + assertRetentionLeases(replicationTracker, 1, retainingSequenceNumbers, primaryTerm, 2, primaryMode, false); } // now force the lease to expire currentTimeMillis.set(currentTimeMillis.get() + randomLongBetween(retentionLeaseMillis, Long.MAX_VALUE - currentTimeMillis.get())); if (primaryMode) { - assertRetentionLeases(replicationTracker, 0, retainingSequenceNumbers, currentTimeMillis::get, primaryTerm, 3, true); + assertRetentionLeases(replicationTracker, 1, retainingSequenceNumbers, primaryTerm, 2, true, false); + assertRetentionLeases(replicationTracker, 0, new long[0], primaryTerm, 3, true, true); } else { // leases do not expire on replicas until synced from the primary - assertRetentionLeases(replicationTracker, 1, retainingSequenceNumbers, currentTimeMillis::get, primaryTerm, 2, false); + assertRetentionLeases(replicationTracker, 1, retainingSequenceNumbers, primaryTerm, 2, false, false); } } - public void testRetentionLeaseExpirationCausesRetentionLeaseSync() { - final AllocationId allocationId = AllocationId.newInitializing(); - final AtomicLong currentTimeMillis = new AtomicLong(randomLongBetween(0, 1024)); - final long retentionLeaseMillis = randomLongBetween(1, TimeValue.timeValueHours(12).millis()); - final Settings settings = Settings - .builder() - .put( - IndexSettings.INDEX_SOFT_DELETES_RETENTION_LEASE_SETTING.getKey(), - TimeValue.timeValueMillis(retentionLeaseMillis)) - .build(); - final Map> retentionLeases = new HashMap<>(); - final AtomicBoolean invoked = new AtomicBoolean(); - final AtomicReference reference = new AtomicReference<>(); - final ReplicationTracker replicationTracker = new ReplicationTracker( - new ShardId("test", "_na", 0), - allocationId.getId(), - IndexSettingsModule.newIndexSettings("test", settings), - randomNonNegativeLong(), - UNASSIGNED_SEQ_NO, - value -> {}, - currentTimeMillis::get, - (leases, listener) -> { - // we do not want to hold a lock on the replication tracker in the callback! - assertFalse(Thread.holdsLock(reference.get())); - invoked.set(true); - assertThat( - leases.leases() - .stream() - .collect(Collectors.toMap(RetentionLease::id, ReplicationTrackerRetentionLeaseTests::toTuple)), - equalTo(retentionLeases)); - }); - reference.set(replicationTracker); - replicationTracker.updateFromMaster( - randomNonNegativeLong(), - Collections.singleton(allocationId.getId()), - routingTable(Collections.emptySet(), allocationId), - Collections.emptySet()); - replicationTracker.activatePrimaryMode(SequenceNumbers.NO_OPS_PERFORMED); - - final int length = randomIntBetween(0, 8); - long version = 0; - for (int i = 0; i < length; i++) { - final String id = randomAlphaOfLength(8); - final long retainingSequenceNumber = randomLongBetween(SequenceNumbers.NO_OPS_PERFORMED, Long.MAX_VALUE); - retentionLeases.put(id, Tuple.tuple(retainingSequenceNumber, currentTimeMillis.get())); - replicationTracker.addRetentionLease(id, retainingSequenceNumber, "test", ActionListener.wrap(() -> {})); - version++; - assertThat(replicationTracker.getRetentionLeases().version(), equalTo(version)); - // assert that the new retention lease callback was invoked - assertTrue(invoked.get()); - - // reset the invocation marker so that we can assert the callback was not invoked when renewing the lease - invoked.set(false); - currentTimeMillis.set(1 + currentTimeMillis.get()); - retentionLeases.put(id, Tuple.tuple(retainingSequenceNumber, currentTimeMillis.get())); - replicationTracker.renewRetentionLease(id, retainingSequenceNumber, "test"); - version++; - assertThat(replicationTracker.getRetentionLeases().version(), equalTo(version)); - - // reset the invocation marker so that we can assert the callback was invoked if any leases are expired - assertFalse(invoked.get()); - // randomly expire some leases - final long currentTimeMillisIncrement = randomLongBetween(0, Long.MAX_VALUE - currentTimeMillis.get()); - // calculate the expired leases and update our tracking map - final List expiredIds = retentionLeases.entrySet() - .stream() - .filter(r -> currentTimeMillis.get() + currentTimeMillisIncrement > r.getValue().v2() + retentionLeaseMillis) - .map(Map.Entry::getKey) - .collect(Collectors.toList()); - expiredIds.forEach(retentionLeases::remove); - if (expiredIds.isEmpty() == false) { - version++; - } - currentTimeMillis.set(currentTimeMillis.get() + currentTimeMillisIncrement); - // getting the leases has the side effect of calculating which leases are expired and invoking the sync callback - final RetentionLeases current = replicationTracker.getRetentionLeases(); - assertThat(current.version(), equalTo(version)); - // the current leases should equal our tracking map - assertThat( - current.leases() - .stream() - .collect(Collectors.toMap(RetentionLease::id, ReplicationTrackerRetentionLeaseTests::toTuple)), - equalTo(retentionLeases)); - // the callback should only be invoked if there were expired leases - assertThat(invoked.get(), equalTo(expiredIds.isEmpty() == false)); - } - assertThat(replicationTracker.getRetentionLeases().version(), equalTo(version)); - } - public void testReplicaIgnoresOlderRetentionLeasesVersion() { final AllocationId allocationId = AllocationId.newInitializing(); final ReplicationTracker replicationTracker = new ReplicationTracker( @@ -370,19 +280,29 @@ public class ReplicationTrackerRetentionLeaseTests extends ReplicationTrackerTes } } - private static Tuple toTuple(final RetentionLease retentionLease) { - return Tuple.tuple(retentionLease.retainingSequenceNumber(), retentionLease.timestamp()); - } - private void assertRetentionLeases( final ReplicationTracker replicationTracker, final int size, final long[] minimumRetainingSequenceNumbers, - final LongSupplier currentTimeMillisSupplier, final long primaryTerm, final long version, - final boolean primaryMode) { - final RetentionLeases retentionLeases = replicationTracker.getRetentionLeases(); + final boolean primaryMode, + final boolean expireLeases) { + assertTrue(expireLeases == false || primaryMode); + final RetentionLeases retentionLeases; + if (expireLeases == false) { + if (randomBoolean()) { + retentionLeases = replicationTracker.getRetentionLeases(); + } else { + final Tuple tuple = replicationTracker.getRetentionLeases(false); + assertFalse(tuple.v1()); + retentionLeases = tuple.v2(); + } + } else { + final Tuple tuple = replicationTracker.getRetentionLeases(true); + assertTrue(tuple.v1()); + retentionLeases = tuple.v2(); + } assertThat(retentionLeases.primaryTerm(), equalTo(primaryTerm)); assertThat(retentionLeases.version(), equalTo(version)); final Map idToRetentionLease = new HashMap<>(); @@ -395,12 +315,6 @@ public class ReplicationTrackerRetentionLeaseTests extends ReplicationTrackerTes assertThat(idToRetentionLease.keySet(), hasItem(Integer.toString(i))); final RetentionLease retentionLease = idToRetentionLease.get(Integer.toString(i)); assertThat(retentionLease.retainingSequenceNumber(), equalTo(minimumRetainingSequenceNumbers[i])); - if (primaryMode) { - // retention leases can be expired on replicas, so we can only assert on primaries here - assertThat( - currentTimeMillisSupplier.getAsLong() - retentionLease.timestamp(), - lessThanOrEqualTo(replicationTracker.indexSettings().getRetentionLeaseMillis())); - } assertThat(retentionLease.source(), equalTo("test-" + i)); } } diff --git a/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseSyncIT.java b/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseSyncIT.java index 3e69c84e3cd..2eb0b54f361 100644 --- a/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseSyncIT.java +++ b/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseSyncIT.java @@ -20,33 +20,58 @@ package org.elasticsearch.index.seqno; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.action.support.replication.ReplicationResponse; import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.index.IndexService; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.indices.IndicesService; +import org.elasticsearch.plugins.Plugin; import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.threadpool.ThreadPool; import java.io.Closeable; +import java.util.Collection; +import java.util.Collections; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import java.util.stream.Stream; import static org.hamcrest.Matchers.anyOf; import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.hasItem; @ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST) public class RetentionLeaseSyncIT extends ESIntegTestCase { + public static final class RetentionLeaseSyncIntervalSettingPlugin extends Plugin { + + @Override + public List> getSettings() { + return Collections.singletonList(IndexService.RETENTION_LEASE_SYNC_INTERVAL_SETTING); + } + + } + + @Override + protected Collection> nodePlugins() { + return Stream.concat( + super.nodePlugins().stream(), + Stream.of(RetentionLeaseBackgroundSyncIT.RetentionLeaseSyncIntervalSettingPlugin.class)) + .collect(Collectors.toList()); + } + public void testRetentionLeasesSyncedOnAdd() throws Exception { final int numberOfReplicas = 2 - scaledRandomIntBetween(0, 2); internalCluster().ensureAtLeastNumDataNodes(1 + numberOfReplicas); @@ -99,7 +124,6 @@ public class RetentionLeaseSyncIT extends ESIntegTestCase { } } - @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/37963") public void testRetentionLeasesSyncOnExpiration() throws Exception { final int numberOfReplicas = 2 - scaledRandomIntBetween(0, 2); internalCluster().ensureAtLeastNumDataNodes(1 + numberOfReplicas); @@ -109,7 +133,7 @@ public class RetentionLeaseSyncIT extends ESIntegTestCase { final Settings settings = Settings.builder() .put("index.number_of_shards", 1) .put("index.number_of_replicas", numberOfReplicas) - .put(IndexSettings.INDEX_SOFT_DELETES_RETENTION_LEASE_SETTING.getKey(), retentionLeaseTimeToLive) + .put(IndexService.RETENTION_LEASE_SYNC_INTERVAL_SETTING.getKey(), TimeValue.timeValueSeconds(1)) .build(); createIndex("index", settings); ensureGreen("index"); @@ -121,6 +145,17 @@ public class RetentionLeaseSyncIT extends ESIntegTestCase { // we will add multiple retention leases, wait for some to expire, and assert a consistent view between the primary and the replicas final int length = randomIntBetween(1, 8); for (int i = 0; i < length; i++) { + // update the index for retention leases to live a long time + final AcknowledgedResponse longTtlResponse = client().admin() + .indices() + .prepareUpdateSettings("index") + .setSettings( + Settings.builder() + .putNull(IndexSettings.INDEX_SOFT_DELETES_RETENTION_LEASE_SETTING.getKey()) + .build()) + .get(); + assertTrue(longTtlResponse.isAcknowledged()); + final String id = randomAlphaOfLength(8); final long retainingSequenceNumber = randomLongBetween(0, Long.MAX_VALUE); final String source = randomAlphaOfLength(8); @@ -137,19 +172,26 @@ public class RetentionLeaseSyncIT extends ESIntegTestCase { final IndexShard replica = internalCluster() .getInstance(IndicesService.class, replicaShardNodeName) .getShardOrNull(new ShardId(resolveIndex("index"), 0)); - assertThat(replica.getRetentionLeases().leases(), hasItem(currentRetentionLease)); + assertThat(replica.getRetentionLeases().leases(), anyOf(empty(), contains(currentRetentionLease))); } - // sleep long enough that *possibly* the current retention lease has expired, and certainly that any previous have + // update the index for retention leases to short a long time, to force expiration + final AcknowledgedResponse shortTtlResponse = client().admin() + .indices() + .prepareUpdateSettings("index") + .setSettings( + Settings.builder() + .put(IndexSettings.INDEX_SOFT_DELETES_RETENTION_LEASE_SETTING.getKey(), retentionLeaseTimeToLive) + .build()) + .get(); + assertTrue(shortTtlResponse.isAcknowledged()); + + // sleep long enough that the current retention lease has expired final long later = System.nanoTime(); Thread.sleep(Math.max(0, retentionLeaseTimeToLive.millis() - TimeUnit.NANOSECONDS.toMillis(later - now))); - final RetentionLeases currentRetentionLeases = primary.getRetentionLeases(); - assertThat(currentRetentionLeases.leases(), anyOf(empty(), contains(currentRetentionLease))); + assertBusy(() -> assertThat(primary.getRetentionLeases().leases(), empty())); - /* - * Check that expiration of retention leases has been synced to all replicas. We have to assert busy since syncing happens in - * the background. - */ + // now that all retention leases are expired should have been synced to all replicas assertBusy(() -> { for (final ShardRouting replicaShard : clusterService().state().routingTable().index("index").shard(0).replicaShards()) { final String replicaShardNodeId = replicaShard.currentNodeId(); @@ -157,13 +199,7 @@ public class RetentionLeaseSyncIT extends ESIntegTestCase { final IndexShard replica = internalCluster() .getInstance(IndicesService.class, replicaShardNodeName) .getShardOrNull(new ShardId(resolveIndex("index"), 0)); - if (currentRetentionLeases.leases().isEmpty()) { - assertThat(replica.getRetentionLeases().leases(), empty()); - } else { - assertThat( - replica.getRetentionLeases().leases(), - contains(currentRetentionLeases.leases().toArray(new RetentionLease[0]))); - } + assertThat(replica.getRetentionLeases().leases(), empty()); } }); } diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardRetentionLeaseTests.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardRetentionLeaseTests.java index 75d8d7e8e26..cc64fc6f8b2 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardRetentionLeaseTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardRetentionLeaseTests.java @@ -24,6 +24,7 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.indices.flush.FlushRequest; import org.elasticsearch.cluster.routing.RecoverySource; import org.elasticsearch.cluster.routing.ShardRoutingHelper; +import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.index.IndexSettings; @@ -43,14 +44,12 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; -import java.util.function.LongSupplier; import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.hasItem; import static org.hamcrest.Matchers.hasSize; -import static org.hamcrest.Matchers.lessThanOrEqualTo; import static org.mockito.Matchers.anyString; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; @@ -85,13 +84,20 @@ public class IndexShardRetentionLeaseTests extends IndexShardTestCase { indexShard.addRetentionLease( Integer.toString(i), minimumRetainingSequenceNumbers[i], "test-" + i, ActionListener.wrap(() -> {})); assertRetentionLeases( - indexShard, i + 1, minimumRetainingSequenceNumbers, () -> 0L, primaryTerm, 1 + i, true); + indexShard, i + 1, minimumRetainingSequenceNumbers, primaryTerm, 1 + i, true, false); } for (int i = 0; i < length; i++) { minimumRetainingSequenceNumbers[i] = randomLongBetween(minimumRetainingSequenceNumbers[i], Long.MAX_VALUE); indexShard.renewRetentionLease(Integer.toString(i), minimumRetainingSequenceNumbers[i], "test-" + i); - assertRetentionLeases(indexShard, length, minimumRetainingSequenceNumbers, () -> 0L, primaryTerm, 1 + length + i, true); + assertRetentionLeases( + indexShard, + length, + minimumRetainingSequenceNumbers, + primaryTerm, + 1 + length + i, + true, + false); } } finally { closeShards(indexShard); @@ -121,8 +127,7 @@ public class IndexShardRetentionLeaseTests extends IndexShardTestCase { final long[] retainingSequenceNumbers = new long[1]; retainingSequenceNumbers[0] = randomLongBetween(0, Long.MAX_VALUE); if (primary) { - indexShard.addRetentionLease("0", retainingSequenceNumbers[0], "test-0", ActionListener.wrap(() -> { - })); + indexShard.addRetentionLease("0", retainingSequenceNumbers[0], "test-0", ActionListener.wrap(() -> {})); } else { final RetentionLeases retentionLeases = new RetentionLeases( primaryTerm, @@ -137,7 +142,7 @@ public class IndexShardRetentionLeaseTests extends IndexShardTestCase { assertThat(retentionLeases.leases(), hasSize(1)); final RetentionLease retentionLease = retentionLeases.leases().iterator().next(); assertThat(retentionLease.timestamp(), equalTo(currentTimeMillis.get())); - assertRetentionLeases(indexShard, 1, retainingSequenceNumbers, currentTimeMillis::get, primaryTerm, 1, primary); + assertRetentionLeases(indexShard, 1, retainingSequenceNumbers, primaryTerm, 1, primary, false); } // renew the lease @@ -159,16 +164,17 @@ public class IndexShardRetentionLeaseTests extends IndexShardTestCase { assertThat(retentionLeases.leases(), hasSize(1)); final RetentionLease retentionLease = retentionLeases.leases().iterator().next(); assertThat(retentionLease.timestamp(), equalTo(currentTimeMillis.get())); - assertRetentionLeases(indexShard, 1, retainingSequenceNumbers, currentTimeMillis::get, primaryTerm, 2, primary); + assertRetentionLeases(indexShard, 1, retainingSequenceNumbers, primaryTerm, 2, primary, false); } // now force the lease to expire currentTimeMillis.set( currentTimeMillis.get() + randomLongBetween(retentionLeaseMillis, Long.MAX_VALUE - currentTimeMillis.get())); if (primary) { - assertRetentionLeases(indexShard, 0, retainingSequenceNumbers, currentTimeMillis::get, primaryTerm, 3, true); + assertRetentionLeases(indexShard, 1, retainingSequenceNumbers, primaryTerm, 2, true, false); + assertRetentionLeases(indexShard, 0, new long[0], primaryTerm, 3, true, true); } else { - assertRetentionLeases(indexShard, 1, retainingSequenceNumbers, currentTimeMillis::get, primaryTerm, 2, false); + assertRetentionLeases(indexShard, 1, retainingSequenceNumbers, primaryTerm, 2, false, false); } } finally { closeShards(indexShard); @@ -191,8 +197,7 @@ public class IndexShardRetentionLeaseTests extends IndexShardTestCase { minimumRetainingSequenceNumbers[i] = randomLongBetween(SequenceNumbers.NO_OPS_PERFORMED, Long.MAX_VALUE); currentTimeMillis.set(TimeUnit.NANOSECONDS.toMillis(randomNonNegativeLong())); indexShard.addRetentionLease( - Integer.toString(i), minimumRetainingSequenceNumbers[i], "test-" + i, ActionListener.wrap(() -> { - })); + Integer.toString(i), minimumRetainingSequenceNumbers[i], "test-" + i, ActionListener.wrap(() -> {})); } currentTimeMillis.set(TimeUnit.NANOSECONDS.toMillis(Long.MAX_VALUE)); @@ -250,13 +255,10 @@ public class IndexShardRetentionLeaseTests extends IndexShardTestCase { final RetentionLeaseStats stats = indexShard.getRetentionLeaseStats(); assertRetentionLeases( stats.retentionLeases(), - indexShard.indexSettings().getRetentionLeaseMillis(), length, minimumRetainingSequenceNumbers, - () -> 0L, length == 0 ? RetentionLeases.EMPTY.primaryTerm() : indexShard.getOperationPrimaryTerm(), - length, - true); + length); } finally { closeShards(indexShard); } @@ -266,30 +268,39 @@ public class IndexShardRetentionLeaseTests extends IndexShardTestCase { final IndexShard indexShard, final int size, final long[] minimumRetainingSequenceNumbers, - final LongSupplier currentTimeMillisSupplier, final long primaryTerm, final long version, - final boolean primary) { + final boolean primary, + final boolean expireLeases) { + assertTrue(expireLeases == false || primary); + final RetentionLeases retentionLeases; + if (expireLeases == false) { + if (randomBoolean()) { + retentionLeases = indexShard.getRetentionLeases(); + } else { + final Tuple tuple = indexShard.getRetentionLeases(false); + assertFalse(tuple.v1()); + retentionLeases = tuple.v2(); + } + } else { + final Tuple tuple = indexShard.getRetentionLeases(true); + assertTrue(tuple.v1()); + retentionLeases = tuple.v2(); + } assertRetentionLeases( - indexShard.getEngine().config().retentionLeasesSupplier().get(), - indexShard.indexSettings().getRetentionLeaseMillis(), + retentionLeases, size, minimumRetainingSequenceNumbers, - currentTimeMillisSupplier, primaryTerm, - version, - primary); + version); } private void assertRetentionLeases( final RetentionLeases retentionLeases, - final long retentionLeaseMillis, final int size, final long[] minimumRetainingSequenceNumbers, - final LongSupplier currentTimeMillisSupplier, final long primaryTerm, - final long version, - final boolean primary) { + final long version) { assertThat(retentionLeases.primaryTerm(), equalTo(primaryTerm)); assertThat(retentionLeases.version(), equalTo(version)); final Map idToRetentionLease = new HashMap<>(); @@ -302,10 +313,6 @@ public class IndexShardRetentionLeaseTests extends IndexShardTestCase { assertThat(idToRetentionLease.keySet(), hasItem(Integer.toString(i))); final RetentionLease retentionLease = idToRetentionLease.get(Integer.toString(i)); assertThat(retentionLease.retainingSequenceNumber(), equalTo(minimumRetainingSequenceNumbers[i])); - if (primary) { - // retention leases can be expired on replicas, so we can only assert on primaries here - assertThat(currentTimeMillisSupplier.getAsLong() - retentionLease.timestamp(), lessThanOrEqualTo(retentionLeaseMillis)); - } assertThat(retentionLease.source(), equalTo("test-" + i)); } }