Obey translog durability in global checkpoint sync
After write operations in some situations we fire a post-operation global checkpoint sync. The global checkpoint sync unconditionally fsyncs the translog and this can then look like an fsync per-request. This violates the translog durability settings on the index if this durability is set to async. This commit changes the global checkpoint sync to observe the translog durability. Relates #27641
This commit is contained in:
parent
0b50b313d2
commit
17a2d574de
|
@ -39,6 +39,7 @@ import org.elasticsearch.common.util.concurrent.ThreadContext;
|
||||||
import org.elasticsearch.index.shard.IndexShard;
|
import org.elasticsearch.index.shard.IndexShard;
|
||||||
import org.elasticsearch.index.shard.IndexShardClosedException;
|
import org.elasticsearch.index.shard.IndexShardClosedException;
|
||||||
import org.elasticsearch.index.shard.ShardId;
|
import org.elasticsearch.index.shard.ShardId;
|
||||||
|
import org.elasticsearch.index.translog.Translog;
|
||||||
import org.elasticsearch.indices.IndicesService;
|
import org.elasticsearch.indices.IndicesService;
|
||||||
import org.elasticsearch.threadpool.ThreadPool;
|
import org.elasticsearch.threadpool.ThreadPool;
|
||||||
import org.elasticsearch.transport.TransportService;
|
import org.elasticsearch.transport.TransportService;
|
||||||
|
@ -116,13 +117,17 @@ public class GlobalCheckpointSyncAction extends TransportReplicationAction<
|
||||||
@Override
|
@Override
|
||||||
protected PrimaryResult<Request, ReplicationResponse> shardOperationOnPrimary(
|
protected PrimaryResult<Request, ReplicationResponse> shardOperationOnPrimary(
|
||||||
final Request request, final IndexShard indexShard) throws Exception {
|
final Request request, final IndexShard indexShard) throws Exception {
|
||||||
indexShard.getTranslog().sync();
|
if (indexShard.getTranslogDurability() == Translog.Durability.REQUEST) {
|
||||||
|
indexShard.getTranslog().sync();
|
||||||
|
}
|
||||||
return new PrimaryResult<>(request, new ReplicationResponse());
|
return new PrimaryResult<>(request, new ReplicationResponse());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected ReplicaResult shardOperationOnReplica(final Request request, final IndexShard indexShard) throws Exception {
|
protected ReplicaResult shardOperationOnReplica(final Request request, final IndexShard indexShard) throws Exception {
|
||||||
indexShard.getTranslog().sync();
|
if (indexShard.getTranslogDurability() == Translog.Durability.REQUEST) {
|
||||||
|
indexShard.getTranslog().sync();
|
||||||
|
}
|
||||||
return new ReplicaResult();
|
return new ReplicaResult();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -38,6 +38,7 @@ import org.elasticsearch.transport.TransportService;
|
||||||
|
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
|
|
||||||
|
import static org.elasticsearch.mock.orig.Mockito.never;
|
||||||
import static org.elasticsearch.mock.orig.Mockito.when;
|
import static org.elasticsearch.mock.orig.Mockito.when;
|
||||||
import static org.elasticsearch.test.ClusterServiceUtils.createClusterService;
|
import static org.elasticsearch.test.ClusterServiceUtils.createClusterService;
|
||||||
import static org.mockito.Mockito.mock;
|
import static org.mockito.Mockito.mock;
|
||||||
|
@ -86,6 +87,9 @@ public class GlobalCheckpointSyncActionTests extends ESTestCase {
|
||||||
final ShardId shardId = new ShardId(index, id);
|
final ShardId shardId = new ShardId(index, id);
|
||||||
when(indexShard.shardId()).thenReturn(shardId);
|
when(indexShard.shardId()).thenReturn(shardId);
|
||||||
|
|
||||||
|
final Translog.Durability durability = randomFrom(Translog.Durability.ASYNC, Translog.Durability.REQUEST);
|
||||||
|
when(indexShard.getTranslogDurability()).thenReturn(durability);
|
||||||
|
|
||||||
final Translog translog = mock(Translog.class);
|
final Translog translog = mock(Translog.class);
|
||||||
when(indexShard.getTranslog()).thenReturn(translog);
|
when(indexShard.getTranslog()).thenReturn(translog);
|
||||||
|
|
||||||
|
@ -105,7 +109,11 @@ public class GlobalCheckpointSyncActionTests extends ESTestCase {
|
||||||
action.shardOperationOnReplica(new GlobalCheckpointSyncAction.Request(indexShard.shardId()), indexShard);
|
action.shardOperationOnReplica(new GlobalCheckpointSyncAction.Request(indexShard.shardId()), indexShard);
|
||||||
}
|
}
|
||||||
|
|
||||||
verify(translog).sync();
|
if (durability == Translog.Durability.ASYNC) {
|
||||||
|
verify(translog, never()).sync();
|
||||||
|
} else {
|
||||||
|
verify(translog).sync();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue