From 256721018bd3a8be4a069646473e0bf0c7c84bac Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Tue, 19 Sep 2017 06:27:56 -0400 Subject: [PATCH] Move pre-6.0 node checkpoint to SequenceNumbers This commit moves the pre-6.0 node checkpoint constant from SequenceNumbersService to SequenceNumbers so it can chill with the other sequence number-related constants. Relates #26690 --- .../TransportResyncReplicationAction.java | 4 ++-- .../TransportReplicationAction.java | 5 ++-- .../seqno/GlobalCheckpointSyncAction.java | 2 +- .../index/seqno/GlobalCheckpointTracker.java | 24 +++++++++---------- .../index/seqno/SequenceNumbers.java | 4 ++++ .../index/seqno/SequenceNumbersService.java | 5 ---- 6 files changed, 21 insertions(+), 23 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java b/core/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java index 514cbca04cc..d217717faeb 100644 --- a/core/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java +++ b/core/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java @@ -32,7 +32,7 @@ import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.engine.Engine; -import org.elasticsearch.index.seqno.SequenceNumbersService; +import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.PrimaryReplicaSyncer; import org.elasticsearch.index.translog.Translog; @@ -93,7 +93,7 @@ public class TransportResyncReplicationAction extends TransportWriteAction lcps.localCheckpoint == SequenceNumbers.UNASSIGNED_SEQ_NO || - lcps.localCheckpoint == SequenceNumbersService.PRE_60_NODE_CHECKPOINT); + lcps.localCheckpoint == SequenceNumbers.PRE_60_NODE_CHECKPOINT); // global checkpoints for other shards only set during primary mode assert primaryMode @@ -241,7 +241,7 @@ public class GlobalCheckpointTracker extends AbstractIndexShardComponent { .map(Map.Entry::getValue) .allMatch(cps -> (cps.globalCheckpoint == SequenceNumbers.UNASSIGNED_SEQ_NO - || cps.globalCheckpoint == SequenceNumbersService.PRE_60_NODE_CHECKPOINT)); + || cps.globalCheckpoint == SequenceNumbers.PRE_60_NODE_CHECKPOINT)); // relocation handoff can only occur in primary mode assert !handoffInProgress || primaryMode; @@ -316,7 +316,7 @@ public class GlobalCheckpointTracker extends AbstractIndexShardComponent { .stream() .filter(cps -> cps.inSync) .mapToLong(function) - .filter(v -> v != SequenceNumbersService.PRE_60_NODE_CHECKPOINT && v != SequenceNumbers.UNASSIGNED_SEQ_NO)); + .filter(v -> v != SequenceNumbers.PRE_60_NODE_CHECKPOINT && v != SequenceNumbers.UNASSIGNED_SEQ_NO)); return value.isPresent() ? value.getAsLong() : SequenceNumbers.UNASSIGNED_SEQ_NO; } @@ -473,7 +473,7 @@ public class GlobalCheckpointTracker extends AbstractIndexShardComponent { assert inSync == false : "update from master in primary mode has " + initializingId + " as in-sync but it does not exist locally"; final long localCheckpoint = pre60AllocationIds.contains(initializingId) ? - SequenceNumbersService.PRE_60_NODE_CHECKPOINT : SequenceNumbers.UNASSIGNED_SEQ_NO; + SequenceNumbers.PRE_60_NODE_CHECKPOINT : SequenceNumbers.UNASSIGNED_SEQ_NO; final long globalCheckpoint = localCheckpoint; checkpoints.put(initializingId, new CheckpointState(localCheckpoint, globalCheckpoint, inSync)); } @@ -482,7 +482,7 @@ public class GlobalCheckpointTracker extends AbstractIndexShardComponent { for (String initializingId : initializingAllocationIds) { if (shardAllocationId.equals(initializingId) == false) { final long localCheckpoint = pre60AllocationIds.contains(initializingId) ? - SequenceNumbersService.PRE_60_NODE_CHECKPOINT : SequenceNumbers.UNASSIGNED_SEQ_NO; + SequenceNumbers.PRE_60_NODE_CHECKPOINT : SequenceNumbers.UNASSIGNED_SEQ_NO; final long globalCheckpoint = localCheckpoint; checkpoints.put(initializingId, new CheckpointState(localCheckpoint, globalCheckpoint, false)); } @@ -493,7 +493,7 @@ public class GlobalCheckpointTracker extends AbstractIndexShardComponent { checkpoints.get(shardAllocationId).inSync = true; } else { final long localCheckpoint = pre60AllocationIds.contains(inSyncId) ? - SequenceNumbersService.PRE_60_NODE_CHECKPOINT : SequenceNumbers.UNASSIGNED_SEQ_NO; + SequenceNumbers.PRE_60_NODE_CHECKPOINT : SequenceNumbers.UNASSIGNED_SEQ_NO; final long globalCheckpoint = localCheckpoint; checkpoints.put(inSyncId, new CheckpointState(localCheckpoint, globalCheckpoint, true)); } @@ -576,8 +576,8 @@ public class GlobalCheckpointTracker extends AbstractIndexShardComponent { private boolean updateLocalCheckpoint(String allocationId, CheckpointState cps, long localCheckpoint) { // a local checkpoint of PRE_60_NODE_CHECKPOINT cannot be overridden - assert cps.localCheckpoint != SequenceNumbersService.PRE_60_NODE_CHECKPOINT || - localCheckpoint == SequenceNumbersService.PRE_60_NODE_CHECKPOINT : + assert cps.localCheckpoint != SequenceNumbers.PRE_60_NODE_CHECKPOINT || + localCheckpoint == SequenceNumbers.PRE_60_NODE_CHECKPOINT : "pre-6.0 shard copy " + allocationId + " unexpected to send valid local checkpoint " + localCheckpoint; // a local checkpoint for a shard copy should be a valid sequence number or the pre-6.0 sequence number indicator assert localCheckpoint != SequenceNumbers.UNASSIGNED_SEQ_NO : @@ -640,7 +640,7 @@ public class GlobalCheckpointTracker extends AbstractIndexShardComponent { if (cps.localCheckpoint == SequenceNumbers.UNASSIGNED_SEQ_NO) { // unassigned in-sync replica return fallback; - } else if (cps.localCheckpoint == SequenceNumbersService.PRE_60_NODE_CHECKPOINT) { + } else if (cps.localCheckpoint == SequenceNumbers.PRE_60_NODE_CHECKPOINT) { // 5.x replica, ignore for global checkpoint calculation } else { minLocalCheckpoint = Math.min(cps.localCheckpoint, minLocalCheckpoint); @@ -713,13 +713,13 @@ public class GlobalCheckpointTracker extends AbstractIndexShardComponent { checkpoints.entrySet().stream().forEach(e -> { final CheckpointState cps = e.getValue(); if (cps.localCheckpoint != SequenceNumbers.UNASSIGNED_SEQ_NO && - cps.localCheckpoint != SequenceNumbersService.PRE_60_NODE_CHECKPOINT) { + cps.localCheckpoint != SequenceNumbers.PRE_60_NODE_CHECKPOINT) { cps.localCheckpoint = SequenceNumbers.UNASSIGNED_SEQ_NO; } if (e.getKey().equals(shardAllocationId) == false) { // don't throw global checkpoint information of current shard away if (cps.globalCheckpoint != SequenceNumbers.UNASSIGNED_SEQ_NO && - cps.globalCheckpoint != SequenceNumbersService.PRE_60_NODE_CHECKPOINT) { + cps.globalCheckpoint != SequenceNumbers.PRE_60_NODE_CHECKPOINT) { cps.globalCheckpoint = SequenceNumbers.UNASSIGNED_SEQ_NO; } } @@ -763,7 +763,7 @@ public class GlobalCheckpointTracker extends AbstractIndexShardComponent { if (entry.getValue().inSync) { inSyncAllocationIds.add(entry.getKey()); } - if (entry.getValue().getLocalCheckpoint() == SequenceNumbersService.PRE_60_NODE_CHECKPOINT) { + if (entry.getValue().getLocalCheckpoint() == SequenceNumbers.PRE_60_NODE_CHECKPOINT) { pre60AllocationIds.add(entry.getKey()); } }); diff --git a/core/src/main/java/org/elasticsearch/index/seqno/SequenceNumbers.java b/core/src/main/java/org/elasticsearch/index/seqno/SequenceNumbers.java index cf878f613a7..21b4134f983 100644 --- a/core/src/main/java/org/elasticsearch/index/seqno/SequenceNumbers.java +++ b/core/src/main/java/org/elasticsearch/index/seqno/SequenceNumbers.java @@ -28,6 +28,10 @@ public class SequenceNumbers { public static final String LOCAL_CHECKPOINT_KEY = "local_checkpoint"; public static final String MAX_SEQ_NO = "max_seq_no"; + /** + * Represents a checkpoint coming from a pre-6.0 node + */ + public static final long PRE_60_NODE_CHECKPOINT = -3L; /** * Represents an unassigned sequence number (e.g., can be used on primary operations before they are executed). */ diff --git a/core/src/main/java/org/elasticsearch/index/seqno/SequenceNumbersService.java b/core/src/main/java/org/elasticsearch/index/seqno/SequenceNumbersService.java index fa0d0bc9b34..760fbe0a5fc 100644 --- a/core/src/main/java/org/elasticsearch/index/seqno/SequenceNumbersService.java +++ b/core/src/main/java/org/elasticsearch/index/seqno/SequenceNumbersService.java @@ -33,11 +33,6 @@ import java.util.Set; */ public class SequenceNumbersService extends AbstractIndexShardComponent { - /** - * Represents a local checkpoint coming from a pre-6.0 node - */ - public static final long PRE_60_NODE_CHECKPOINT = -3L; - private final LocalCheckpointTracker localCheckpointTracker; private final GlobalCheckpointTracker globalCheckpointTracker;