CCR: Use single global checkpoint to normalize range (#33545)
We may use different global checkpoints to validate/normalize the range of a change request if the global checkpoint is advanced between these calls. If this is the case, then we generate an invalid request range.
This commit is contained in:
parent
d4b212c4c9
commit
902d20cbbe
|
@ -297,12 +297,13 @@ public class ShardChangesAction extends Action<ShardChangesAction.Response> {
|
||||||
if (indexShard.state() != IndexShardState.STARTED) {
|
if (indexShard.state() != IndexShardState.STARTED) {
|
||||||
throw new IndexShardNotStartedException(indexShard.shardId(), indexShard.state());
|
throw new IndexShardNotStartedException(indexShard.shardId(), indexShard.state());
|
||||||
}
|
}
|
||||||
if (fromSeqNo > indexShard.getGlobalCheckpoint()) {
|
if (fromSeqNo > globalCheckpoint) {
|
||||||
return EMPTY_OPERATIONS_ARRAY;
|
return EMPTY_OPERATIONS_ARRAY;
|
||||||
}
|
}
|
||||||
int seenBytes = 0;
|
int seenBytes = 0;
|
||||||
// - 1 is needed, because toSeqNo is inclusive
|
// - 1 is needed, because toSeqNo is inclusive
|
||||||
long toSeqNo = Math.min(globalCheckpoint, (fromSeqNo + maxOperationCount) - 1);
|
long toSeqNo = Math.min(globalCheckpoint, (fromSeqNo + maxOperationCount) - 1);
|
||||||
|
assert fromSeqNo <= toSeqNo : "invalid range from_seqno[" + fromSeqNo + "] > to_seqno[" + toSeqNo + "]";
|
||||||
final List<Translog.Operation> operations = new ArrayList<>();
|
final List<Translog.Operation> operations = new ArrayList<>();
|
||||||
try (Translog.Snapshot snapshot = indexShard.newChangesSnapshot("ccr", fromSeqNo, toSeqNo, true)) {
|
try (Translog.Snapshot snapshot = indexShard.newChangesSnapshot("ccr", fromSeqNo, toSeqNo, true)) {
|
||||||
Translog.Operation op;
|
Translog.Operation op;
|
||||||
|
|
Loading…
Reference in New Issue