Fix pre-6.0 response to unknown replication actions
When sending replica requests for replication operations, we skip sending the request to pre-6.0 nodes for operations that such nodes would not be aware of (e.g., the background global checkpoint sync, or the primary/replica resync) since they would not know what to do with these requests. Yet, we simulate that we received responses from these nodes. Today, this is done by simulating that they sent us that their local checkpoint is unassigned sequence number. However, for pre-6.0 nodes we have introduced a special local checkpoint used in the global checkpoint tracker for such nodes and that is what we should use here too. This commit fixes this issue. Relates #25744
This commit is contained in:
parent
2da79f2b5e
commit
f121cd3beb
|
@ -93,7 +93,7 @@ public class TransportResyncReplicationAction extends TransportWriteAction<Resyn
|
||||||
if (node.getVersion().onOrAfter(Version.V_6_0_0_alpha1)) {
|
if (node.getVersion().onOrAfter(Version.V_6_0_0_alpha1)) {
|
||||||
super.sendReplicaRequest(replicaRequest, node, listener);
|
super.sendReplicaRequest(replicaRequest, node, listener);
|
||||||
} else {
|
} else {
|
||||||
listener.onResponse(new ReplicaResponse(SequenceNumbersService.UNASSIGNED_SEQ_NO));
|
listener.onResponse(new ReplicaResponse(SequenceNumbersService.PRE_60_NODE_LOCAL_CHECKPOINT));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1023,6 +1023,12 @@ public abstract class TransportReplicationAction<
|
||||||
}
|
}
|
||||||
|
|
||||||
public ReplicaResponse(long localCheckpoint) {
|
public ReplicaResponse(long localCheckpoint) {
|
||||||
|
/*
|
||||||
|
* A replica should always know its own local checkpoint so this should always be a valid sequence number or the pre-6.0 local
|
||||||
|
* checkpoint value when simulating responses to replication actions that pre-6.0 nodes are not aware of (e.g., the global
|
||||||
|
* checkpoint background sync, and the primary/replica resync).
|
||||||
|
*/
|
||||||
|
assert localCheckpoint != SequenceNumbersService.UNASSIGNED_SEQ_NO;
|
||||||
this.localCheckpoint = localCheckpoint;
|
this.localCheckpoint = localCheckpoint;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -89,7 +89,7 @@ public class GlobalCheckpointSyncAction extends TransportReplicationAction<
|
||||||
if (node.getVersion().onOrAfter(Version.V_6_0_0_alpha1)) {
|
if (node.getVersion().onOrAfter(Version.V_6_0_0_alpha1)) {
|
||||||
super.sendReplicaRequest(replicaRequest, node, listener);
|
super.sendReplicaRequest(replicaRequest, node, listener);
|
||||||
} else {
|
} else {
|
||||||
listener.onResponse(new ReplicaResponse(SequenceNumbersService.UNASSIGNED_SEQ_NO));
|
listener.onResponse(new ReplicaResponse(SequenceNumbersService.PRE_60_NODE_LOCAL_CHECKPOINT));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -450,6 +450,9 @@ public class GlobalCheckpointTracker extends AbstractIndexShardComponent {
|
||||||
assert lcps.localCheckpoint != SequenceNumbersService.PRE_60_NODE_LOCAL_CHECKPOINT ||
|
assert lcps.localCheckpoint != SequenceNumbersService.PRE_60_NODE_LOCAL_CHECKPOINT ||
|
||||||
localCheckpoint == SequenceNumbersService.PRE_60_NODE_LOCAL_CHECKPOINT :
|
localCheckpoint == SequenceNumbersService.PRE_60_NODE_LOCAL_CHECKPOINT :
|
||||||
"pre-6.0 shard copy " + allocationId + " unexpected to send valid local checkpoint " + localCheckpoint;
|
"pre-6.0 shard copy " + allocationId + " unexpected to send valid local checkpoint " + localCheckpoint;
|
||||||
|
// a local checkpoint for a shard copy should be a valid sequence number or the pre-6.0 sequence number indicator
|
||||||
|
assert localCheckpoint != SequenceNumbersService.UNASSIGNED_SEQ_NO :
|
||||||
|
"invalid local checkpoint for shard copy [" + allocationId + "]";
|
||||||
if (localCheckpoint > lcps.localCheckpoint) {
|
if (localCheckpoint > lcps.localCheckpoint) {
|
||||||
logger.trace("updated local checkpoint of [{}] from [{}] to [{}]", allocationId, lcps.localCheckpoint, localCheckpoint);
|
logger.trace("updated local checkpoint of [{}] from [{}] to [{}]", allocationId, lcps.localCheckpoint, localCheckpoint);
|
||||||
lcps.localCheckpoint = localCheckpoint;
|
lcps.localCheckpoint = localCheckpoint;
|
||||||
|
|
|
@ -737,7 +737,7 @@ public class GlobalCheckpointTrackerTests extends ESTestCase {
|
||||||
private static void randomLocalCheckpointUpdate(GlobalCheckpointTracker gcp) {
|
private static void randomLocalCheckpointUpdate(GlobalCheckpointTracker gcp) {
|
||||||
String allocationId = randomFrom(gcp.localCheckpoints.keySet());
|
String allocationId = randomFrom(gcp.localCheckpoints.keySet());
|
||||||
long currentLocalCheckpoint = gcp.localCheckpoints.get(allocationId).getLocalCheckpoint();
|
long currentLocalCheckpoint = gcp.localCheckpoints.get(allocationId).getLocalCheckpoint();
|
||||||
gcp.updateLocalCheckpoint(allocationId, currentLocalCheckpoint + randomInt(5));
|
gcp.updateLocalCheckpoint(allocationId, Math.max(SequenceNumbersService.NO_OPS_PERFORMED, currentLocalCheckpoint + randomInt(5)));
|
||||||
}
|
}
|
||||||
|
|
||||||
private static void randomMarkInSync(GlobalCheckpointTracker gcp) {
|
private static void randomMarkInSync(GlobalCheckpointTracker gcp) {
|
||||||
|
|
Loading…
Reference in New Issue