diff --git a/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java b/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java index 7e856022892..4a614d8874a 100644 --- a/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java +++ b/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java @@ -147,10 +147,10 @@ public class ReplicationTracker extends AbstractIndexShardComponent implements L private final LongSupplier currentTimeMillisSupplier; /** - * A callback when a new retention lease is created. In practice, this callback invokes the retention lease sync action, to sync - * retention leases to replicas. + * A callback when a new retention lease is created or an existing retention lease expires. In practice, this callback invokes the + * retention lease sync action, to sync retention leases to replicas. */ - private final BiConsumer, ActionListener> onNewRetentionLease; + private final BiConsumer, ActionListener> onSyncRetentionLeases; /** * This set contains allocation IDs for which there is a thread actively waiting for the local checkpoint to advance to at least the @@ -171,21 +171,45 @@ public class ReplicationTracker extends AbstractIndexShardComponent implements L } /** - * Get all non-expired retention leases tracked on this shard. An unmodifiable copy of the retention leases is returned. + * Get all non-expired retention leases tracked on this shard. An unmodifiable copy of the retention leases is returned. Note that only + * the primary shard calculates which leases are expired, and if any have expired, syncs the retention leases to any replicas. 
* * @return the retention leases */ - public synchronized Collection getRetentionLeases() { - final long currentTimeMillis = currentTimeMillisSupplier.getAsLong(); - final long retentionLeaseMillis = indexSettings.getRetentionLeaseMillis(); - final Collection nonExpiredRetentionLeases = retentionLeases - .values() - .stream() - .filter(retentionLease -> currentTimeMillis - retentionLease.timestamp() <= retentionLeaseMillis) - .collect(Collectors.toList()); - retentionLeases.clear(); - retentionLeases.putAll(nonExpiredRetentionLeases.stream().collect(Collectors.toMap(RetentionLease::id, lease -> lease))); - return Collections.unmodifiableCollection(nonExpiredRetentionLeases); + public Collection getRetentionLeases() { + final boolean wasPrimaryMode; + final Collection nonExpiredRetentionLeases; + synchronized (this) { + if (primaryMode) { + // the primary calculates the non-expired retention leases and syncs them to replicas + final long currentTimeMillis = currentTimeMillisSupplier.getAsLong(); + final long retentionLeaseMillis = indexSettings.getRetentionLeaseMillis(); + final Collection expiredRetentionLeases = retentionLeases + .values() + .stream() + .filter(retentionLease -> currentTimeMillis - retentionLease.timestamp() > retentionLeaseMillis) + .collect(Collectors.toList()); + if (expiredRetentionLeases.isEmpty()) { + // early out as no retention leases have expired + return copyRetentionLeases(); + } + // clean up the expired retention leases + for (final RetentionLease expiredRetentionLease : expiredRetentionLeases) { + retentionLeases.remove(expiredRetentionLease.id()); + } + } + /* + * At this point, we were either in primary mode and have updated the non-expired retention leases into the tracking map, or + * we were in replica mode and merely need to copy the existing retention leases since a replica does not calculate the + * non-expired retention leases, instead receiving them on syncs from the primary. 
+ */ + wasPrimaryMode = primaryMode; + nonExpiredRetentionLeases = copyRetentionLeases(); + } + if (wasPrimaryMode) { + onSyncRetentionLeases.accept(nonExpiredRetentionLeases, ActionListener.wrap(() -> {})); + } + return nonExpiredRetentionLeases; } /** @@ -215,7 +239,7 @@ public class ReplicationTracker extends AbstractIndexShardComponent implements L retentionLeases.put(id, retentionLease); currentRetentionLeases = copyRetentionLeases(); } - onNewRetentionLease.accept(currentRetentionLeases, listener); + onSyncRetentionLeases.accept(currentRetentionLeases, listener); return retentionLease; } @@ -500,11 +524,11 @@ public class ReplicationTracker extends AbstractIndexShardComponent implements L * Initialize the global checkpoint service. The specified global checkpoint should be set to the last known global checkpoint, or * {@link SequenceNumbers#UNASSIGNED_SEQ_NO}. * - * @param shardId the shard ID - * @param allocationId the allocation ID - * @param indexSettings the index settings - * @param globalCheckpoint the last known global checkpoint for this shard, or {@link SequenceNumbers#UNASSIGNED_SEQ_NO} - * @param onNewRetentionLease a callback when a new retention lease is created + * @param shardId the shard ID + * @param allocationId the allocation ID + * @param indexSettings the index settings + * @param globalCheckpoint the last known global checkpoint for this shard, or {@link SequenceNumbers#UNASSIGNED_SEQ_NO} + * @param onSyncRetentionLeases a callback when a new retention lease is created or an existing retention lease expires */ public ReplicationTracker( final ShardId shardId, @@ -513,7 +537,7 @@ public class ReplicationTracker extends AbstractIndexShardComponent implements L final long globalCheckpoint, final LongConsumer onGlobalCheckpointUpdated, final LongSupplier currentTimeMillisSupplier, - final BiConsumer, ActionListener> onNewRetentionLease) { + final BiConsumer, ActionListener> onSyncRetentionLeases) { super(shardId, indexSettings); assert 
globalCheckpoint >= SequenceNumbers.UNASSIGNED_SEQ_NO : "illegal initial global checkpoint: " + globalCheckpoint; this.shardAllocationId = allocationId; @@ -524,7 +548,7 @@ public class ReplicationTracker extends AbstractIndexShardComponent implements L checkpoints.put(allocationId, new CheckpointState(SequenceNumbers.UNASSIGNED_SEQ_NO, globalCheckpoint, false, false)); this.onGlobalCheckpointUpdated = Objects.requireNonNull(onGlobalCheckpointUpdated); this.currentTimeMillisSupplier = Objects.requireNonNull(currentTimeMillisSupplier); - this.onNewRetentionLease = Objects.requireNonNull(onNewRetentionLease); + this.onSyncRetentionLeases = Objects.requireNonNull(onSyncRetentionLeases); this.pendingInSync = new HashSet<>(); this.routingTable = null; this.replicationGroup = null; diff --git a/server/src/test/java/org/elasticsearch/index/seqno/ReplicationTrackerRetentionLeaseTests.java b/server/src/test/java/org/elasticsearch/index/seqno/ReplicationTrackerRetentionLeaseTests.java index 3dafb93d654..7a867027412 100644 --- a/server/src/test/java/org/elasticsearch/index/seqno/ReplicationTrackerRetentionLeaseTests.java +++ b/server/src/test/java/org/elasticsearch/index/seqno/ReplicationTrackerRetentionLeaseTests.java @@ -21,6 +21,7 @@ package org.elasticsearch.index.seqno; import org.elasticsearch.action.ActionListener; import org.elasticsearch.cluster.routing.AllocationId; +import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.index.IndexSettings; @@ -30,6 +31,7 @@ import org.elasticsearch.test.IndexSettingsModule; import java.util.Collection; import java.util.Collections; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; @@ -67,17 +69,17 @@ public class ReplicationTrackerRetentionLeaseTests extends ReplicationTrackerTes 
minimumRetainingSequenceNumbers[i] = randomLongBetween(SequenceNumbers.NO_OPS_PERFORMED, Long.MAX_VALUE); replicationTracker.addRetentionLease( Integer.toString(i), minimumRetainingSequenceNumbers[i], "test-" + i, ActionListener.wrap(() -> {})); - assertRetentionLeases(replicationTracker, i + 1, minimumRetainingSequenceNumbers, () -> 0L); + assertRetentionLeases(replicationTracker, i + 1, minimumRetainingSequenceNumbers, () -> 0L, true); } for (int i = 0; i < length; i++) { minimumRetainingSequenceNumbers[i] = randomLongBetween(minimumRetainingSequenceNumbers[i], Long.MAX_VALUE); replicationTracker.renewRetentionLease(Integer.toString(i), minimumRetainingSequenceNumbers[i], "test-" + i); - assertRetentionLeases(replicationTracker, length, minimumRetainingSequenceNumbers, () -> 0L); + assertRetentionLeases(replicationTracker, length, minimumRetainingSequenceNumbers, () -> 0L, true); } } - public void testOnNewRetentionLease() { + public void testAddRetentionLeaseCausesRetentionLeaseSync() { final AllocationId allocationId = AllocationId.newInitializing(); final Map retentionLeases = new HashMap<>(); final AtomicBoolean invoked = new AtomicBoolean(); @@ -113,6 +115,7 @@ public class ReplicationTrackerRetentionLeaseTests extends ReplicationTrackerTes replicationTracker.addRetentionLease(id, retainingSequenceNumber, "test", ActionListener.wrap(() -> {})); // assert that the new retention lease callback was invoked assertTrue(invoked.get()); + // reset the invocation marker so that we can assert the callback was not invoked when renewing the lease invoked.set(false); replicationTracker.renewRetentionLease(id, retainingSequenceNumber, "test"); @@ -120,7 +123,15 @@ public class ReplicationTrackerRetentionLeaseTests extends ReplicationTrackerTes } } - public void testExpiration() { + public void testExpirationOnPrimary() { + runExpirationTest(true); + } + + public void testExpirationOnReplica() { + runExpirationTest(false); + } + + private void runExpirationTest(final 
boolean primaryMode) { final AllocationId allocationId = AllocationId.newInitializing(); final AtomicLong currentTimeMillis = new AtomicLong(randomLongBetween(0, 1024)); final long retentionLeaseMillis = randomLongBetween(1, TimeValue.timeValueHours(12).millis()); @@ -141,42 +152,136 @@ public class ReplicationTrackerRetentionLeaseTests extends ReplicationTrackerTes Collections.singleton(allocationId.getId()), routingTable(Collections.emptySet(), allocationId), Collections.emptySet()); - replicationTracker.activatePrimaryMode(SequenceNumbers.NO_OPS_PERFORMED); + if (primaryMode) { + replicationTracker.activatePrimaryMode(SequenceNumbers.NO_OPS_PERFORMED); + } final long[] retainingSequenceNumbers = new long[1]; retainingSequenceNumbers[0] = randomLongBetween(SequenceNumbers.NO_OPS_PERFORMED, Long.MAX_VALUE); - replicationTracker.addRetentionLease("0", retainingSequenceNumbers[0], "test-0", ActionListener.wrap(() -> {})); + if (primaryMode) { + replicationTracker.addRetentionLease("0", retainingSequenceNumbers[0], "test-0", ActionListener.wrap(() -> {})); + } else { + replicationTracker.updateRetentionLeasesOnReplica( + Collections.singleton(new RetentionLease("0", retainingSequenceNumbers[0], currentTimeMillis.get(), "test-0"))); + } { final Collection retentionLeases = replicationTracker.getRetentionLeases(); assertThat(retentionLeases, hasSize(1)); final RetentionLease retentionLease = retentionLeases.iterator().next(); assertThat(retentionLease.timestamp(), equalTo(currentTimeMillis.get())); - assertRetentionLeases(replicationTracker, 1, retainingSequenceNumbers, currentTimeMillis::get); + assertRetentionLeases(replicationTracker, 1, retainingSequenceNumbers, currentTimeMillis::get, primaryMode); } // renew the lease currentTimeMillis.set(currentTimeMillis.get() + randomLongBetween(0, 1024)); retainingSequenceNumbers[0] = randomLongBetween(retainingSequenceNumbers[0], Long.MAX_VALUE); - replicationTracker.renewRetentionLease("0", retainingSequenceNumbers[0], 
"test-0"); + if (primaryMode) { + replicationTracker.renewRetentionLease("0", retainingSequenceNumbers[0], "test-0"); + } else { + replicationTracker.updateRetentionLeasesOnReplica( + Collections.singleton(new RetentionLease("0", retainingSequenceNumbers[0], currentTimeMillis.get(), "test-0"))); + } { final Collection retentionLeases = replicationTracker.getRetentionLeases(); assertThat(retentionLeases, hasSize(1)); final RetentionLease retentionLease = retentionLeases.iterator().next(); assertThat(retentionLease.timestamp(), equalTo(currentTimeMillis.get())); - assertRetentionLeases(replicationTracker, 1, retainingSequenceNumbers, currentTimeMillis::get); + assertRetentionLeases(replicationTracker, 1, retainingSequenceNumbers, currentTimeMillis::get, primaryMode); } // now force the lease to expire currentTimeMillis.set(currentTimeMillis.get() + randomLongBetween(retentionLeaseMillis, Long.MAX_VALUE - currentTimeMillis.get())); - assertRetentionLeases(replicationTracker, 0, retainingSequenceNumbers, currentTimeMillis::get); + if (primaryMode) { + assertRetentionLeases(replicationTracker, 0, retainingSequenceNumbers, currentTimeMillis::get, true); + } else { + // leases do not expire on replicas until synced from the primary + assertRetentionLeases(replicationTracker, 1, retainingSequenceNumbers, currentTimeMillis::get, false); + } + } + + public void testRetentionLeaseExpirationCausesRetentionLeaseSync() { + final AllocationId allocationId = AllocationId.newInitializing(); + final AtomicLong currentTimeMillis = new AtomicLong(randomLongBetween(0, 1024)); + final long retentionLeaseMillis = randomLongBetween(1, TimeValue.timeValueHours(12).millis()); + final Settings settings = Settings + .builder() + .put(IndexSettings.INDEX_SOFT_DELETES_RETENTION_LEASE_SETTING.getKey(), TimeValue.timeValueMillis(retentionLeaseMillis)) + .build(); + final Map> retentionLeases = new HashMap<>(); + final AtomicBoolean invoked = new AtomicBoolean(); + final AtomicReference reference 
= new AtomicReference<>(); + final ReplicationTracker replicationTracker = new ReplicationTracker( + new ShardId("test", "_na", 0), + allocationId.getId(), + IndexSettingsModule.newIndexSettings("test", settings), + UNASSIGNED_SEQ_NO, + value -> {}, + currentTimeMillis::get, + (leases, listener) -> { + // we do not want to hold a lock on the replication tracker in the callback! + assertFalse(Thread.holdsLock(reference.get())); + invoked.set(true); + assertThat( + leases.stream().collect(Collectors.toMap(RetentionLease::id, ReplicationTrackerRetentionLeaseTests::toTuple)), + equalTo(retentionLeases)); + }); + reference.set(replicationTracker); + replicationTracker.updateFromMaster( + randomNonNegativeLong(), + Collections.singleton(allocationId.getId()), + routingTable(Collections.emptySet(), allocationId), + Collections.emptySet()); + replicationTracker.activatePrimaryMode(SequenceNumbers.NO_OPS_PERFORMED); + + final int length = randomIntBetween(0, 8); + for (int i = 0; i < length; i++) { + final String id = randomAlphaOfLength(8); + final long retainingSequenceNumber = randomLongBetween(SequenceNumbers.NO_OPS_PERFORMED, Long.MAX_VALUE); + retentionLeases.put(id, Tuple.tuple(retainingSequenceNumber, currentTimeMillis.get())); + replicationTracker.addRetentionLease(id, retainingSequenceNumber, "test", ActionListener.wrap(() -> {})); + // assert that the new retention lease callback was invoked + assertTrue(invoked.get()); + + // reset the invocation marker so that we can assert the callback was not invoked when renewing the lease + invoked.set(false); + currentTimeMillis.set(1 + currentTimeMillis.get()); + retentionLeases.put(id, Tuple.tuple(retainingSequenceNumber, currentTimeMillis.get())); + replicationTracker.renewRetentionLease(id, retainingSequenceNumber, "test"); + + // renewing must not trigger a sync, so the marker is still unset when we check for expiration-triggered syncs below + assertFalse(invoked.get()); + // randomly expire some leases, leaving room for the one-millisecond tick of each remaining iteration + final long
currentTimeMillisIncrement = randomLongBetween(0, Long.MAX_VALUE - (length - i) - currentTimeMillis.get()); + // calculate the expired leases the same way the tracker does (elapsed time > lease period) and update our tracking map + final List expiredIds = retentionLeases.entrySet() + .stream() + .filter(r -> currentTimeMillis.get() + currentTimeMillisIncrement - r.getValue().v2() > retentionLeaseMillis) + .map(Map.Entry::getKey) + .collect(Collectors.toList()); + expiredIds.forEach(retentionLeases::remove); + currentTimeMillis.set(currentTimeMillis.get() + currentTimeMillisIncrement); + // getting the leases has the side effect of calculating which leases are expired and invoking the sync callback + final Collection current = replicationTracker.getRetentionLeases(); + // the current leases should equal our tracking map + assertThat( + current.stream().collect(Collectors.toMap(RetentionLease::id, ReplicationTrackerRetentionLeaseTests::toTuple)), + equalTo(retentionLeases)); + // the callback should only be invoked if there were expired leases + assertThat(invoked.get(), equalTo(expiredIds.isEmpty() == false)); + } + } + + private static Tuple toTuple(final RetentionLease retentionLease) { + return Tuple.tuple(retentionLease.retainingSequenceNumber(), retentionLease.timestamp()); } private void assertRetentionLeases( final ReplicationTracker replicationTracker, final int size, final long[] minimumRetainingSequenceNumbers, - final LongSupplier currentTimeMillisSupplier) { + final LongSupplier currentTimeMillisSupplier, + final boolean primaryMode) { final Collection retentionLeases = replicationTracker.getRetentionLeases(); final Map idToRetentionLease = new HashMap<>(); for (final RetentionLease retentionLease : retentionLeases) { @@ -188,9 +293,12 @@ public class ReplicationTrackerRetentionLeaseTests extends ReplicationTrackerTes assertThat(idToRetentionLease.keySet(), hasItem(Integer.toString(i))); final RetentionLease retentionLease = idToRetentionLease.get(Integer.toString(i)); assertThat(retentionLease.retainingSequenceNumber(),
equalTo(minimumRetainingSequenceNumbers[i])); - assertThat( - currentTimeMillisSupplier.getAsLong() - retentionLease.timestamp(), - lessThanOrEqualTo(replicationTracker.indexSettings().getRetentionLeaseMillis())); + if (primaryMode) { + // retention leases can be expired on replicas, so we can only assert on primaries here + assertThat( + currentTimeMillisSupplier.getAsLong() - retentionLease.timestamp(), + lessThanOrEqualTo(replicationTracker.indexSettings().getRetentionLeaseMillis())); + } assertThat(retentionLease.source(), equalTo("test-" + i)); } } diff --git a/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseSyncIT.java b/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseSyncIT.java index fad9e25db12..7d6e5fa2dc5 100644 --- a/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseSyncIT.java +++ b/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseSyncIT.java @@ -23,20 +23,28 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.replication.ReplicationResponse; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.indices.IndicesService; import org.elasticsearch.test.ESIntegTestCase; +import org.elasticsearch.threadpool.ThreadPool; import java.util.Collection; import java.util.HashMap; import java.util.Map; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import java.util.function.Function; import java.util.stream.Collectors; +import static org.hamcrest.Matchers.anyOf; +import static org.hamcrest.Matchers.contains; +import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; +import static 
org.hamcrest.Matchers.hasItem; public class RetentionLeaseSyncIT extends ESIntegTestCase { @@ -89,6 +97,73 @@ public class RetentionLeaseSyncIT extends ESIntegTestCase { } } + public void testRetentionLeasesSyncOnExpiration() throws Exception { + final int numberOfReplicas = 2 - scaledRandomIntBetween(0, 2); + internalCluster().ensureAtLeastNumDataNodes(1 + numberOfReplicas); + final long estimatedTimeIntervalMillis = ThreadPool.ESTIMATED_TIME_INTERVAL_SETTING.get(Settings.EMPTY).millis(); + final TimeValue retentionLeaseTimeToLive = + TimeValue.timeValueMillis(randomLongBetween(estimatedTimeIntervalMillis, 2 * estimatedTimeIntervalMillis)); + final Settings settings = Settings.builder() + .put("index.number_of_shards", 1) + .put("index.number_of_replicas", numberOfReplicas) + .put(IndexSettings.INDEX_SOFT_DELETES_RETENTION_LEASE_SETTING.getKey(), retentionLeaseTimeToLive) + .build(); + createIndex("index", settings); + ensureGreen("index"); + final String primaryShardNodeId = clusterService().state().routingTable().index("index").shard(0).primaryShard().currentNodeId(); + final String primaryShardNodeName = clusterService().state().nodes().get(primaryShardNodeId).getName(); + final IndexShard primary = internalCluster() + .getInstance(IndicesService.class, primaryShardNodeName) + .getShardOrNull(new ShardId(resolveIndex("index"), 0)); + // we will add multiple retention leases, wait for some to expire, and assert a consistent view between the primary and the replicas + final int length = randomIntBetween(1, 8); + for (int i = 0; i < length; i++) { + final String id = randomAlphaOfLength(8); + final long retainingSequenceNumber = randomLongBetween(SequenceNumbers.NO_OPS_PERFORMED, Long.MAX_VALUE); + final String source = randomAlphaOfLength(8); + final CountDownLatch latch = new CountDownLatch(1); + final ActionListener listener = ActionListener.wrap(r -> latch.countDown(), e -> fail(e.toString())); + final RetentionLease currentRetentionLease = 
primary.addRetentionLease(id, retainingSequenceNumber, source, listener); + final long now = System.nanoTime(); + latch.await(); + + // check current retention leases have been synced to all replicas + for (final ShardRouting replicaShard : clusterService().state().routingTable().index("index").shard(0).replicaShards()) { + final String replicaShardNodeId = replicaShard.currentNodeId(); + final String replicaShardNodeName = clusterService().state().nodes().get(replicaShardNodeId).getName(); + final IndexShard replica = internalCluster() + .getInstance(IndicesService.class, replicaShardNodeName) + .getShardOrNull(new ShardId(resolveIndex("index"), 0)); + assertThat(replica.getRetentionLeases(), hasItem(currentRetentionLease)); + } + + // sleep long enough that *possibly* the current retention lease has expired, and certainly that any previous have + final long later = System.nanoTime(); + Thread.sleep(Math.max(0, retentionLeaseTimeToLive.millis() - TimeUnit.NANOSECONDS.toMillis(later - now))); + final Collection currentRetentionLeases = primary.getRetentionLeases(); + assertThat(currentRetentionLeases, anyOf(empty(), contains(currentRetentionLease))); + + /* + * Check that expiration of retention leases has been synced to all replicas. We have to assert busy since syncing happens in + * the background. 
+ */ + assertBusy(() -> { + for (final ShardRouting replicaShard : clusterService().state().routingTable().index("index").shard(0).replicaShards()) { + final String replicaShardNodeId = replicaShard.currentNodeId(); + final String replicaShardNodeName = clusterService().state().nodes().get(replicaShardNodeId).getName(); + final IndexShard replica = internalCluster() + .getInstance(IndicesService.class, replicaShardNodeName) + .getShardOrNull(new ShardId(resolveIndex("index"), 0)); + if (currentRetentionLeases.isEmpty()) { + assertThat(replica.getRetentionLeases(), empty()); + } else { + assertThat(replica.getRetentionLeases(), contains(currentRetentionLeases.toArray(new RetentionLease[0]))); + } + } + }); + } + } + private static Map toMap(final Collection replicaCommittedRetentionLeases) { return replicaCommittedRetentionLeases.stream().collect(Collectors.toMap(RetentionLease::id, Function.identity())); } diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardRetentionLeaseTests.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardRetentionLeaseTests.java index eff1edfed52..cd7d2a2c12c 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardRetentionLeaseTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardRetentionLeaseTests.java @@ -35,6 +35,7 @@ import org.elasticsearch.threadpool.ThreadPool; import java.io.IOException; import java.util.Collection; +import java.util.Collections; import java.util.HashMap; import java.util.Map; import java.util.concurrent.ExecutorService; @@ -81,57 +82,79 @@ public class IndexShardRetentionLeaseTests extends IndexShardTestCase { minimumRetainingSequenceNumbers[i] = randomLongBetween(SequenceNumbers.NO_OPS_PERFORMED, Long.MAX_VALUE); indexShard.addRetentionLease( Integer.toString(i), minimumRetainingSequenceNumbers[i], "test-" + i, ActionListener.wrap(() -> {})); - assertRetentionLeases(indexShard, i + 1, minimumRetainingSequenceNumbers, () -> 0L); + 
assertRetentionLeases(indexShard, i + 1, minimumRetainingSequenceNumbers, () -> 0L, true); } for (int i = 0; i < length; i++) { minimumRetainingSequenceNumbers[i] = randomLongBetween(minimumRetainingSequenceNumbers[i], Long.MAX_VALUE); indexShard.renewRetentionLease(Integer.toString(i), minimumRetainingSequenceNumbers[i], "test-" + i); - assertRetentionLeases(indexShard, length, minimumRetainingSequenceNumbers, () -> 0L); + assertRetentionLeases(indexShard, length, minimumRetainingSequenceNumbers, () -> 0L, true); } } finally { closeShards(indexShard); } } - public void testExpiration() throws IOException { + public void testExpirationOnPrimary() throws IOException { + runExpirationTest(true); + } + + public void testExpirationOnReplica() throws IOException { + runExpirationTest(false); + } + + private void runExpirationTest(final boolean primary) throws IOException { final long retentionLeaseMillis = randomLongBetween(1, TimeValue.timeValueHours(12).millis()); final Settings settings = Settings .builder() .put(IndexSettings.INDEX_SOFT_DELETES_RETENTION_LEASE_SETTING.getKey(), TimeValue.timeValueMillis(retentionLeaseMillis)) .build(); // current time is mocked through the thread pool - final IndexShard indexShard = newStartedShard(true, settings, new InternalEngineFactory()); + final IndexShard indexShard = newStartedShard(primary, settings, new InternalEngineFactory()); try { final long[] retainingSequenceNumbers = new long[1]; retainingSequenceNumbers[0] = randomLongBetween(SequenceNumbers.NO_OPS_PERFORMED, Long.MAX_VALUE); - indexShard.addRetentionLease("0", retainingSequenceNumbers[0], "test-0", ActionListener.wrap(() -> {})); + if (primary) { + indexShard.addRetentionLease("0", retainingSequenceNumbers[0], "test-0", ActionListener.wrap(() -> {})); + } else { + indexShard.updateRetentionLeasesOnReplica( + Collections.singleton(new RetentionLease("0", retainingSequenceNumbers[0], currentTimeMillis.get(), "test-0"))); + } { final Collection retentionLeases = 
indexShard.getEngine().config().retentionLeasesSupplier().get(); assertThat(retentionLeases, hasSize(1)); final RetentionLease retentionLease = retentionLeases.iterator().next(); assertThat(retentionLease.timestamp(), equalTo(currentTimeMillis.get())); - assertRetentionLeases(indexShard, 1, retainingSequenceNumbers, currentTimeMillis::get); + assertRetentionLeases(indexShard, 1, retainingSequenceNumbers, currentTimeMillis::get, primary); } // renew the lease currentTimeMillis.set(currentTimeMillis.get() + randomLongBetween(0, 1024)); retainingSequenceNumbers[0] = randomLongBetween(retainingSequenceNumbers[0], Long.MAX_VALUE); - indexShard.renewRetentionLease("0", retainingSequenceNumbers[0], "test-0"); + if (primary) { + indexShard.renewRetentionLease("0", retainingSequenceNumbers[0], "test-0"); + } else { + indexShard.updateRetentionLeasesOnReplica( + Collections.singleton(new RetentionLease("0", retainingSequenceNumbers[0], currentTimeMillis.get(), "test-0"))); + } { final Collection retentionLeases = indexShard.getEngine().config().retentionLeasesSupplier().get(); assertThat(retentionLeases, hasSize(1)); final RetentionLease retentionLease = retentionLeases.iterator().next(); assertThat(retentionLease.timestamp(), equalTo(currentTimeMillis.get())); - assertRetentionLeases(indexShard, 1, retainingSequenceNumbers, currentTimeMillis::get); + assertRetentionLeases(indexShard, 1, retainingSequenceNumbers, currentTimeMillis::get, primary); } // now force the lease to expire currentTimeMillis.set( currentTimeMillis.get() + randomLongBetween(retentionLeaseMillis, Long.MAX_VALUE - currentTimeMillis.get())); - assertRetentionLeases(indexShard, 0, retainingSequenceNumbers, currentTimeMillis::get); + if (primary) { + assertRetentionLeases(indexShard, 0, retainingSequenceNumbers, currentTimeMillis::get, true); + } else { + assertRetentionLeases(indexShard, 1, retainingSequenceNumbers, currentTimeMillis::get, false); + } } finally { closeShards(indexShard); } @@ -196,7 +219,8 
@@ public class IndexShardRetentionLeaseTests extends IndexShardTestCase { final IndexShard indexShard, final int size, final long[] minimumRetainingSequenceNumbers, - final LongSupplier currentTimeMillisSupplier) { + final LongSupplier currentTimeMillisSupplier, + final boolean primary) { final Collection retentionLeases = indexShard.getEngine().config().retentionLeasesSupplier().get(); final Map idToRetentionLease = new HashMap<>(); for (final RetentionLease retentionLease : retentionLeases) { @@ -208,9 +232,12 @@ public class IndexShardRetentionLeaseTests extends IndexShardTestCase { assertThat(idToRetentionLease.keySet(), hasItem(Integer.toString(i))); final RetentionLease retentionLease = idToRetentionLease.get(Integer.toString(i)); assertThat(retentionLease.retainingSequenceNumber(), equalTo(minimumRetainingSequenceNumbers[i])); - assertThat( - currentTimeMillisSupplier.getAsLong() - retentionLease.timestamp(), - lessThanOrEqualTo(indexShard.indexSettings().getRetentionLeaseMillis())); + if (primary) { + // retention leases can be expired on replicas, so we can only assert on primaries here + assertThat( + currentTimeMillisSupplier.getAsLong() - retentionLease.timestamp(), + lessThanOrEqualTo(indexShard.indexSettings().getRetentionLeaseMillis())); + } assertThat(retentionLease.source(), equalTo("test-" + i)); } }