Avoid unnecessary persistence of retention leases (#42299)

Today we are persisting the retention leases at least every thirty
seconds by a scheduled background sync. This sync causes an fsync to
disk and when there are a large number of shards allocated to slow
disks, these fsyncs can pile up and can severely impact the system. This
commit addresses this by only persisting and fsyncing the retention
leases if they have changed since the last time that we persisted and
fsynced the retention leases.
This commit is contained in:
Jason Tedor 2019-05-21 13:45:58 -04:00
parent 7e4d3c695b
commit 32b70ed34c
No known key found for this signature in database
GPG Key ID: FA89F05560F16BC5
4 changed files with 85 additions and 4 deletions

View File

@ -180,6 +180,18 @@ public class ReplicationTracker extends AbstractIndexShardComponent implements L
*/ */
private RetentionLeases retentionLeases = RetentionLeases.EMPTY; private RetentionLeases retentionLeases = RetentionLeases.EMPTY;
/**
* The primary term of the most-recently persisted retention leases. This is used to check if we need to persist the current retention
* leases.
*/
private long persistedRetentionLeasesPrimaryTerm;
/**
* The version of the most-recently persisted retention leases. This is used to check if we need to persist the current retention
* leases.
*/
private long persistedRetentionLeasesVersion;
/** /**
* Get all retention leases tracked on this shard. * Get all retention leases tracked on this shard.
* *
@ -342,7 +354,8 @@ public class ReplicationTracker extends AbstractIndexShardComponent implements L
private final Object retentionLeasePersistenceLock = new Object(); private final Object retentionLeasePersistenceLock = new Object();
/** /**
* Persists the current retention leases to their dedicated state file. * Persists the current retention leases to their dedicated state file. If this version of the retention leases are already persisted
* then persistence is skipped.
* *
* @param path the path to the directory containing the state file * @param path the path to the directory containing the state file
* @throws WriteStateException if an exception occurs writing the state file * @throws WriteStateException if an exception occurs writing the state file
@ -351,10 +364,16 @@ public class ReplicationTracker extends AbstractIndexShardComponent implements L
synchronized (retentionLeasePersistenceLock) { synchronized (retentionLeasePersistenceLock) {
final RetentionLeases currentRetentionLeases; final RetentionLeases currentRetentionLeases;
synchronized (this) { synchronized (this) {
if (retentionLeases.supersedes(persistedRetentionLeasesPrimaryTerm, persistedRetentionLeasesVersion) == false) {
logger.trace("skipping persisting retention leases [{}], already persisted", retentionLeases);
return;
}
currentRetentionLeases = retentionLeases; currentRetentionLeases = retentionLeases;
} }
logger.trace("persisting retention leases [{}]", currentRetentionLeases); logger.trace("persisting retention leases [{}]", currentRetentionLeases);
RetentionLeases.FORMAT.writeAndCleanup(currentRetentionLeases, path); RetentionLeases.FORMAT.writeAndCleanup(currentRetentionLeases, path);
persistedRetentionLeasesPrimaryTerm = currentRetentionLeases.primaryTerm();
persistedRetentionLeasesVersion = currentRetentionLeases.version();
} }
} }

View File

@ -70,13 +70,27 @@ public class RetentionLeases implements ToXContentFragment, Writeable {
/** /**
* Checks if this retention leases collection supersedes the specified retention leases collection. A retention leases collection * Checks if this retention leases collection supersedes the specified retention leases collection. A retention leases collection
* supersedes another retention leases collection if its primary term is higher, or if for equal primary terms its version is higher * supersedes another retention leases collection if its primary term is higher, or if for equal primary terms its version is higher.
* *
* @param that the retention leases collection to test against * @param that the retention leases collection to test against
* @return true if this retention leases collection supercedes the specified retention lease collection, otherwise false * @return true if this retention leases collection supercedes the specified retention lease collection, otherwise false
*/ */
public boolean supersedes(final RetentionLeases that) { boolean supersedes(final RetentionLeases that) {
return primaryTerm > that.primaryTerm || primaryTerm == that.primaryTerm && version > that.version; return supersedes(that.primaryTerm, that.version);
}
/**
* Checks if this retention leases collection would supersede a retention leases collection with the specified primary term and version.
* A retention leases collection supersedes another retention leases collection if its primary term is higher, or if for equal primary
* terms its version is higher.
*
* @param primaryTerm the primary term
* @param version the version
* @return true if this retention leases collection would supercedes a retention lease collection with the specified primary term and
* version
*/
boolean supersedes(final long primaryTerm, final long version) {
return this.primaryTerm > primaryTerm || this.primaryTerm == primaryTerm && this.version > version;
} }
private final Map<String, RetentionLease> leases; private final Map<String, RetentionLease> leases;

View File

@ -24,6 +24,7 @@ import org.elasticsearch.cluster.routing.AllocationId;
import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.gateway.WriteStateException; import org.elasticsearch.gateway.WriteStateException;
import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.ShardId;
@ -499,6 +500,49 @@ public class ReplicationTrackerRetentionLeaseTests extends ReplicationTrackerTes
assertThat(replicationTracker.loadRetentionLeases(path), equalTo(replicationTracker.getRetentionLeases())); assertThat(replicationTracker.loadRetentionLeases(path), equalTo(replicationTracker.getRetentionLeases()));
} }
public void testUnnecessaryPersistenceOfRetentionLeases() throws IOException {
final AllocationId allocationId = AllocationId.newInitializing();
long primaryTerm = randomLongBetween(1, Long.MAX_VALUE);
final ReplicationTracker replicationTracker = new ReplicationTracker(
new ShardId("test", "_na", 0),
allocationId.getId(),
IndexSettingsModule.newIndexSettings("test", Settings.EMPTY),
primaryTerm,
UNASSIGNED_SEQ_NO,
value -> {},
() -> 0L,
(leases, listener) -> {});
replicationTracker.updateFromMaster(
randomNonNegativeLong(),
Collections.singleton(allocationId.getId()),
routingTable(Collections.emptySet(), allocationId),
Collections.emptySet());
replicationTracker.activatePrimaryMode(SequenceNumbers.NO_OPS_PERFORMED);
final int length = randomIntBetween(0, 8);
for (int i = 0; i < length; i++) {
if (rarely() && primaryTerm < Long.MAX_VALUE) {
primaryTerm = randomLongBetween(primaryTerm + 1, Long.MAX_VALUE);
replicationTracker.setOperationPrimaryTerm(primaryTerm);
}
final long retainingSequenceNumber = randomLongBetween(SequenceNumbers.NO_OPS_PERFORMED, Long.MAX_VALUE);
replicationTracker.addRetentionLease(
Integer.toString(i), retainingSequenceNumber, "test-" + i, ActionListener.wrap(() -> {}));
}
final Path path = createTempDir();
replicationTracker.persistRetentionLeases(path);
final Tuple<RetentionLeases, Long> retentionLeasesWithGeneration =
RetentionLeases.FORMAT.loadLatestStateWithGeneration(logger, NamedXContentRegistry.EMPTY, path);
replicationTracker.persistRetentionLeases(path);
final Tuple<RetentionLeases, Long> retentionLeasesWithGenerationAfterUnnecessaryPersistence =
RetentionLeases.FORMAT.loadLatestStateWithGeneration(logger, NamedXContentRegistry.EMPTY, path);
assertThat(retentionLeasesWithGenerationAfterUnnecessaryPersistence.v1(), equalTo(retentionLeasesWithGeneration.v1()));
assertThat(retentionLeasesWithGenerationAfterUnnecessaryPersistence.v2(), equalTo(retentionLeasesWithGeneration.v2()));
}
/** /**
* Test that we correctly synchronize writing the retention lease state file in {@link ReplicationTracker#persistRetentionLeases(Path)}. * Test that we correctly synchronize writing the retention lease state file in {@link ReplicationTracker#persistRetentionLeases(Path)}.
* This test can fail without the synchronization block in that method. * This test can fail without the synchronization block in that method.

View File

@ -60,7 +60,9 @@ public class RetentionLeasesTests extends ESTestCase {
final long higherPrimaryTerm = randomLongBetween(lowerPrimaryTerm + 1, Long.MAX_VALUE); final long higherPrimaryTerm = randomLongBetween(lowerPrimaryTerm + 1, Long.MAX_VALUE);
final RetentionLeases right = new RetentionLeases(higherPrimaryTerm, randomLongBetween(1, Long.MAX_VALUE), Collections.emptyList()); final RetentionLeases right = new RetentionLeases(higherPrimaryTerm, randomLongBetween(1, Long.MAX_VALUE), Collections.emptyList());
assertTrue(right.supersedes(left)); assertTrue(right.supersedes(left));
assertTrue(right.supersedes(left.primaryTerm(), left.version()));
assertFalse(left.supersedes(right)); assertFalse(left.supersedes(right));
assertFalse(left.supersedes(right.primaryTerm(), right.version()));
} }
public void testSupersedesByVersion() { public void testSupersedesByVersion() {
@ -70,7 +72,9 @@ public class RetentionLeasesTests extends ESTestCase {
final RetentionLeases left = new RetentionLeases(primaryTerm, lowerVersion, Collections.emptyList()); final RetentionLeases left = new RetentionLeases(primaryTerm, lowerVersion, Collections.emptyList());
final RetentionLeases right = new RetentionLeases(primaryTerm, higherVersion, Collections.emptyList()); final RetentionLeases right = new RetentionLeases(primaryTerm, higherVersion, Collections.emptyList());
assertTrue(right.supersedes(left)); assertTrue(right.supersedes(left));
assertTrue(right.supersedes(left.primaryTerm(), left.version()));
assertFalse(left.supersedes(right)); assertFalse(left.supersedes(right));
assertFalse(left.supersedes(right.primaryTerm(), right.version()));
} }
public void testRetentionLeasesRejectsDuplicates() { public void testRetentionLeasesRejectsDuplicates() {