From 963ed25cf599a78f12433ab9520e3172838405b5 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Mon, 4 Dec 2017 18:10:04 -0500 Subject: [PATCH] Only fsync global checkpoint if needed In the global checkpoint sync action, we fsync the translog. However, the last synced global checkpoint might already be equal to the current global checkpoint in which case the fsyncing the translog is unnecessary as either the sync needed guard in the translog will skip the translog, or the translog needs an fsync for another reason that will be picked up elsewhere (e.g., at the end of a bulk request). Relates #27652 --- .../index/seqno/GlobalCheckpointSyncAction.java | 16 +++++++++++----- .../seqno/GlobalCheckpointSyncActionTests.java | 15 ++++++++++++++- 2 files changed, 25 insertions(+), 6 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/index/seqno/GlobalCheckpointSyncAction.java b/core/src/main/java/org/elasticsearch/index/seqno/GlobalCheckpointSyncAction.java index 04caf187db9..95e3505e746 100644 --- a/core/src/main/java/org/elasticsearch/index/seqno/GlobalCheckpointSyncAction.java +++ b/core/src/main/java/org/elasticsearch/index/seqno/GlobalCheckpointSyncAction.java @@ -44,6 +44,8 @@ import org.elasticsearch.indices.IndicesService; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; +import java.io.IOException; + /** * Background global checkpoint sync action initiated when a shard goes inactive. This is needed because while we send the global checkpoint * on every replication operation, after the last operation completes the global checkpoint could advance but without a follow-up operation @@ -117,18 +119,22 @@ public class GlobalCheckpointSyncAction extends TransportReplicationAction< @Override protected PrimaryResult shardOperationOnPrimary( final Request request, final IndexShard indexShard) throws Exception { - if (indexShard.getTranslogDurability() == Translog.Durability.REQUEST) { - indexShard.getTranslog().sync(); - } + maybeSyncTranslog(indexShard); return new PrimaryResult<>(request, new ReplicationResponse()); } @Override protected ReplicaResult shardOperationOnReplica(final Request request, final IndexShard indexShard) throws Exception { - if (indexShard.getTranslogDurability() == Translog.Durability.REQUEST) { + maybeSyncTranslog(indexShard); + return new ReplicaResult(); + } + + private void maybeSyncTranslog(final IndexShard indexShard) throws IOException { + final Translog translog = indexShard.getTranslog(); + if (indexShard.getTranslogDurability() == Translog.Durability.REQUEST && + translog.getLastSyncedGlobalCheckpoint() < indexShard.getGlobalCheckpoint()) { indexShard.getTranslog().sync(); } - return new ReplicaResult(); } public static final class Request extends ReplicationRequest { diff --git a/core/src/test/java/org/elasticsearch/index/seqno/GlobalCheckpointSyncActionTests.java b/core/src/test/java/org/elasticsearch/index/seqno/GlobalCheckpointSyncActionTests.java index c327e47c30c..71faecfcea5 100644 --- a/core/src/test/java/org/elasticsearch/index/seqno/GlobalCheckpointSyncActionTests.java +++ b/core/src/test/java/org/elasticsearch/index/seqno/GlobalCheckpointSyncActionTests.java @@ -93,6 +93,19 @@ public class GlobalCheckpointSyncActionTests extends ESTestCase { final Translog translog = mock(Translog.class); when(indexShard.getTranslog()).thenReturn(translog); + final long globalCheckpoint = randomIntBetween(Math.toIntExact(SequenceNumbers.NO_OPS_PERFORMED), Integer.MAX_VALUE); + final long lastSyncedGlobalCheckpoint; + if (randomBoolean() && globalCheckpoint != SequenceNumbers.NO_OPS_PERFORMED) { + lastSyncedGlobalCheckpoint = + randomIntBetween(Math.toIntExact(SequenceNumbers.NO_OPS_PERFORMED), Math.toIntExact(globalCheckpoint) - 1); + assert lastSyncedGlobalCheckpoint < globalCheckpoint; + } else { + lastSyncedGlobalCheckpoint = globalCheckpoint; + } + + when(indexShard.getGlobalCheckpoint()).thenReturn(globalCheckpoint); + when(translog.getLastSyncedGlobalCheckpoint()).thenReturn(lastSyncedGlobalCheckpoint); + final GlobalCheckpointSyncAction action = new GlobalCheckpointSyncAction( Settings.EMPTY, transportService, @@ -109,7 +122,7 @@ public class GlobalCheckpointSyncActionTests extends ESTestCase { action.shardOperationOnReplica(new GlobalCheckpointSyncAction.Request(indexShard.shardId()), indexShard); } - if (durability == Translog.Durability.ASYNC) { + if (durability == Translog.Durability.ASYNC || lastSyncedGlobalCheckpoint == globalCheckpoint) { verify(translog, never()).sync(); } else { verify(translog).sync();