Move pre-6.0 node checkpoint to SequenceNumbers

This commit moves the pre-6.0 node checkpoint constant from
SequenceNumbersService to SequenceNumbers so it can chill with the other
sequence number-related constants.

Relates #26690
This commit is contained in:
Jason Tedor 2017-09-19 06:27:56 -04:00 committed by GitHub
parent 2db3bccd37
commit 256721018b
6 changed files with 21 additions and 23 deletions

View File

@ -32,7 +32,7 @@ import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.seqno.SequenceNumbersService;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.PrimaryReplicaSyncer;
import org.elasticsearch.index.translog.Translog;
@ -93,7 +93,7 @@ public class TransportResyncReplicationAction extends TransportWriteAction<Resyn
if (node.getVersion().onOrAfter(Version.V_6_0_0_alpha1)) {
super.sendReplicaRequest(replicaRequest, node, listener);
} else {
final long pre60NodeCheckpoint = SequenceNumbersService.PRE_60_NODE_CHECKPOINT;
final long pre60NodeCheckpoint = SequenceNumbers.PRE_60_NODE_CHECKPOINT;
listener.onResponse(new ReplicaResponse(pre60NodeCheckpoint, pre60NodeCheckpoint));
}
}

View File

@ -54,7 +54,6 @@ import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.seqno.SequenceNumbersService;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardState;
import org.elasticsearch.index.shard.ReplicationGroup;
@ -1055,12 +1054,12 @@ public abstract class TransportReplicationAction<
localCheckpoint = in.readZLong();
} else {
// 5.x used to read empty responses, which don't really read anything off the stream, so just do nothing.
localCheckpoint = SequenceNumbersService.PRE_60_NODE_CHECKPOINT;
localCheckpoint = SequenceNumbers.PRE_60_NODE_CHECKPOINT;
}
if (in.getVersion().onOrAfter(Version.V_6_0_0_rc1)) {
globalCheckpoint = in.readZLong();
} else {
globalCheckpoint = SequenceNumbersService.PRE_60_NODE_CHECKPOINT;
globalCheckpoint = SequenceNumbers.PRE_60_NODE_CHECKPOINT;
}
}

View File

@ -89,7 +89,7 @@ public class GlobalCheckpointSyncAction extends TransportReplicationAction<
if (node.getVersion().onOrAfter(Version.V_6_0_0_alpha1)) {
super.sendReplicaRequest(replicaRequest, node, listener);
} else {
final long pre60NodeCheckpoint = SequenceNumbersService.PRE_60_NODE_CHECKPOINT;
final long pre60NodeCheckpoint = SequenceNumbers.PRE_60_NODE_CHECKPOINT;
listener.onResponse(new ReplicaResponse(pre60NodeCheckpoint, pre60NodeCheckpoint));
}
}

View File

