diff --git a/core/src/main/java/org/elasticsearch/index/seqno/GlobalCheckpointSyncAction.java b/core/src/main/java/org/elasticsearch/index/seqno/GlobalCheckpointSyncAction.java index 04caf187db9..95e3505e746 100644 --- a/core/src/main/java/org/elasticsearch/index/seqno/GlobalCheckpointSyncAction.java +++ b/core/src/main/java/org/elasticsearch/index/seqno/GlobalCheckpointSyncAction.java @@ -44,6 +44,8 @@ import org.elasticsearch.indices.IndicesService; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; +import java.io.IOException; + /** * Background global checkpoint sync action initiated when a shard goes inactive. This is needed because while we send the global checkpoint * on every replication operation, after the last operation completes the global checkpoint could advance but without a follow-up operation @@ -117,18 +119,22 @@ public class GlobalCheckpointSyncAction extends TransportReplicationAction< @Override protected PrimaryResult shardOperationOnPrimary( final Request request, final IndexShard indexShard) throws Exception { - if (indexShard.getTranslogDurability() == Translog.Durability.REQUEST) { - indexShard.getTranslog().sync(); - } + maybeSyncTranslog(indexShard); return new PrimaryResult<>(request, new ReplicationResponse()); } @Override protected ReplicaResult shardOperationOnReplica(final Request request, final IndexShard indexShard) throws Exception { - if (indexShard.getTranslogDurability() == Translog.Durability.REQUEST) { + maybeSyncTranslog(indexShard); + return new ReplicaResult(); + } + + private void maybeSyncTranslog(final IndexShard indexShard) throws IOException { + final Translog translog = indexShard.getTranslog(); + if (indexShard.getTranslogDurability() == Translog.Durability.REQUEST && + translog.getLastSyncedGlobalCheckpoint() < indexShard.getGlobalCheckpoint()) { indexShard.getTranslog().sync(); } - return new ReplicaResult(); } public static final class Request extends ReplicationRequest { diff --git a/core/src/test/java/org/elasticsearch/index/seqno/GlobalCheckpointSyncActionTests.java b/core/src/test/java/org/elasticsearch/index/seqno/GlobalCheckpointSyncActionTests.java index c327e47c30c..71faecfcea5 100644 --- a/core/src/test/java/org/elasticsearch/index/seqno/GlobalCheckpointSyncActionTests.java +++ b/core/src/test/java/org/elasticsearch/index/seqno/GlobalCheckpointSyncActionTests.java @@ -93,6 +93,19 @@ public class GlobalCheckpointSyncActionTests extends ESTestCase { final Translog translog = mock(Translog.class); when(indexShard.getTranslog()).thenReturn(translog); + final long globalCheckpoint = randomIntBetween(Math.toIntExact(SequenceNumbers.NO_OPS_PERFORMED), Integer.MAX_VALUE); + final long lastSyncedGlobalCheckpoint; + if (randomBoolean() && globalCheckpoint != SequenceNumbers.NO_OPS_PERFORMED) { + lastSyncedGlobalCheckpoint = + randomIntBetween(Math.toIntExact(SequenceNumbers.NO_OPS_PERFORMED), Math.toIntExact(globalCheckpoint) - 1); + assert lastSyncedGlobalCheckpoint < globalCheckpoint; + } else { + lastSyncedGlobalCheckpoint = globalCheckpoint; + } + + when(indexShard.getGlobalCheckpoint()).thenReturn(globalCheckpoint); + when(translog.getLastSyncedGlobalCheckpoint()).thenReturn(lastSyncedGlobalCheckpoint); + final GlobalCheckpointSyncAction action = new GlobalCheckpointSyncAction( Settings.EMPTY, transportService, @@ -109,7 +122,7 @@ public class GlobalCheckpointSyncActionTests extends ESTestCase { action.shardOperationOnReplica(new GlobalCheckpointSyncAction.Request(indexShard.shardId()), indexShard); } - if (durability == Translog.Durability.ASYNC) { + if (durability == Translog.Durability.ASYNC || lastSyncedGlobalCheckpoint == globalCheckpoint) { verify(translog, never()).sync(); } else { verify(translog).sync();