Soft-deletes policy should always fetch latest leases (#37940)

If a new retention lease is added while a primary's soft-deletes policy
is locked for peer-recovery, that lease won't be baked into the Lucene
commit.

Relates #37165
Relates #37375
This commit is contained in:
Nhat Nguyen 2019-01-31 12:02:57 -05:00 committed by GitHub
parent 68ed72b923
commit 8e95780f98
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 33 additions and 5 deletions

View File

@ -46,7 +46,6 @@ final class SoftDeletesPolicy {
private long retentionOperations;
// The min seq_no value that is retained - ops after this seq# should exist in the Lucene index.
private long minRetainedSeqNo;
private Collection<RetentionLease> retentionLeases;
// provides the retention leases used to calculate the minimum sequence number to retain
private final Supplier<Collection<RetentionLease>> retentionLeasesSupplier;
@ -59,7 +58,6 @@ final class SoftDeletesPolicy {
this.retentionOperations = retentionOperations;
this.minRetainedSeqNo = minRetainedSeqNo;
this.retentionLeasesSupplier = Objects.requireNonNull(retentionLeasesSupplier);
retentionLeases = retentionLeasesSupplier.get();
this.localCheckpointOfSafeCommit = SequenceNumbers.NO_OPS_PERFORMED;
this.retentionLockCount = 0;
}
@ -113,6 +111,11 @@ final class SoftDeletesPolicy {
}
public synchronized Tuple<Long, Collection<RetentionLease>> getRetentionPolicy() {
/*
* When an engine is flushed, we need to provide it the latest collection of retention leases even when the soft deletes policy is
* locked for peer recovery.
*/
final Collection<RetentionLease> retentionLeases = retentionLeasesSupplier.get();
// do not advance if the retention lock is held
if (retentionLockCount == 0) {
/*
@ -126,7 +129,6 @@ final class SoftDeletesPolicy {
*/
// calculate the minimum sequence number to retain based on retention leases
retentionLeases = retentionLeasesSupplier.get();
final long minimumRetainingSequenceNumber = retentionLeases
.stream()
.mapToLong(RetentionLease::retainingSequenceNumber)

View File

@ -24,18 +24,21 @@ import org.apache.lucene.search.PointRangeQuery;
import org.apache.lucene.search.Query;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.index.seqno.RetentionLease;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.test.ESTestCase;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;
import static org.elasticsearch.index.seqno.SequenceNumbers.NO_OPS_PERFORMED;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.instanceOf;
@ -46,7 +49,7 @@ public class SoftDeletesPolicyTests extends ESTestCase {
*/
public void testSoftDeletesRetentionLock() {
long retainedOps = between(0, 10000);
AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);
AtomicLong globalCheckpoint = new AtomicLong(NO_OPS_PERFORMED);
final AtomicLong[] retainingSequenceNumbers = new AtomicLong[randomIntBetween(0, 8)];
for (int i = 0; i < retainingSequenceNumbers.length; i++) {
retainingSequenceNumbers[i] = new AtomicLong();
@ -116,4 +119,23 @@ public class SoftDeletesPolicyTests extends ESTestCase {
assertThat(policy.getMinRetainedSeqNo(), equalTo(minRetainedSeqNo));
}
public void testAlwaysFetchLatestRetentionLeases() {
final AtomicLong globalCheckpoint = new AtomicLong(NO_OPS_PERFORMED);
final Collection<RetentionLease> leases = new ArrayList<>();
final int numLeases = randomIntBetween(0, 10);
for (int i = 0; i < numLeases; i++) {
leases.add(new RetentionLease(Integer.toString(i), randomLongBetween(0, 1000), randomNonNegativeLong(), "test"));
}
final Supplier<Collection<RetentionLease>> leasesSupplier = () -> Collections.unmodifiableCollection(new ArrayList<>(leases));
final SoftDeletesPolicy policy =
new SoftDeletesPolicy(globalCheckpoint::get, randomIntBetween(1, 1000), randomIntBetween(0, 1000), leasesSupplier);
if (randomBoolean()) {
policy.acquireRetentionLock();
}
if (numLeases == 0) {
assertThat(policy.getRetentionPolicy().v2(), empty());
} else {
assertThat(policy.getRetentionPolicy().v2(), contains(leases.toArray(new RetentionLease[0])));
}
}
}

View File

@ -32,6 +32,7 @@ import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import java.io.Closeable;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
@ -70,8 +71,11 @@ public class RetentionLeaseSyncIT extends ESIntegTestCase {
final String source = randomAlphaOfLength(8);
final CountDownLatch latch = new CountDownLatch(1);
final ActionListener<ReplicationResponse> listener = ActionListener.wrap(r -> latch.countDown(), e -> fail(e.toString()));
// simulate a peer-recovery which locks the soft-deletes policy on the primary.
final Closeable retentionLock = randomBoolean() ? primary.acquireRetentionLockForPeerRecovery() : () -> {};
currentRetentionLeases.put(id, primary.addRetentionLease(id, retainingSequenceNumber, source, listener));
latch.await();
retentionLock.close();
// check retention leases have been committed on the primary
final Collection<RetentionLease> primaryCommittedRetentionLeases = RetentionLease.decodeRetentionLeases(