Enable removal of retention leases (#38751)

This commit introduces the ability to remove retention leases. Explicit
removal will be needed to manage retention leases used to increase the
likelihood of operation-based recoveries syncing, and for consumers such
as ILM.
This commit is contained in:
Jason Tedor 2019-02-11 21:17:23 -05:00
parent e2f432a413
commit b97c74bbab
No known key found for this signature in database
GPG Key ID: FA89F05560F16BC5
5 changed files with 234 additions and 7 deletions

View File

@ -156,10 +156,10 @@ public class ReplicationTracker extends AbstractIndexShardComponent implements L
private final LongSupplier currentTimeMillisSupplier;
/**
* A callback when a new retention lease is created. In practice, this callback invokes the retention lease sync action, to sync
* retention leases to replicas.
* A callback when a new retention lease is created or an existing retention lease is removed. In practice, this callback invokes the
* retention lease sync action, to sync retention leases to replicas.
*/
private final BiConsumer<RetentionLeases, ActionListener<ReplicationResponse>> onAddRetentionLease;
private final BiConsumer<RetentionLeases, ActionListener<ReplicationResponse>> onSyncRetentionLeases;
/**
* This set contains allocation IDs for which there is a thread actively waiting for the local checkpoint to advance to at least the
@ -246,7 +246,7 @@ public class ReplicationTracker extends AbstractIndexShardComponent implements L
Stream.concat(retentionLeases.leases().stream(), Stream.of(retentionLease)).collect(Collectors.toList()));
currentRetentionLeases = retentionLeases;
}
onAddRetentionLease.accept(currentRetentionLeases, listener);
onSyncRetentionLeases.accept(currentRetentionLeases, listener);
return retentionLease;
}
@ -283,6 +283,29 @@ public class ReplicationTracker extends AbstractIndexShardComponent implements L
return retentionLease;
}
/**
* Removes an existing retention lease.
*
* @param id the identifier of the retention lease
* @param listener the callback when the retention lease is successfully removed and synced to replicas
*/
public void removeRetentionLease(final String id, final ActionListener<ReplicationResponse> listener) {
Objects.requireNonNull(listener);
final RetentionLeases currentRetentionLeases;
synchronized (this) {
assert primaryMode;
if (retentionLeases.contains(id) == false) {
throw new IllegalArgumentException("retention lease with ID [" + id + "] does not exist");
}
retentionLeases = new RetentionLeases(
operationPrimaryTerm,
retentionLeases.version() + 1,
retentionLeases.leases().stream().filter(lease -> lease.id().equals(id) == false).collect(Collectors.toList()));
currentRetentionLeases = retentionLeases;
}
onSyncRetentionLeases.accept(currentRetentionLeases, listener);
}
/**
* Updates retention leases on a replica.
*
@ -563,7 +586,7 @@ public class ReplicationTracker extends AbstractIndexShardComponent implements L
* @param indexSettings the index settings
* @param operationPrimaryTerm the current primary term
* @param globalCheckpoint the last known global checkpoint for this shard, or {@link SequenceNumbers#UNASSIGNED_SEQ_NO}
* @param onAddRetentionLease a callback when a new retention lease is created or an existing retention lease expires
* @param onSyncRetentionLeases a callback when a new retention lease is created or an existing retention lease expires
*/
public ReplicationTracker(
final ShardId shardId,
@ -573,7 +596,7 @@ public class ReplicationTracker extends AbstractIndexShardComponent implements L
final long globalCheckpoint,
final LongConsumer onGlobalCheckpointUpdated,
final LongSupplier currentTimeMillisSupplier,
final BiConsumer<RetentionLeases, ActionListener<ReplicationResponse>> onAddRetentionLease) {
final BiConsumer<RetentionLeases, ActionListener<ReplicationResponse>> onSyncRetentionLeases) {
super(shardId, indexSettings);
assert globalCheckpoint >= SequenceNumbers.UNASSIGNED_SEQ_NO : "illegal initial global checkpoint: " + globalCheckpoint;
this.shardAllocationId = allocationId;
@ -585,7 +608,7 @@ public class ReplicationTracker extends AbstractIndexShardComponent implements L
checkpoints.put(allocationId, new CheckpointState(SequenceNumbers.UNASSIGNED_SEQ_NO, globalCheckpoint, false, false));
this.onGlobalCheckpointUpdated = Objects.requireNonNull(onGlobalCheckpointUpdated);
this.currentTimeMillisSupplier = Objects.requireNonNull(currentTimeMillisSupplier);
this.onAddRetentionLease = Objects.requireNonNull(onAddRetentionLease);
this.onSyncRetentionLeases = Objects.requireNonNull(onSyncRetentionLeases);
this.pendingInSync = new HashSet<>();
this.routingTable = null;
this.replicationGroup = null;

View File

@ -1956,6 +1956,19 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
return replicationTracker.renewRetentionLease(id, retainingSequenceNumber, source);
}
/**
* Removes an existing retention lease.
*
* @param id the identifier of the retention lease
* @param listener the callback when the retention lease is successfully removed and synced to replicas
*/
public void removeRetentionLease(final String id, final ActionListener<ReplicationResponse> listener) {
Objects.requireNonNull(listener);
assert assertPrimaryMode();
verifyNotClosed();
replicationTracker.removeRetentionLease(id, listener);
}
/**
* Updates retention leases on a replica.
*

View File

@ -137,6 +137,105 @@ public class ReplicationTrackerRetentionLeaseTests extends ReplicationTrackerTes
}
}
public void testRemoveRetentionLease() {
final AllocationId allocationId = AllocationId.newInitializing();
long primaryTerm = randomLongBetween(1, Long.MAX_VALUE);
final ReplicationTracker replicationTracker = new ReplicationTracker(
new ShardId("test", "_na", 0),
allocationId.getId(),
IndexSettingsModule.newIndexSettings("test", Settings.EMPTY),
primaryTerm,
UNASSIGNED_SEQ_NO,
value -> {},
() -> 0L,
(leases, listener) -> {});
replicationTracker.updateFromMaster(
randomNonNegativeLong(),
Collections.singleton(allocationId.getId()),
routingTable(Collections.emptySet(), allocationId),
Collections.emptySet());
replicationTracker.activatePrimaryMode(SequenceNumbers.NO_OPS_PERFORMED);
final int length = randomIntBetween(0, 8);
final long[] minimumRetainingSequenceNumbers = new long[length];
for (int i = 0; i < length; i++) {
if (rarely() && primaryTerm < Long.MAX_VALUE) {
primaryTerm = randomLongBetween(primaryTerm + 1, Long.MAX_VALUE);
replicationTracker.setOperationPrimaryTerm(primaryTerm);
}
minimumRetainingSequenceNumbers[i] = randomLongBetween(SequenceNumbers.NO_OPS_PERFORMED, Long.MAX_VALUE);
replicationTracker.addRetentionLease(
Integer.toString(i), minimumRetainingSequenceNumbers[i], "test-" + i, ActionListener.wrap(() -> {}));
}
for (int i = 0; i < length; i++) {
if (rarely() && primaryTerm < Long.MAX_VALUE) {
primaryTerm = randomLongBetween(primaryTerm + 1, Long.MAX_VALUE);
replicationTracker.setOperationPrimaryTerm(primaryTerm);
}
/*
* Remove from the end since it will make the following assertion easier; we want to ensure that only the intended lease was
* removed.
*/
replicationTracker.removeRetentionLease(Integer.toString(length - i - 1), ActionListener.wrap(() -> {}));
assertRetentionLeases(
replicationTracker,
length - i - 1,
minimumRetainingSequenceNumbers,
primaryTerm,
1 + length + i,
true,
false);
}
}
public void testRemoveRetentionLeaseCausesRetentionLeaseSync() {
final AllocationId allocationId = AllocationId.newInitializing();
final Map<String, Long> retainingSequenceNumbers = new HashMap<>();
final AtomicBoolean invoked = new AtomicBoolean();
final AtomicReference<ReplicationTracker> reference = new AtomicReference<>();
final ReplicationTracker replicationTracker = new ReplicationTracker(
new ShardId("test", "_na", 0),
allocationId.getId(),
IndexSettingsModule.newIndexSettings("test", Settings.EMPTY),
randomNonNegativeLong(),
UNASSIGNED_SEQ_NO,
value -> {},
() -> 0L,
(leases, listener) -> {
// we do not want to hold a lock on the replication tracker in the callback!
assertFalse(Thread.holdsLock(reference.get()));
invoked.set(true);
assertThat(
leases.leases()
.stream()
.collect(Collectors.toMap(RetentionLease::id, RetentionLease::retainingSequenceNumber)),
equalTo(retainingSequenceNumbers));
});
reference.set(replicationTracker);
replicationTracker.updateFromMaster(
randomNonNegativeLong(),
Collections.singleton(allocationId.getId()),
routingTable(Collections.emptySet(), allocationId),
Collections.emptySet());
replicationTracker.activatePrimaryMode(SequenceNumbers.NO_OPS_PERFORMED);
final int length = randomIntBetween(0, 8);
for (int i = 0; i < length; i++) {
final String id = randomAlphaOfLength(8);
final long retainingSequenceNumber = randomLongBetween(SequenceNumbers.NO_OPS_PERFORMED, Long.MAX_VALUE);
retainingSequenceNumbers.put(id, retainingSequenceNumber);
replicationTracker.addRetentionLease(id, retainingSequenceNumber, "test", ActionListener.wrap(() -> {}));
// assert that the new retention lease callback was invoked
assertTrue(invoked.get());
// reset the invocation marker so that we can assert the callback was not invoked when removing the lease
invoked.set(false);
retainingSequenceNumbers.remove(id);
replicationTracker.removeRetentionLease(id, ActionListener.wrap(() -> {}));
assertTrue(invoked.get());
}
}
public void testExpirationOnPrimary() {
runExpirationTest(true);
}

