Only fsync global checkpoint if needed
In the global checkpoint sync action, we fsync the translog. However, the last synced global checkpoint might already be equal to the current global checkpoint in which case the fsyncing the translog is unnecessary as either the sync needed guard in the translog will skip the translog, or the translog needs an fsync for another reason that will be picked up elsewhere (e.g., at the end of a bulk request). Relates #27652
This commit is contained in:
parent
d3721c48c3
commit
963ed25cf5
|
@ -44,6 +44,8 @@ import org.elasticsearch.indices.IndicesService;
|
||||||
import org.elasticsearch.threadpool.ThreadPool;
|
import org.elasticsearch.threadpool.ThreadPool;
|
||||||
import org.elasticsearch.transport.TransportService;
|
import org.elasticsearch.transport.TransportService;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Background global checkpoint sync action initiated when a shard goes inactive. This is needed because while we send the global checkpoint
|
* Background global checkpoint sync action initiated when a shard goes inactive. This is needed because while we send the global checkpoint
|
||||||
* on every replication operation, after the last operation completes the global checkpoint could advance but without a follow-up operation
|
* on every replication operation, after the last operation completes the global checkpoint could advance but without a follow-up operation
|
||||||
|
@ -117,18 +119,22 @@ public class GlobalCheckpointSyncAction extends TransportReplicationAction<
|
||||||
@Override
|
@Override
|
||||||
protected PrimaryResult<Request, ReplicationResponse> shardOperationOnPrimary(
|
protected PrimaryResult<Request, ReplicationResponse> shardOperationOnPrimary(
|
||||||
final Request request, final IndexShard indexShard) throws Exception {
|
final Request request, final IndexShard indexShard) throws Exception {
|
||||||
if (indexShard.getTranslogDurability() == Translog.Durability.REQUEST) {
|
maybeSyncTranslog(indexShard);
|
||||||
indexShard.getTranslog().sync();
|
|
||||||
}
|
|
||||||
return new PrimaryResult<>(request, new ReplicationResponse());
|
return new PrimaryResult<>(request, new ReplicationResponse());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected ReplicaResult shardOperationOnReplica(final Request request, final IndexShard indexShard) throws Exception {
|
protected ReplicaResult shardOperationOnReplica(final Request request, final IndexShard indexShard) throws Exception {
|
||||||
if (indexShard.getTranslogDurability() == Translog.Durability.REQUEST) {
|
maybeSyncTranslog(indexShard);
|
||||||
|
return new ReplicaResult();
|
||||||
|
}
|
||||||
|
|
||||||
|
private void maybeSyncTranslog(final IndexShard indexShard) throws IOException {
|
||||||
|
final Translog translog = indexShard.getTranslog();
|
||||||
|
if (indexShard.getTranslogDurability() == Translog.Durability.REQUEST &&
|
||||||
|
translog.getLastSyncedGlobalCheckpoint() < indexShard.getGlobalCheckpoint()) {
|
||||||
indexShard.getTranslog().sync();
|
indexShard.getTranslog().sync();
|
||||||
}
|
}
|
||||||
return new ReplicaResult();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public static final class Request extends ReplicationRequest<Request> {
|
public static final class Request extends ReplicationRequest<Request> {
|
||||||
|
|
|
@ -93,6 +93,19 @@ public class GlobalCheckpointSyncActionTests extends ESTestCase {
|
||||||
final Translog translog = mock(Translog.class);
|
final Translog translog = mock(Translog.class);
|
||||||
when(indexShard.getTranslog()).thenReturn(translog);
|
when(indexShard.getTranslog()).thenReturn(translog);
|
||||||
|
|
||||||
|
final long globalCheckpoint = randomIntBetween(Math.toIntExact(SequenceNumbers.NO_OPS_PERFORMED), Integer.MAX_VALUE);
|
||||||
|
final long lastSyncedGlobalCheckpoint;
|
||||||
|
if (randomBoolean() && globalCheckpoint != SequenceNumbers.NO_OPS_PERFORMED) {
|
||||||
|
lastSyncedGlobalCheckpoint =
|
||||||
|
randomIntBetween(Math.toIntExact(SequenceNumbers.NO_OPS_PERFORMED), Math.toIntExact(globalCheckpoint) - 1);
|
||||||
|
assert lastSyncedGlobalCheckpoint < globalCheckpoint;
|
||||||
|
} else {
|
||||||
|
lastSyncedGlobalCheckpoint = globalCheckpoint;
|
||||||
|
}
|
||||||
|
|
||||||
|
when(indexShard.getGlobalCheckpoint()).thenReturn(globalCheckpoint);
|
||||||
|
when(translog.getLastSyncedGlobalCheckpoint()).thenReturn(lastSyncedGlobalCheckpoint);
|
||||||
|
|
||||||
final GlobalCheckpointSyncAction action = new GlobalCheckpointSyncAction(
|
final GlobalCheckpointSyncAction action = new GlobalCheckpointSyncAction(
|
||||||
Settings.EMPTY,
|
Settings.EMPTY,
|
||||||
transportService,
|
transportService,
|
||||||
|
@ -109,7 +122,7 @@ public class GlobalCheckpointSyncActionTests extends ESTestCase {
|
||||||
action.shardOperationOnReplica(new GlobalCheckpointSyncAction.Request(indexShard.shardId()), indexShard);
|
action.shardOperationOnReplica(new GlobalCheckpointSyncAction.Request(indexShard.shardId()), indexShard);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (durability == Translog.Durability.ASYNC) {
|
if (durability == Translog.Durability.ASYNC || lastSyncedGlobalCheckpoint == globalCheckpoint) {
|
||||||
verify(translog, never()).sync();
|
verify(translog, never()).sync();
|
||||||
} else {
|
} else {
|
||||||
verify(translog).sync();
|
verify(translog).sync();
|
||||||
|
|
Loading…
Reference in New Issue