From 32b70ed34cbafb9979e7280a34938b1f675d8311 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Tue, 21 May 2019 13:45:58 -0400 Subject: [PATCH] Avoid unnecessary persistence of retention leases (#42299) Today we are persisting the retention leases at least every thirty seconds by a scheduled background sync. This sync causes an fsync to disk and when there are a large number of shards allocated to slow disks, these fsyncs can pile up and can severely impact the system. This commit addresses this by only persisting and fsyncing the retention leases if they have changed since the last time that we persisted and fsynced the retention leases. --- .../index/seqno/ReplicationTracker.java | 21 ++++++++- .../index/seqno/RetentionLeases.java | 20 +++++++-- ...ReplicationTrackerRetentionLeaseTests.java | 44 +++++++++++++++++++ .../index/seqno/RetentionLeasesTests.java | 4 ++ 4 files changed, 85 insertions(+), 4 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java b/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java index 745f6beb734..5c59007f9f2 100644 --- a/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java +++ b/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java @@ -180,6 +180,18 @@ public class ReplicationTracker extends AbstractIndexShardComponent implements L */ private RetentionLeases retentionLeases = RetentionLeases.EMPTY; + /** + * The primary term of the most-recently persisted retention leases. This is used to check if we need to persist the current retention + * leases. + */ + private long persistedRetentionLeasesPrimaryTerm; + + /** + * The version of the most-recently persisted retention leases. This is used to check if we need to persist the current retention + * leases. + */ + private long persistedRetentionLeasesVersion; + /** * Get all retention leases tracked on this shard. * @@ -342,7 +354,8 @@ public class ReplicationTracker extends AbstractIndexShardComponent implements L private final Object retentionLeasePersistenceLock = new Object(); /** - * Persists the current retention leases to their dedicated state file. + * Persists the current retention leases to their dedicated state file. If this version of the retention leases are already persisted + * then persistence is skipped. * * @param path the path to the directory containing the state file * @throws WriteStateException if an exception occurs writing the state file @@ -351,10 +364,16 @@ public class ReplicationTracker extends AbstractIndexShardComponent implements L synchronized (retentionLeasePersistenceLock) { final RetentionLeases currentRetentionLeases; synchronized (this) { + if (retentionLeases.supersedes(persistedRetentionLeasesPrimaryTerm, persistedRetentionLeasesVersion) == false) { + logger.trace("skipping persisting retention leases [{}], already persisted", retentionLeases); + return; + } currentRetentionLeases = retentionLeases; } logger.trace("persisting retention leases [{}]", currentRetentionLeases); RetentionLeases.FORMAT.writeAndCleanup(currentRetentionLeases, path); + persistedRetentionLeasesPrimaryTerm = currentRetentionLeases.primaryTerm(); + persistedRetentionLeasesVersion = currentRetentionLeases.version(); } } diff --git a/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeases.java b/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeases.java index 7c3b9e3c7b9..1b94eec264e 100644 --- a/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeases.java +++ b/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeases.java @@ -70,13 +70,27 @@ public class RetentionLeases implements ToXContentFragment, Writeable { /** * Checks if this retention leases collection supersedes the specified retention leases collection. A retention leases collection - * supersedes another retention leases collection if its primary term is higher, or if for equal primary terms its version is higher + * supersedes another retention leases collection if its primary term is higher, or if for equal primary terms its version is higher. * * @param that the retention leases collection to test against * @return true if this retention leases collection supercedes the specified retention lease collection, otherwise false */ - public boolean supersedes(final RetentionLeases that) { - return primaryTerm > that.primaryTerm || primaryTerm == that.primaryTerm && version > that.version; + boolean supersedes(final RetentionLeases that) { + return supersedes(that.primaryTerm, that.version); + } + + /** + * Checks if this retention leases collection would supersede a retention leases collection with the specified primary term and version. + * A retention leases collection supersedes another retention leases collection if its primary term is higher, or if for equal primary + * terms its version is higher. + * + * @param primaryTerm the primary term + * @param version the version + * @return true if this retention leases collection would supercedes a retention lease collection with the specified primary term and + * version + */ + boolean supersedes(final long primaryTerm, final long version) { + return this.primaryTerm > primaryTerm || this.primaryTerm == primaryTerm && this.version > version; } private final Map leases; diff --git a/server/src/test/java/org/elasticsearch/index/seqno/ReplicationTrackerRetentionLeaseTests.java b/server/src/test/java/org/elasticsearch/index/seqno/ReplicationTrackerRetentionLeaseTests.java index 178df2eac89..0e7cbaa42d1 100644 --- a/server/src/test/java/org/elasticsearch/index/seqno/ReplicationTrackerRetentionLeaseTests.java +++ b/server/src/test/java/org/elasticsearch/index/seqno/ReplicationTrackerRetentionLeaseTests.java @@ -24,6 +24,7 @@ import org.elasticsearch.cluster.routing.AllocationId; import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.gateway.WriteStateException; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.shard.ShardId; @@ -499,6 +500,49 @@ public class ReplicationTrackerRetentionLeaseTests extends ReplicationTrackerTes assertThat(replicationTracker.loadRetentionLeases(path), equalTo(replicationTracker.getRetentionLeases())); } + public void testUnnecessaryPersistenceOfRetentionLeases() throws IOException { + final AllocationId allocationId = AllocationId.newInitializing(); + long primaryTerm = randomLongBetween(1, Long.MAX_VALUE); + final ReplicationTracker replicationTracker = new ReplicationTracker( + new ShardId("test", "_na", 0), + allocationId.getId(), + IndexSettingsModule.newIndexSettings("test", Settings.EMPTY), + primaryTerm, + UNASSIGNED_SEQ_NO, + value -> {}, + () -> 0L, + (leases, listener) -> {}); + replicationTracker.updateFromMaster( + randomNonNegativeLong(), + Collections.singleton(allocationId.getId()), + routingTable(Collections.emptySet(), allocationId), + Collections.emptySet()); + replicationTracker.activatePrimaryMode(SequenceNumbers.NO_OPS_PERFORMED); + final int length = randomIntBetween(0, 8); + for (int i = 0; i < length; i++) { + if (rarely() && primaryTerm < Long.MAX_VALUE) { + primaryTerm = randomLongBetween(primaryTerm + 1, Long.MAX_VALUE); + replicationTracker.setOperationPrimaryTerm(primaryTerm); + } + final long retainingSequenceNumber = randomLongBetween(SequenceNumbers.NO_OPS_PERFORMED, Long.MAX_VALUE); + replicationTracker.addRetentionLease( + Integer.toString(i), retainingSequenceNumber, "test-" + i, ActionListener.wrap(() -> {})); + } + + final Path path = createTempDir(); + replicationTracker.persistRetentionLeases(path); + + final Tuple retentionLeasesWithGeneration = + RetentionLeases.FORMAT.loadLatestStateWithGeneration(logger, NamedXContentRegistry.EMPTY, path); + + replicationTracker.persistRetentionLeases(path); + final Tuple retentionLeasesWithGenerationAfterUnnecessaryPersistence = + RetentionLeases.FORMAT.loadLatestStateWithGeneration(logger, NamedXContentRegistry.EMPTY, path); + + assertThat(retentionLeasesWithGenerationAfterUnnecessaryPersistence.v1(), equalTo(retentionLeasesWithGeneration.v1())); + assertThat(retentionLeasesWithGenerationAfterUnnecessaryPersistence.v2(), equalTo(retentionLeasesWithGeneration.v2())); + } + /** * Test that we correctly synchronize writing the retention lease state file in {@link ReplicationTracker#persistRetentionLeases(Path)}. * This test can fail without the synchronization block in that method. diff --git a/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeasesTests.java b/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeasesTests.java index 28444c7825e..c63b2ebb664 100644 --- a/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeasesTests.java +++ b/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeasesTests.java @@ -60,7 +60,9 @@ public class RetentionLeasesTests extends ESTestCase { final long higherPrimaryTerm = randomLongBetween(lowerPrimaryTerm + 1, Long.MAX_VALUE); final RetentionLeases right = new RetentionLeases(higherPrimaryTerm, randomLongBetween(1, Long.MAX_VALUE), Collections.emptyList()); assertTrue(right.supersedes(left)); + assertTrue(right.supersedes(left.primaryTerm(), left.version())); assertFalse(left.supersedes(right)); + assertFalse(left.supersedes(right.primaryTerm(), right.version())); } public void testSupersedesByVersion() { @@ -70,7 +72,9 @@ public class RetentionLeasesTests extends ESTestCase { final RetentionLeases left = new RetentionLeases(primaryTerm, lowerVersion, Collections.emptyList()); final RetentionLeases right = new RetentionLeases(primaryTerm, higherVersion, Collections.emptyList()); assertTrue(right.supersedes(left)); + assertTrue(right.supersedes(left.primaryTerm(), left.version())); assertFalse(left.supersedes(right)); + assertFalse(left.supersedes(right.primaryTerm(), right.version())); } public void testRetentionLeasesRejectsDuplicates() {