View File

@ -126,6 +126,68 @@ public class RetentionLeaseIT extends ESIntegTestCase {
}
}
public void testRetentionLeaseSyncedOnRemove() throws Exception {
final int numberOfReplicas = 2 - scaledRandomIntBetween(0, 2);
internalCluster().ensureAtLeastNumDataNodes(1 + numberOfReplicas);
final Settings settings = Settings.builder()
.put("index.number_of_shards", 1)
.put("index.number_of_replicas", numberOfReplicas)
.build();
createIndex("index", settings);
ensureGreen("index");
final String primaryShardNodeId = clusterService().state().routingTable().index("index").shard(0).primaryShard().currentNodeId();
final String primaryShardNodeName = clusterService().state().nodes().get(primaryShardNodeId).getName();
final IndexShard primary = internalCluster()
.getInstance(IndicesService.class, primaryShardNodeName)
.getShardOrNull(new ShardId(resolveIndex("index"), 0));
final int length = randomIntBetween(1, 8);
final Map<String, RetentionLease> currentRetentionLeases = new HashMap<>();
for (int i = 0; i < length; i++) {
final String id = randomValueOtherThanMany(currentRetentionLeases.keySet()::contains, () -> randomAlphaOfLength(8));
final long retainingSequenceNumber = randomLongBetween(0, Long.MAX_VALUE);
final String source = randomAlphaOfLength(8);
final CountDownLatch latch = new CountDownLatch(1);
final ActionListener<ReplicationResponse> listener = ActionListener.wrap(r -> latch.countDown(), e -> fail(e.toString()));
// simulate a peer recovery which locks the soft deletes policy on the primary
final Closeable retentionLock = randomBoolean() ? primary.acquireRetentionLockForPeerRecovery() : () -> {};
currentRetentionLeases.put(id, primary.addRetentionLease(id, retainingSequenceNumber, source, listener));
latch.await();
retentionLock.close();
}
for (int i = 0; i < length; i++) {
final String id = randomFrom(currentRetentionLeases.keySet());
final CountDownLatch latch = new CountDownLatch(1);
primary.removeRetentionLease(id, ActionListener.wrap(r -> latch.countDown(), e -> fail(e.toString())));
// simulate a peer recovery which locks the soft deletes policy on the primary
final Closeable retentionLock = randomBoolean() ? primary.acquireRetentionLockForPeerRecovery() : () -> {};
currentRetentionLeases.remove(id);
latch.await();
retentionLock.close();
// check retention leases have been committed on the primary
final RetentionLeases primaryCommittedRetentionLeases = RetentionLeases.decodeRetentionLeases(
primary.commitStats().getUserData().get(Engine.RETENTION_LEASES));
assertThat(currentRetentionLeases, equalTo(RetentionLeases.toMap(primaryCommittedRetentionLeases)));
// check current retention leases have been synced to all replicas
for (final ShardRouting replicaShard : clusterService().state().routingTable().index("index").shard(0).replicaShards()) {
final String replicaShardNodeId = replicaShard.currentNodeId();
final String replicaShardNodeName = clusterService().state().nodes().get(replicaShardNodeId).getName();
final IndexShard replica = internalCluster()
.getInstance(IndicesService.class, replicaShardNodeName)
.getShardOrNull(new ShardId(resolveIndex("index"), 0));
final Map<String, RetentionLease> retentionLeasesOnReplica = RetentionLeases.toMap(replica.getRetentionLeases());
assertThat(retentionLeasesOnReplica, equalTo(currentRetentionLeases));
// check retention leases have been committed on the replica
final RetentionLeases replicaCommittedRetentionLeases = RetentionLeases.decodeRetentionLeases(
replica.commitStats().getUserData().get(Engine.RETENTION_LEASES));
assertThat(currentRetentionLeases, equalTo(RetentionLeases.toMap(replicaCommittedRetentionLeases)));
}
}
}
public void testRetentionLeasesSyncOnExpiration() throws Exception {
final int numberOfReplicas = 2 - scaledRandomIntBetween(0, 2);
internalCluster().ensureAtLeastNumDataNodes(1 + numberOfReplicas);

View File

@ -104,6 +104,36 @@ public class IndexShardRetentionLeaseTests extends IndexShardTestCase {
}
}
public void testRemoveRetentionLease() throws IOException {
final IndexShard indexShard = newStartedShard(true);
final long primaryTerm = indexShard.getOperationPrimaryTerm();
try {
final int length = randomIntBetween(0, 8);
final long[] minimumRetainingSequenceNumbers = new long[length];
for (int i = 0; i < length; i++) {
minimumRetainingSequenceNumbers[i] = randomLongBetween(SequenceNumbers.NO_OPS_PERFORMED, Long.MAX_VALUE);
indexShard.addRetentionLease(
Integer.toString(i), minimumRetainingSequenceNumbers[i], "test-" + i, ActionListener.wrap(() -> {}));
assertRetentionLeases(
indexShard, i + 1, minimumRetainingSequenceNumbers, primaryTerm, 1 + i, true, false);
}
for (int i = 0; i < length; i++) {
indexShard.removeRetentionLease(Integer.toString(length - i - 1), ActionListener.wrap(() -> {}));
assertRetentionLeases(
indexShard,
length - i - 1,
minimumRetainingSequenceNumbers,
primaryTerm,
1 + length + i,
true,
false);
}
} finally {
closeShards(indexShard);
}
}
public void testExpirationOnPrimary() throws IOException {
runExpirationTest(true);
}