CCR: Do not minimization requesting range on leader (#30980)
Today before reading operations on the leading shard, we minimization the requesting range with the global checkpoint. However, this might make the request invalid if the following shard generates a requesting range based on the global-checkpoint from a primary shard and sends that request to a replica whose global checkpoint is lagged. Another issue is that we are mutating the request when applying minimization. If the request becomes invalid on a replica, we will reroute the mutated request instead of the original one to the primary. This commit removes the minimization and replaces it by a range check with the local checkpoint.
This commit is contained in:
parent
7e8cf768cf
commit
fa54be2dcd
|
@ -235,9 +235,14 @@ public class ShardChangesAction extends Action<ShardChangesAction.Request, Shard
|
||||||
protected Response shardOperation(Request request, ShardId shardId) throws IOException {
|
protected Response shardOperation(Request request, ShardId shardId) throws IOException {
|
||||||
IndexService indexService = indicesService.indexServiceSafe(request.getShard().getIndex());
|
IndexService indexService = indicesService.indexServiceSafe(request.getShard().getIndex());
|
||||||
IndexShard indexShard = indexService.getShard(request.getShard().id());
|
IndexShard indexShard = indexService.getShard(request.getShard().id());
|
||||||
|
|
||||||
final long indexMetaDataVersion = clusterService.state().metaData().index(shardId.getIndex()).getVersion();
|
final long indexMetaDataVersion = clusterService.state().metaData().index(shardId.getIndex()).getVersion();
|
||||||
request.maxSeqNo = Math.min(request.maxSeqNo, indexShard.getGlobalCheckpoint());
|
// The following shard generates the request based on the global checkpoint which may not be synced to all leading copies.
|
||||||
|
// However, this guarantees that the requesting range always be below the local-checkpoint of any leading copies.
|
||||||
|
final long localCheckpoint = indexShard.getLocalCheckpoint();
|
||||||
|
if (localCheckpoint < request.minSeqNo || localCheckpoint < request.maxSeqNo) {
|
||||||
|
throw new IllegalStateException("invalid request from_seqno=[" + request.minSeqNo + "], " +
|
||||||
|
"to_seqno=[" + request.maxSeqNo + "], local_checkpoint=[" + localCheckpoint + "], shardId=[" + shardId + "]");
|
||||||
|
}
|
||||||
final Translog.Operation[] operations =
|
final Translog.Operation[] operations =
|
||||||
getOperationsBetween(indexShard, request.minSeqNo, request.maxSeqNo, request.maxTranslogsBytes);
|
getOperationsBetween(indexShard, request.minSeqNo, request.maxSeqNo, request.maxTranslogsBytes);
|
||||||
return new Response(indexMetaDataVersion, operations);
|
return new Response(indexMetaDataVersion, operations);
|
||||||
|
|
Loading…
Reference in New Issue