@ -230,7 +230,7 @@ public class GlobalCheckpointTracker extends AbstractIndexShardComponent {
// local checkpoints only set during primary mode
assert primaryMode || checkpoints.values().stream()
.allMatch(lcps -> lcps.localCheckpoint == SequenceNumbers.UNASSIGNED_SEQ_NO ||
lcps.localCheckpoint == SequenceNumbersService.PRE_60_NODE_CHECKPOINT);
lcps.localCheckpoint == SequenceNumbers.PRE_60_NODE_CHECKPOINT);
// global checkpoints for other shards only set during primary mode
assert primaryMode
@ -241,7 +241,7 @@ public class GlobalCheckpointTracker extends AbstractIndexShardComponent {
.map(Map.Entry::getValue)
.allMatch(cps ->
(cps.globalCheckpoint == SequenceNumbers.UNASSIGNED_SEQ_NO
|| cps.globalCheckpoint == SequenceNumbersService.PRE_60_NODE_CHECKPOINT));
|| cps.globalCheckpoint == SequenceNumbers.PRE_60_NODE_CHECKPOINT));
// relocation handoff can only occur in primary mode
assert !handoffInProgress || primaryMode;
@ -316,7 +316,7 @@ public class GlobalCheckpointTracker extends AbstractIndexShardComponent {
.stream()
.filter(cps -> cps.inSync)
.mapToLong(function)
.filter(v -> v != SequenceNumbersService.PRE_60_NODE_CHECKPOINT && v != SequenceNumbers.UNASSIGNED_SEQ_NO));
.filter(v -> v != SequenceNumbers.PRE_60_NODE_CHECKPOINT && v != SequenceNumbers.UNASSIGNED_SEQ_NO));
return value.isPresent() ? value.getAsLong() : SequenceNumbers.UNASSIGNED_SEQ_NO;
}
@ -473,7 +473,7 @@ public class GlobalCheckpointTracker extends AbstractIndexShardComponent {
assert inSync == false : "update from master in primary mode has " + initializingId +
" as in-sync but it does not exist locally";
final long localCheckpoint = pre60AllocationIds.contains(initializingId) ?
SequenceNumbersService.PRE_60_NODE_CHECKPOINT : SequenceNumbers.UNASSIGNED_SEQ_NO;
SequenceNumbers.PRE_60_NODE_CHECKPOINT : SequenceNumbers.UNASSIGNED_SEQ_NO;
final long globalCheckpoint = localCheckpoint;
checkpoints.put(initializingId, new CheckpointState(localCheckpoint, globalCheckpoint, inSync));
}
@ -482,7 +482,7 @@ public class GlobalCheckpointTracker extends AbstractIndexShardComponent {
for (String initializingId : initializingAllocationIds) {
if (shardAllocationId.equals(initializingId) == false) {
final long localCheckpoint = pre60AllocationIds.contains(initializingId) ?
SequenceNumbersService.PRE_60_NODE_CHECKPOINT : SequenceNumbers.UNASSIGNED_SEQ_NO;
SequenceNumbers.PRE_60_NODE_CHECKPOINT : SequenceNumbers.UNASSIGNED_SEQ_NO;
final long globalCheckpoint = localCheckpoint;
checkpoints.put(initializingId, new CheckpointState(localCheckpoint, globalCheckpoint, false));
}
@ -493,7 +493,7 @@ public class GlobalCheckpointTracker extends AbstractIndexShardComponent {
checkpoints.get(shardAllocationId).inSync = true;
} else {
final long localCheckpoint = pre60AllocationIds.contains(inSyncId) ?
SequenceNumbersService.PRE_60_NODE_CHECKPOINT : SequenceNumbers.UNASSIGNED_SEQ_NO;
SequenceNumbers.PRE_60_NODE_CHECKPOINT : SequenceNumbers.UNASSIGNED_SEQ_NO;
final long globalCheckpoint = localCheckpoint;
checkpoints.put(inSyncId, new CheckpointState(localCheckpoint, globalCheckpoint, true));
}
@ -576,8 +576,8 @@ public class GlobalCheckpointTracker extends AbstractIndexShardComponent {
private boolean updateLocalCheckpoint(String allocationId, CheckpointState cps, long localCheckpoint) {
// a local checkpoint of PRE_60_NODE_CHECKPOINT cannot be overridden
assert cps.localCheckpoint != SequenceNumbersService.PRE_60_NODE_CHECKPOINT ||
localCheckpoint == SequenceNumbersService.PRE_60_NODE_CHECKPOINT :
assert cps.localCheckpoint != SequenceNumbers.PRE_60_NODE_CHECKPOINT ||
localCheckpoint == SequenceNumbers.PRE_60_NODE_CHECKPOINT :
"pre-6.0 shard copy " + allocationId + " unexpected to send valid local checkpoint " + localCheckpoint;
// a local checkpoint for a shard copy should be a valid sequence number or the pre-6.0 sequence number indicator
assert localCheckpoint != SequenceNumbers.UNASSIGNED_SEQ_NO :
@ -640,7 +640,7 @@ public class GlobalCheckpointTracker extends AbstractIndexShardComponent {
if (cps.localCheckpoint == SequenceNumbers.UNASSIGNED_SEQ_NO) {
// unassigned in-sync replica
return fallback;
} else if (cps.localCheckpoint == SequenceNumbersService.PRE_60_NODE_CHECKPOINT) {
} else if (cps.localCheckpoint == SequenceNumbers.PRE_60_NODE_CHECKPOINT) {
// 5.x replica, ignore for global checkpoint calculation
} else {
minLocalCheckpoint = Math.min(cps.localCheckpoint, minLocalCheckpoint);
@ -713,13 +713,13 @@ public class GlobalCheckpointTracker extends AbstractIndexShardComponent {
checkpoints.entrySet().stream().forEach(e -> {
final CheckpointState cps = e.getValue();
if (cps.localCheckpoint != SequenceNumbers.UNASSIGNED_SEQ_NO &&
cps.localCheckpoint != SequenceNumbersService.PRE_60_NODE_CHECKPOINT) {
cps.localCheckpoint != SequenceNumbers.PRE_60_NODE_CHECKPOINT) {
cps.localCheckpoint = SequenceNumbers.UNASSIGNED_SEQ_NO;
}
if (e.getKey().equals(shardAllocationId) == false) {
// don't throw global checkpoint information of current shard away
if (cps.globalCheckpoint != SequenceNumbers.UNASSIGNED_SEQ_NO &&
cps.globalCheckpoint != SequenceNumbersService.PRE_60_NODE_CHECKPOINT) {
cps.globalCheckpoint != SequenceNumbers.PRE_60_NODE_CHECKPOINT) {
cps.globalCheckpoint = SequenceNumbers.UNASSIGNED_SEQ_NO;
}
}
@ -763,7 +763,7 @@ public class GlobalCheckpointTracker extends AbstractIndexShardComponent {
if (entry.getValue().inSync) {
inSyncAllocationIds.add(entry.getKey());
}
if (entry.getValue().getLocalCheckpoint() == SequenceNumbersService.PRE_60_NODE_CHECKPOINT) {
if (entry.getValue().getLocalCheckpoint() == SequenceNumbers.PRE_60_NODE_CHECKPOINT) {
pre60AllocationIds.add(entry.getKey());
}
});

View File

@ -28,6 +28,10 @@ public class SequenceNumbers {
public static final String LOCAL_CHECKPOINT_KEY = "local_checkpoint";
public static final String MAX_SEQ_NO = "max_seq_no";
/**
* Represents a checkpoint coming from a pre-6.0 node
*/
public static final long PRE_60_NODE_CHECKPOINT = -3L;
/**
* Represents an unassigned sequence number (e.g., can be used on primary operations before they are executed).
*/

View File

@ -33,11 +33,6 @@ import java.util.Set;
*/
public class SequenceNumbersService extends AbstractIndexShardComponent {
/**
* Represents a local checkpoint coming from a pre-6.0 node
*/
public static final long PRE_60_NODE_CHECKPOINT = -3L;
private final LocalCheckpointTracker localCheckpointTracker;
private final GlobalCheckpointTracker globalCheckpointTracker;