Sync retention leases on expiration (#37902)

This commit introduces a sync of retention leases when a retention lease
expires. As expiration of retention leases is lazy, their expiration is
managed only when getting the current retention leases from the
replication tracker. At this point, we callback to our full retention
lease sync to sync and flush these on all shard copies. With this
change, replicas do not locally manage expiration of retention leases;
instead, that is done only on the primary.
This commit is contained in:
Jason Tedor 2019-01-28 07:11:51 -05:00 committed by GitHub
parent 758eb9d451
commit 194cdfe208
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 284 additions and 50 deletions

View File

@ -147,10 +147,10 @@ public class ReplicationTracker extends AbstractIndexShardComponent implements L
private final LongSupplier currentTimeMillisSupplier;
/**
* A callback when a new retention lease is created. In practice, this callback invokes the retention lease sync action, to sync
* retention leases to replicas.
* A callback when a new retention lease is created or an existing retention lease expires. In practice, this callback invokes the
* retention lease sync action, to sync retention leases to replicas.
*/
private final BiConsumer<Collection<RetentionLease>, ActionListener<ReplicationResponse>> onNewRetentionLease;
private final BiConsumer<Collection<RetentionLease>, ActionListener<ReplicationResponse>> onSyncRetentionLeases;
/**
* This set contains allocation IDs for which there is a thread actively waiting for the local checkpoint to advance to at least the
@ -171,21 +171,45 @@ public class ReplicationTracker extends AbstractIndexShardComponent implements L
}
/**
* Get all non-expired retention leases tracked on this shard. An unmodifiable copy of the retention leases is returned.
* Get all non-expired retention leases tracked on this shard. An unmodifiable copy of the retention leases is returned. Note that only
* the primary shard calculates which leases are expired, and if any have expired, syncs the retention leases to any replicas.
*
* @return the retention leases
*/
public synchronized Collection<RetentionLease> getRetentionLeases() {
public Collection<RetentionLease> getRetentionLeases() {
final boolean wasPrimaryMode;
final Collection<RetentionLease> nonExpiredRetentionLeases;
synchronized (this) {
if (primaryMode) {
// the primary calculates the non-expired retention leases and syncs them to replicas
final long currentTimeMillis = currentTimeMillisSupplier.getAsLong();
final long retentionLeaseMillis = indexSettings.getRetentionLeaseMillis();
final Collection<RetentionLease> nonExpiredRetentionLeases = retentionLeases
final Collection<RetentionLease> expiredRetentionLeases = retentionLeases
.values()
.stream()
.filter(retentionLease -> currentTimeMillis - retentionLease.timestamp() <= retentionLeaseMillis)
.filter(retentionLease -> currentTimeMillis - retentionLease.timestamp() > retentionLeaseMillis)
.collect(Collectors.toList());
retentionLeases.clear();
retentionLeases.putAll(nonExpiredRetentionLeases.stream().collect(Collectors.toMap(RetentionLease::id, lease -> lease)));
return Collections.unmodifiableCollection(nonExpiredRetentionLeases);
if (expiredRetentionLeases.isEmpty()) {
// early out as no retention leases have expired
return copyRetentionLeases();
}
// clean up the expired retention leases
for (final RetentionLease expiredRetentionLease : expiredRetentionLeases) {
retentionLeases.remove(expiredRetentionLease.id());
}
}
/*
* At this point, we were either in primary mode and have updated the non-expired retention leases into the tracking map, or
* we were in replica mode and merely need to copy the existing retention leases since a replica does not calculate the
* non-expired retention leases, instead receiving them on syncs from the primary.
*/
wasPrimaryMode = primaryMode;
nonExpiredRetentionLeases = copyRetentionLeases();
}
if (wasPrimaryMode) {
onSyncRetentionLeases.accept(nonExpiredRetentionLeases, ActionListener.wrap(() -> {}));
}
return nonExpiredRetentionLeases;
}
/**
@ -215,7 +239,7 @@ public class ReplicationTracker extends AbstractIndexShardComponent implements L
retentionLeases.put(id, retentionLease);
currentRetentionLeases = copyRetentionLeases();
}
onNewRetentionLease.accept(currentRetentionLeases, listener);
onSyncRetentionLeases.accept(currentRetentionLeases, listener);
return retentionLease;
}
@ -504,7 +528,7 @@ public class ReplicationTracker extends AbstractIndexShardComponent implements L
* @param allocationId the allocation ID
* @param indexSettings the index settings
* @param globalCheckpoint the last known global checkpoint for this shard, or {@link SequenceNumbers#UNASSIGNED_SEQ_NO}
* @param onNewRetentionLease a callback when a new retention lease is created
* @param onSyncRetentionLeases a callback when a new retention lease is created or an existing retention lease expires
*/
public ReplicationTracker(
final ShardId shardId,
@ -513,7 +537,7 @@ public class ReplicationTracker extends AbstractIndexShardComponent implements L
final long globalCheckpoint,
final LongConsumer onGlobalCheckpointUpdated,
final LongSupplier currentTimeMillisSupplier,
final BiConsumer<Collection<RetentionLease>, ActionListener<ReplicationResponse>> onNewRetentionLease) {
final BiConsumer<Collection<RetentionLease>, ActionListener<ReplicationResponse>> onSyncRetentionLeases) {
super(shardId, indexSettings);
assert globalCheckpoint >= SequenceNumbers.UNASSIGNED_SEQ_NO : "illegal initial global checkpoint: " + globalCheckpoint;
this.shardAllocationId = allocationId;
@ -524,7 +548,7 @@ public class ReplicationTracker extends AbstractIndexShardComponent implements L
checkpoints.put(allocationId, new CheckpointState(SequenceNumbers.UNASSIGNED_SEQ_NO, globalCheckpoint, false, false));
this.onGlobalCheckpointUpdated = Objects.requireNonNull(onGlobalCheckpointUpdated);
this.currentTimeMillisSupplier = Objects.requireNonNull(currentTimeMillisSupplier);
this.onNewRetentionLease = Objects.requireNonNull(onNewRetentionLease);
this.onSyncRetentionLeases = Objects.requireNonNull(onSyncRetentionLeases);
this.pendingInSync = new HashSet<>();
this.routingTable = null;
this.replicationGroup = null;

View File

@ -21,6 +21,7 @@ package org.elasticsearch.index.seqno;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.routing.AllocationId;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.IndexSettings;
@ -30,6 +31,7 @@ import org.elasticsearch.test.IndexSettingsModule;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
@ -67,17 +69,17 @@ public class ReplicationTrackerRetentionLeaseTests extends ReplicationTrackerTes
minimumRetainingSequenceNumbers[i] = randomLongBetween(SequenceNumbers.NO_OPS_PERFORMED, Long.MAX_VALUE);
replicationTracker.addRetentionLease(
Integer.toString(i), minimumRetainingSequenceNumbers[i], "test-" + i, ActionListener.wrap(() -> {}));
assertRetentionLeases(replicationTracker, i + 1, minimumRetainingSequenceNumbers, () -> 0L);
assertRetentionLeases(replicationTracker, i + 1, minimumRetainingSequenceNumbers, () -> 0L, true);
}
for (int i = 0; i < length; i++) {
minimumRetainingSequenceNumbers[i] = randomLongBetween(minimumRetainingSequenceNumbers[i], Long.MAX_VALUE);
replicationTracker.renewRetentionLease(Integer.toString(i), minimumRetainingSequenceNumbers[i], "test-" + i);
assertRetentionLeases(replicationTracker, length, minimumRetainingSequenceNumbers, () -> 0L);
assertRetentionLeases(replicationTracker, length, minimumRetainingSequenceNumbers, () -> 0L, true);
}
}
public void testOnNewRetentionLease() {
public void testAddRetentionLeaseCausesRetentionLeaseSync() {
final AllocationId allocationId = AllocationId.newInitializing();
final Map<String, Long> retentionLeases = new HashMap<>();
final AtomicBoolean invoked = new AtomicBoolean();
@ -113,6 +115,7 @@ public class ReplicationTrackerRetentionLeaseTests extends ReplicationTrackerTes
replicationTracker.addRetentionLease(id, retainingSequenceNumber, "test", ActionListener.wrap(() -> {}));
// assert that the new retention lease callback was invoked
assertTrue(invoked.get());
// reset the invocation marker so that we can assert the callback was not invoked when renewing the lease
invoked.set(false);
replicationTracker.renewRetentionLease(id, retainingSequenceNumber, "test");
@ -120,7 +123,15 @@ public class ReplicationTrackerRetentionLeaseTests extends ReplicationTrackerTes
}
}
public void testExpiration() {
public void testExpirationOnPrimary() {
runExpirationTest(true);
}
public void testExpirationOnReplica() {
runExpirationTest(false);
}
private void runExpirationTest(final boolean primaryMode) {
final AllocationId allocationId = AllocationId.newInitializing();
final AtomicLong currentTimeMillis = new AtomicLong(randomLongBetween(0, 1024));
final long retentionLeaseMillis = randomLongBetween(1, TimeValue.timeValueHours(12).millis());
@ -141,42 +152,136 @@ public class ReplicationTrackerRetentionLeaseTests extends ReplicationTrackerTes
Collections.singleton(allocationId.getId()),
routingTable(Collections.emptySet(), allocationId),
Collections.emptySet());
if (primaryMode) {
replicationTracker.activatePrimaryMode(SequenceNumbers.NO_OPS_PERFORMED);
}
final long[] retainingSequenceNumbers = new long[1];
retainingSequenceNumbers[0] = randomLongBetween(SequenceNumbers.NO_OPS_PERFORMED, Long.MAX_VALUE);
if (primaryMode) {
replicationTracker.addRetentionLease("0", retainingSequenceNumbers[0], "test-0", ActionListener.wrap(() -> {}));
} else {
replicationTracker.updateRetentionLeasesOnReplica(
Collections.singleton(new RetentionLease("0", retainingSequenceNumbers[0], currentTimeMillis.get(), "test-0")));
}
{
final Collection<RetentionLease> retentionLeases = replicationTracker.getRetentionLeases();
assertThat(retentionLeases, hasSize(1));
final RetentionLease retentionLease = retentionLeases.iterator().next();
assertThat(retentionLease.timestamp(), equalTo(currentTimeMillis.get()));
assertRetentionLeases(replicationTracker, 1, retainingSequenceNumbers, currentTimeMillis::get);
assertRetentionLeases(replicationTracker, 1, retainingSequenceNumbers, currentTimeMillis::get, primaryMode);
}
// renew the lease
currentTimeMillis.set(currentTimeMillis.get() + randomLongBetween(0, 1024));
retainingSequenceNumbers[0] = randomLongBetween(retainingSequenceNumbers[0], Long.MAX_VALUE);
if (primaryMode) {
replicationTracker.renewRetentionLease("0", retainingSequenceNumbers[0], "test-0");
} else {
replicationTracker.updateRetentionLeasesOnReplica(
Collections.singleton(new RetentionLease("0", retainingSequenceNumbers[0], currentTimeMillis.get(), "test-0")));
}
{
final Collection<RetentionLease> retentionLeases = replicationTracker.getRetentionLeases();
assertThat(retentionLeases, hasSize(1));
final RetentionLease retentionLease = retentionLeases.iterator().next();
assertThat(retentionLease.timestamp(), equalTo(currentTimeMillis.get()));
assertRetentionLeases(replicationTracker, 1, retainingSequenceNumbers, currentTimeMillis::get);
assertRetentionLeases(replicationTracker, 1, retainingSequenceNumbers, currentTimeMillis::get, primaryMode);
}
// now force the lease to expire
currentTimeMillis.set(currentTimeMillis.get() + randomLongBetween(retentionLeaseMillis, Long.MAX_VALUE - currentTimeMillis.get()));
assertRetentionLeases(replicationTracker, 0, retainingSequenceNumbers, currentTimeMillis::get);
if (primaryMode) {
assertRetentionLeases(replicationTracker, 0, retainingSequenceNumbers, currentTimeMillis::get, true);
} else {
// leases do not expire on replicas until synced from the primary
assertRetentionLeases(replicationTracker, 1, retainingSequenceNumbers, currentTimeMillis::get, false);
}
}
public void testRetentionLeaseExpirationCausesRetentionLeaseSync() {
final AllocationId allocationId = AllocationId.newInitializing();
final AtomicLong currentTimeMillis = new AtomicLong(randomLongBetween(0, 1024));
final long retentionLeaseMillis = randomLongBetween(1, TimeValue.timeValueHours(12).millis());
final Settings settings = Settings
.builder()
.put(IndexSettings.INDEX_SOFT_DELETES_RETENTION_LEASE_SETTING.getKey(), TimeValue.timeValueMillis(retentionLeaseMillis))
.build();
final Map<String, Tuple<Long, Long>> retentionLeases = new HashMap<>();
final AtomicBoolean invoked = new AtomicBoolean();
final AtomicReference<ReplicationTracker> reference = new AtomicReference<>();
final ReplicationTracker replicationTracker = new ReplicationTracker(
new ShardId("test", "_na", 0),
allocationId.getId(),
IndexSettingsModule.newIndexSettings("test", settings),
UNASSIGNED_SEQ_NO,
value -> {},
currentTimeMillis::get,
(leases, listener) -> {
// we do not want to hold a lock on the replication tracker in the callback!
assertFalse(Thread.holdsLock(reference.get()));
invoked.set(true);
assertThat(
leases.stream().collect(Collectors.toMap(RetentionLease::id, ReplicationTrackerRetentionLeaseTests::toTuple)),
equalTo(retentionLeases));
});
reference.set(replicationTracker);
replicationTracker.updateFromMaster(
randomNonNegativeLong(),
Collections.singleton(allocationId.getId()),
routingTable(Collections.emptySet(), allocationId),
Collections.emptySet());
replicationTracker.activatePrimaryMode(SequenceNumbers.NO_OPS_PERFORMED);
final int length = randomIntBetween(0, 8);
for (int i = 0; i < length; i++) {
final String id = randomAlphaOfLength(8);
final long retainingSequenceNumber = randomLongBetween(SequenceNumbers.NO_OPS_PERFORMED, Long.MAX_VALUE);
retentionLeases.put(id, Tuple.tuple(retainingSequenceNumber, currentTimeMillis.get()));
replicationTracker.addRetentionLease(id, retainingSequenceNumber, "test", ActionListener.wrap(() -> {}));
// assert that the new retention lease callback was invoked
assertTrue(invoked.get());
// reset the invocation marker so that we can assert the callback was not invoked when renewing the lease
invoked.set(false);
currentTimeMillis.set(1 + currentTimeMillis.get());
retentionLeases.put(id, Tuple.tuple(retainingSequenceNumber, currentTimeMillis.get()));
replicationTracker.renewRetentionLease(id, retainingSequenceNumber, "test");
// reset the invocation marker so that we can assert the callback was invoked if any leases are expired
assertFalse(invoked.get());
// randomly expire some leases
final long currentTimeMillisIncrement = randomLongBetween(0, Long.MAX_VALUE - currentTimeMillis.get());
// calculate the expired leases and update our tracking map
final List<String> expiredIds = retentionLeases.entrySet()
.stream()
.filter(r -> currentTimeMillis.get() + currentTimeMillisIncrement > r.getValue().v2() + retentionLeaseMillis)
.map(Map.Entry::getKey)
.collect(Collectors.toList());
expiredIds.forEach(retentionLeases::remove);
currentTimeMillis.set(currentTimeMillis.get() + currentTimeMillisIncrement);
// getting the leases has the side effect of calculating which leases are expired and invoking the sync callback
final Collection<RetentionLease> current = replicationTracker.getRetentionLeases();
// the current leases should equal our tracking map
assertThat(
current.stream().collect(Collectors.toMap(RetentionLease::id, ReplicationTrackerRetentionLeaseTests::toTuple)),
equalTo(retentionLeases));
// the callback should only be invoked if there were expired leases
assertThat(invoked.get(), equalTo(expiredIds.isEmpty() == false));
}
}
private static Tuple<Long, Long> toTuple(final RetentionLease retentionLease) {
return Tuple.tuple(retentionLease.retainingSequenceNumber(), retentionLease.timestamp());
}
private void assertRetentionLeases(
final ReplicationTracker replicationTracker,
final int size,
final long[] minimumRetainingSequenceNumbers,
final LongSupplier currentTimeMillisSupplier) {
final LongSupplier currentTimeMillisSupplier,
final boolean primaryMode) {
final Collection<RetentionLease> retentionLeases = replicationTracker.getRetentionLeases();
final Map<String, RetentionLease> idToRetentionLease = new HashMap<>();
for (final RetentionLease retentionLease : retentionLeases) {
@ -188,9 +293,12 @@ public class ReplicationTrackerRetentionLeaseTests extends ReplicationTrackerTes
assertThat(idToRetentionLease.keySet(), hasItem(Integer.toString(i)));
final RetentionLease retentionLease = idToRetentionLease.get(Integer.toString(i));
assertThat(retentionLease.retainingSequenceNumber(), equalTo(minimumRetainingSequenceNumbers[i]));
if (primaryMode) {
// retention leases can be expired on replicas, so we can only assert on primaries here
assertThat(
currentTimeMillisSupplier.getAsLong() - retentionLease.timestamp(),
lessThanOrEqualTo(replicationTracker.indexSettings().getRetentionLeaseMillis()));
}
assertThat(retentionLease.source(), equalTo("test-" + i));
}
}

View File

@ -23,20 +23,28 @@ import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.replication.ReplicationResponse;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
import static org.hamcrest.Matchers.anyOf;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasItem;
public class RetentionLeaseSyncIT extends ESIntegTestCase {
@ -89,6 +97,73 @@ public class RetentionLeaseSyncIT extends ESIntegTestCase {
}
}
public void testRetentionLeasesSyncOnExpiration() throws Exception {
final int numberOfReplicas = 2 - scaledRandomIntBetween(0, 2);
internalCluster().ensureAtLeastNumDataNodes(1 + numberOfReplicas);
final long estimatedTimeIntervalMillis = ThreadPool.ESTIMATED_TIME_INTERVAL_SETTING.get(Settings.EMPTY).millis();
final TimeValue retentionLeaseTimeToLive =
TimeValue.timeValueMillis(randomLongBetween(estimatedTimeIntervalMillis, 2 * estimatedTimeIntervalMillis));
final Settings settings = Settings.builder()
.put("index.number_of_shards", 1)
.put("index.number_of_replicas", numberOfReplicas)
.put(IndexSettings.INDEX_SOFT_DELETES_RETENTION_LEASE_SETTING.getKey(), retentionLeaseTimeToLive)
.build();
createIndex("index", settings);
ensureGreen("index");
final String primaryShardNodeId = clusterService().state().routingTable().index("index").shard(0).primaryShard().currentNodeId();
final String primaryShardNodeName = clusterService().state().nodes().get(primaryShardNodeId).getName();
final IndexShard primary = internalCluster()
.getInstance(IndicesService.class, primaryShardNodeName)
.getShardOrNull(new ShardId(resolveIndex("index"), 0));
// we will add multiple retention leases, wait for some to expire, and assert a consistent view between the primary and the replicas
final int length = randomIntBetween(1, 8);
for (int i = 0; i < length; i++) {
final String id = randomAlphaOfLength(8);
final long retainingSequenceNumber = randomLongBetween(SequenceNumbers.NO_OPS_PERFORMED, Long.MAX_VALUE);
final String source = randomAlphaOfLength(8);
final CountDownLatch latch = new CountDownLatch(1);
final ActionListener<ReplicationResponse> listener = ActionListener.wrap(r -> latch.countDown(), e -> fail(e.toString()));
final RetentionLease currentRetentionLease = primary.addRetentionLease(id, retainingSequenceNumber, source, listener);
final long now = System.nanoTime();
latch.await();
// check current retention leases have been synced to all replicas
for (final ShardRouting replicaShard : clusterService().state().routingTable().index("index").shard(0).replicaShards()) {
final String replicaShardNodeId = replicaShard.currentNodeId();
final String replicaShardNodeName = clusterService().state().nodes().get(replicaShardNodeId).getName();
final IndexShard replica = internalCluster()
.getInstance(IndicesService.class, replicaShardNodeName)
.getShardOrNull(new ShardId(resolveIndex("index"), 0));
assertThat(replica.getRetentionLeases(), hasItem(currentRetentionLease));
}
// sleep long enough that *possibly* the current retention lease has expired, and certainly that any previous have
final long later = System.nanoTime();
Thread.sleep(Math.max(0, retentionLeaseTimeToLive.millis() - TimeUnit.NANOSECONDS.toMillis(later - now)));
final Collection<RetentionLease> currentRetentionLeases = primary.getRetentionLeases();
assertThat(currentRetentionLeases, anyOf(empty(), contains(currentRetentionLease)));
/*
* Check that expiration of retention leases has been synced to all replicas. We have to assert busy since syncing happens in
* the background.
*/
assertBusy(() -> {
for (final ShardRouting replicaShard : clusterService().state().routingTable().index("index").shard(0).replicaShards()) {
final String replicaShardNodeId = replicaShard.currentNodeId();
final String replicaShardNodeName = clusterService().state().nodes().get(replicaShardNodeId).getName();
final IndexShard replica = internalCluster()
.getInstance(IndicesService.class, replicaShardNodeName)
.getShardOrNull(new ShardId(resolveIndex("index"), 0));
if (currentRetentionLeases.isEmpty()) {
assertThat(replica.getRetentionLeases(), empty());
} else {
assertThat(replica.getRetentionLeases(), contains(currentRetentionLeases.toArray(new RetentionLease[0])));
}
}
});
}
}
private static Map<String, RetentionLease> toMap(final Collection<RetentionLease> replicaCommittedRetentionLeases) {
return replicaCommittedRetentionLeases.stream().collect(Collectors.toMap(RetentionLease::id, Function.identity()));
}

View File

@ -35,6 +35,7 @@ import org.elasticsearch.threadpool.ThreadPool;
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
@ -81,57 +82,79 @@ public class IndexShardRetentionLeaseTests extends IndexShardTestCase {
minimumRetainingSequenceNumbers[i] = randomLongBetween(SequenceNumbers.NO_OPS_PERFORMED, Long.MAX_VALUE);
indexShard.addRetentionLease(
Integer.toString(i), minimumRetainingSequenceNumbers[i], "test-" + i, ActionListener.wrap(() -> {}));
assertRetentionLeases(indexShard, i + 1, minimumRetainingSequenceNumbers, () -> 0L);
assertRetentionLeases(indexShard, i + 1, minimumRetainingSequenceNumbers, () -> 0L, true);
}
for (int i = 0; i < length; i++) {
minimumRetainingSequenceNumbers[i] = randomLongBetween(minimumRetainingSequenceNumbers[i], Long.MAX_VALUE);
indexShard.renewRetentionLease(Integer.toString(i), minimumRetainingSequenceNumbers[i], "test-" + i);
assertRetentionLeases(indexShard, length, minimumRetainingSequenceNumbers, () -> 0L);
assertRetentionLeases(indexShard, length, minimumRetainingSequenceNumbers, () -> 0L, true);
}
} finally {
closeShards(indexShard);
}
}
public void testExpiration() throws IOException {
public void testExpirationOnPrimary() throws IOException {
runExpirationTest(true);
}
public void testExpirationOnReplica() throws IOException {
runExpirationTest(false);
}
private void runExpirationTest(final boolean primary) throws IOException {
final long retentionLeaseMillis = randomLongBetween(1, TimeValue.timeValueHours(12).millis());
final Settings settings = Settings
.builder()
.put(IndexSettings.INDEX_SOFT_DELETES_RETENTION_LEASE_SETTING.getKey(), TimeValue.timeValueMillis(retentionLeaseMillis))
.build();
// current time is mocked through the thread pool
final IndexShard indexShard = newStartedShard(true, settings, new InternalEngineFactory());
final IndexShard indexShard = newStartedShard(primary, settings, new InternalEngineFactory());
try {
final long[] retainingSequenceNumbers = new long[1];
retainingSequenceNumbers[0] = randomLongBetween(SequenceNumbers.NO_OPS_PERFORMED, Long.MAX_VALUE);
if (primary) {
indexShard.addRetentionLease("0", retainingSequenceNumbers[0], "test-0", ActionListener.wrap(() -> {}));
} else {
indexShard.updateRetentionLeasesOnReplica(
Collections.singleton(new RetentionLease("0", retainingSequenceNumbers[0], currentTimeMillis.get(), "test-0")));
}
{
final Collection<RetentionLease> retentionLeases = indexShard.getEngine().config().retentionLeasesSupplier().get();
assertThat(retentionLeases, hasSize(1));
final RetentionLease retentionLease = retentionLeases.iterator().next();
assertThat(retentionLease.timestamp(), equalTo(currentTimeMillis.get()));
assertRetentionLeases(indexShard, 1, retainingSequenceNumbers, currentTimeMillis::get);
assertRetentionLeases(indexShard, 1, retainingSequenceNumbers, currentTimeMillis::get, primary);
}
// renew the lease
currentTimeMillis.set(currentTimeMillis.get() + randomLongBetween(0, 1024));
retainingSequenceNumbers[0] = randomLongBetween(retainingSequenceNumbers[0], Long.MAX_VALUE);
if (primary) {
indexShard.renewRetentionLease("0", retainingSequenceNumbers[0], "test-0");
} else {
indexShard.updateRetentionLeasesOnReplica(
Collections.singleton(new RetentionLease("0", retainingSequenceNumbers[0], currentTimeMillis.get(), "test-0")));
}
{
final Collection<RetentionLease> retentionLeases = indexShard.getEngine().config().retentionLeasesSupplier().get();
assertThat(retentionLeases, hasSize(1));
final RetentionLease retentionLease = retentionLeases.iterator().next();
assertThat(retentionLease.timestamp(), equalTo(currentTimeMillis.get()));
assertRetentionLeases(indexShard, 1, retainingSequenceNumbers, currentTimeMillis::get);
assertRetentionLeases(indexShard, 1, retainingSequenceNumbers, currentTimeMillis::get, primary);
}
// now force the lease to expire
currentTimeMillis.set(
currentTimeMillis.get() + randomLongBetween(retentionLeaseMillis, Long.MAX_VALUE - currentTimeMillis.get()));
assertRetentionLeases(indexShard, 0, retainingSequenceNumbers, currentTimeMillis::get);
if (primary) {
assertRetentionLeases(indexShard, 0, retainingSequenceNumbers, currentTimeMillis::get, true);
} else {
assertRetentionLeases(indexShard, 1, retainingSequenceNumbers, currentTimeMillis::get, false);
}
} finally {
closeShards(indexShard);
}
@ -196,7 +219,8 @@ public class IndexShardRetentionLeaseTests extends IndexShardTestCase {
final IndexShard indexShard,
final int size,
final long[] minimumRetainingSequenceNumbers,
final LongSupplier currentTimeMillisSupplier) {
final LongSupplier currentTimeMillisSupplier,
final boolean primary) {
final Collection<RetentionLease> retentionLeases = indexShard.getEngine().config().retentionLeasesSupplier().get();
final Map<String, RetentionLease> idToRetentionLease = new HashMap<>();
for (final RetentionLease retentionLease : retentionLeases) {
@ -208,9 +232,12 @@ public class IndexShardRetentionLeaseTests extends IndexShardTestCase {
assertThat(idToRetentionLease.keySet(), hasItem(Integer.toString(i)));
final RetentionLease retentionLease = idToRetentionLease.get(Integer.toString(i));
assertThat(retentionLease.retainingSequenceNumber(), equalTo(minimumRetainingSequenceNumbers[i]));
if (primary) {
// retention leases can be expired on replicas, so we can only assert on primaries here
assertThat(
currentTimeMillisSupplier.getAsLong() - retentionLease.timestamp(),
lessThanOrEqualTo(indexShard.indexSettings().getRetentionLeaseMillis()));
}
assertThat(retentionLease.source(), equalTo("test-" + i));
}
}