Remove PRE_60_NODE_CHECKPOINT ()

This commit removes the obsolete `PRE_60_NODE_CHECKPOINT` constant, which existed to account
for 5.x nodes' lack of sequence-number support.

Backport of 
David Turner 2019-05-28 12:25:53 +01:00 committed by GitHub
parent 00d665540a
commit 746a2f41fd
15 changed files with 74 additions and 156 deletions
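
The change falls into three parts: the version gates in the replication response serialization (first hunk below), the checkpoint bookkeeping in `ReplicationTracker` and its callers, and the constant itself in `SequenceNumbers`, plus the test fallout. As a minimal, self-contained illustration of the first part, here is the version-gate pattern being retired, sketched with plain `java.io` streams rather than Elasticsearch's `StreamInput`/`StreamOutput` and ZLong encoding (those details are assumed here, not shown in this diff):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class VersionGateSketch {

    // Before: the checkpoint was only written when the peer was known to understand it,
    // because pre-6.0 peers expected an empty response. The reader mirrored this with a
    // PRE_60_NODE_CHECKPOINT fallback.
    static void writeCheckpointBefore(DataOutputStream out, boolean peerUnderstandsCheckpoints,
                                      long localCheckpoint) throws IOException {
        if (peerUnderstandsCheckpoints) {
            out.writeLong(localCheckpoint);
        }
    }

    // After: every peer a node can talk to understands the field, so it is written
    // unconditionally and the reader no longer needs a sentinel fallback.
    static void writeCheckpointAfter(DataOutputStream out, long localCheckpoint) throws IOException {
        out.writeLong(localCheckpoint);
    }

    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        writeCheckpointAfter(new DataOutputStream(buffer), 42L);
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(buffer.toByteArray()));
        System.out.println("local checkpoint on the wire: " + in.readLong()); // 42
    }
}
```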

@ -981,28 +981,15 @@ public abstract class TransportReplicationAction<
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
if (in.getVersion().onOrAfter(Version.V_6_0_0_alpha1)) {
localCheckpoint = in.readZLong();
} else {
// 5.x used to read empty responses, which don't really read anything off the stream, so just do nothing.
localCheckpoint = SequenceNumbers.PRE_60_NODE_CHECKPOINT;
}
if (in.getVersion().onOrAfter(Version.V_6_0_0_rc1)) {
globalCheckpoint = in.readZLong();
} else {
globalCheckpoint = SequenceNumbers.PRE_60_NODE_CHECKPOINT;
}
localCheckpoint = in.readZLong();
globalCheckpoint = in.readZLong();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
if (out.getVersion().onOrAfter(Version.V_6_0_0_alpha1)) {
out.writeZLong(localCheckpoint);
}
if (out.getVersion().onOrAfter(Version.V_6_0_0_rc1)) {
out.writeZLong(globalCheckpoint);
}
out.writeZLong(localCheckpoint);
out.writeZLong(globalCheckpoint);
}
@Override

@ -550,9 +550,7 @@ public class ReplicationTracker extends AbstractIndexShardComponent implements L
"checkpoints map should always have an entry for the current shard";
// local checkpoints only set during primary mode
assert primaryMode || checkpoints.values().stream()
.allMatch(lcps -> lcps.localCheckpoint == SequenceNumbers.UNASSIGNED_SEQ_NO ||
lcps.localCheckpoint == SequenceNumbers.PRE_60_NODE_CHECKPOINT);
assert primaryMode || checkpoints.values().stream().allMatch(lcps -> lcps.localCheckpoint == SequenceNumbers.UNASSIGNED_SEQ_NO);
// global checkpoints for other shards only set during primary mode
assert primaryMode
@ -561,9 +559,7 @@ public class ReplicationTracker extends AbstractIndexShardComponent implements L
.stream()
.filter(e -> e.getKey().equals(shardAllocationId) == false)
.map(Map.Entry::getValue)
.allMatch(cps ->
(cps.globalCheckpoint == SequenceNumbers.UNASSIGNED_SEQ_NO
|| cps.globalCheckpoint == SequenceNumbers.PRE_60_NODE_CHECKPOINT));
.allMatch(cps -> cps.globalCheckpoint == SequenceNumbers.UNASSIGNED_SEQ_NO);
// relocation handoff can only occur in primary mode
assert !handoffInProgress || primaryMode;
@ -642,7 +638,7 @@ public class ReplicationTracker extends AbstractIndexShardComponent implements L
.stream()
.filter(cps -> cps.inSync)
.mapToLong(function)
.filter(v -> v != SequenceNumbers.PRE_60_NODE_CHECKPOINT && v != SequenceNumbers.UNASSIGNED_SEQ_NO));
.filter(v -> v != SequenceNumbers.UNASSIGNED_SEQ_NO));
return value.isPresent() ? value.getAsLong() : SequenceNumbers.UNASSIGNED_SEQ_NO;
}
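
(A self-contained sketch of the hunk above, not the actual `ReplicationTracker` method: `UNASSIGNED_SEQ_NO = -2L` is assumed from the upstream source, and a minimum stands in for the caller-supplied reduction. It shows why the second filter clause becomes dead code once no pre-6.0 copy can report a checkpoint.)

```java
import java.util.OptionalLong;
import java.util.stream.LongStream;

public class CheckpointFilterSketch {

    // Sentinel values; PRE_60_NODE_CHECKPOINT (-3L) is the constant deleted by this commit,
    // UNASSIGNED_SEQ_NO (-2L) is assumed from the upstream SequenceNumbers class.
    static final long UNASSIGNED_SEQ_NO = -2L;
    static final long PRE_60_NODE_CHECKPOINT = -3L;

    // Before: both sentinels had to be excluded before reducing the in-sync local checkpoints.
    static long reduceBefore(long... localCheckpoints) {
        OptionalLong value = LongStream.of(localCheckpoints)
            .filter(v -> v != PRE_60_NODE_CHECKPOINT && v != UNASSIGNED_SEQ_NO)
            .min();
        return value.orElse(UNASSIGNED_SEQ_NO);
    }

    // After: only the unassigned sentinel can still appear, so only it is filtered out.
    static long reduceAfter(long... localCheckpoints) {
        OptionalLong value = LongStream.of(localCheckpoints)
            .filter(v -> v != UNASSIGNED_SEQ_NO)
            .min();
        return value.orElse(UNASSIGNED_SEQ_NO);
    }

    public static void main(String[] args) {
        // A copy that has not reported yet (-2) is ignored either way.
        System.out.println(reduceBefore(42L, UNASSIGNED_SEQ_NO, 17L)); // 17
        System.out.println(reduceAfter(42L, UNASSIGNED_SEQ_NO, 17L));  // 17
        // The -3 marker could only come from a pre-6.0 copy; once such copies cannot exist,
        // the extra clause in reduceBefore can never match.
        System.out.println(reduceBefore(42L, PRE_60_NODE_CHECKPOINT, 17L)); // 17
    }
}
```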
@ -789,14 +785,12 @@ public class ReplicationTracker extends AbstractIndexShardComponent implements L
/**
* Notifies the tracker of the current allocation IDs in the cluster state.
*
* @param applyingClusterStateVersion the cluster state version being applied when updating the allocation IDs from the master
* @param inSyncAllocationIds the allocation IDs of the currently in-sync shard copies
* @param routingTable the shard routing table
* @param pre60AllocationIds the allocation IDs of shards that are allocated to pre-6.0 nodes
*/
public synchronized void updateFromMaster(final long applyingClusterStateVersion, final Set<String> inSyncAllocationIds,
final IndexShardRoutingTable routingTable, final Set<String> pre60AllocationIds) {
final IndexShardRoutingTable routingTable) {
assert invariant();
if (applyingClusterStateVersion > appliedClusterStateVersion) {
// check that the master does not fabricate new in-sync entries out of thin air once we are in primary mode
@ -817,8 +811,7 @@ public class ReplicationTracker extends AbstractIndexShardComponent implements L
final boolean inSync = inSyncAllocationIds.contains(initializingId);
assert inSync == false : "update from master in primary mode has " + initializingId +
" as in-sync but it does not exist locally";
final long localCheckpoint = pre60AllocationIds.contains(initializingId) ?
SequenceNumbers.PRE_60_NODE_CHECKPOINT : SequenceNumbers.UNASSIGNED_SEQ_NO;
final long localCheckpoint = SequenceNumbers.UNASSIGNED_SEQ_NO;
final long globalCheckpoint = localCheckpoint;
checkpoints.put(initializingId, new CheckpointState(localCheckpoint, globalCheckpoint, inSync, inSync));
}
@ -829,8 +822,7 @@ public class ReplicationTracker extends AbstractIndexShardComponent implements L
} else {
for (String initializingId : initializingAllocationIds) {
if (shardAllocationId.equals(initializingId) == false) {
final long localCheckpoint = pre60AllocationIds.contains(initializingId) ?
SequenceNumbers.PRE_60_NODE_CHECKPOINT : SequenceNumbers.UNASSIGNED_SEQ_NO;
final long localCheckpoint = SequenceNumbers.UNASSIGNED_SEQ_NO;
final long globalCheckpoint = localCheckpoint;
checkpoints.put(initializingId, new CheckpointState(localCheckpoint, globalCheckpoint, false, false));
}
@ -842,8 +834,7 @@ public class ReplicationTracker extends AbstractIndexShardComponent implements L
checkpointState.inSync = true;
checkpointState.tracked = true;
} else {
final long localCheckpoint = pre60AllocationIds.contains(inSyncId) ?
SequenceNumbers.PRE_60_NODE_CHECKPOINT : SequenceNumbers.UNASSIGNED_SEQ_NO;
final long localCheckpoint = SequenceNumbers.UNASSIGNED_SEQ_NO;
final long globalCheckpoint = localCheckpoint;
checkpoints.put(inSyncId, new CheckpointState(localCheckpoint, globalCheckpoint, true, true));
}
@ -931,13 +922,9 @@ public class ReplicationTracker extends AbstractIndexShardComponent implements L
}
private boolean updateLocalCheckpoint(String allocationId, CheckpointState cps, long localCheckpoint) {
// a local checkpoint of PRE_60_NODE_CHECKPOINT cannot be overridden
assert cps.localCheckpoint != SequenceNumbers.PRE_60_NODE_CHECKPOINT ||
localCheckpoint == SequenceNumbers.PRE_60_NODE_CHECKPOINT :
"pre-6.0 shard copy " + allocationId + " unexpected to send valid local checkpoint " + localCheckpoint;
// a local checkpoint for a shard copy should be a valid sequence number or the pre-6.0 sequence number indicator
assert localCheckpoint != SequenceNumbers.UNASSIGNED_SEQ_NO :
"invalid local checkpoint for shard copy [" + allocationId + "]";
// a local checkpoint for a shard copy should be a valid sequence number
assert localCheckpoint >= SequenceNumbers.NO_OPS_PERFORMED :
"invalid local checkpoint [" + localCheckpoint + "] for shard copy [" + allocationId + "]";
if (localCheckpoint > cps.localCheckpoint) {
logger.trace("updated local checkpoint of [{}] from [{}] to [{}]", allocationId, cps.localCheckpoint, localCheckpoint);
cps.localCheckpoint = localCheckpoint;
@ -996,8 +983,6 @@ public class ReplicationTracker extends AbstractIndexShardComponent implements L
if (cps.localCheckpoint == SequenceNumbers.UNASSIGNED_SEQ_NO) {
// unassigned in-sync replica
return fallback;
} else if (cps.localCheckpoint == SequenceNumbers.PRE_60_NODE_CHECKPOINT) {
// 5.x replica, ignore for global checkpoint calculation
} else {
minLocalCheckpoint = Math.min(cps.localCheckpoint, minLocalCheckpoint);
}
@ -1069,18 +1054,11 @@ public class ReplicationTracker extends AbstractIndexShardComponent implements L
handoffInProgress = false;
relocated = true;
// forget all checkpoint information except for global checkpoint of current shard
checkpoints.entrySet().stream().forEach(e -> {
final CheckpointState cps = e.getValue();
if (cps.localCheckpoint != SequenceNumbers.UNASSIGNED_SEQ_NO &&
cps.localCheckpoint != SequenceNumbers.PRE_60_NODE_CHECKPOINT) {
cps.localCheckpoint = SequenceNumbers.UNASSIGNED_SEQ_NO;
}
if (e.getKey().equals(shardAllocationId) == false) {
checkpoints.forEach((key, cps) -> {
cps.localCheckpoint = SequenceNumbers.UNASSIGNED_SEQ_NO;
if (key.equals(shardAllocationId) == false) {
// don't throw global checkpoint information of current shard away
if (cps.globalCheckpoint != SequenceNumbers.UNASSIGNED_SEQ_NO &&
cps.globalCheckpoint != SequenceNumbers.PRE_60_NODE_CHECKPOINT) {
cps.globalCheckpoint = SequenceNumbers.UNASSIGNED_SEQ_NO;
}
cps.globalCheckpoint = SequenceNumbers.UNASSIGNED_SEQ_NO;
}
});
assert invariant();
@ -1117,17 +1095,13 @@ public class ReplicationTracker extends AbstractIndexShardComponent implements L
assert primaryMode == false;
final long lastAppliedClusterStateVersion = appliedClusterStateVersion;
final Set<String> inSyncAllocationIds = new HashSet<>();
final Set<String> pre60AllocationIds = new HashSet<>();
checkpoints.entrySet().forEach(entry -> {
if (entry.getValue().inSync) {
inSyncAllocationIds.add(entry.getKey());
}
if (entry.getValue().getLocalCheckpoint() == SequenceNumbers.PRE_60_NODE_CHECKPOINT) {
pre60AllocationIds.add(entry.getKey());
}
});
final IndexShardRoutingTable lastAppliedRoutingTable = routingTable;
return () -> updateFromMaster(lastAppliedClusterStateVersion, inSyncAllocationIds, lastAppliedRoutingTable, pre60AllocationIds);
return () -> updateFromMaster(lastAppliedClusterStateVersion, inSyncAllocationIds, lastAppliedRoutingTable);
}
/**

@ -28,10 +28,6 @@ public class SequenceNumbers {
public static final String LOCAL_CHECKPOINT_KEY = "local_checkpoint";
public static final String MAX_SEQ_NO = "max_seq_no";
/**
* Represents a checkpoint coming from a pre-6.0 node
*/
public static final long PRE_60_NODE_CHECKPOINT = -3L;
/**
* Represents an unassigned sequence number (e.g., can be used on primary operations before they are executed).
*/
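
For context, only two sentinels remain in `SequenceNumbers` after this hunk; the definitions below are recalled from the upstream source and should be treated as an assumption rather than part of this diff:

```java
// Quoted from memory of the upstream SequenceNumbers class; treat the exact values as an assumption.
public final class RemainingSentinels {
    /** Represents an unassigned sequence number (e.g., on primary operations before they execute). */
    public static final long UNASSIGNED_SEQ_NO = -2L;
    /** Represents the local checkpoint of a shard copy that has processed no operations. */
    public static final long NO_OPS_PERFORMED = -1L;

    private RemainingSentinels() {}
}
```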

@ -433,8 +433,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
final BiConsumer<IndexShard, ActionListener<ResyncTask>> primaryReplicaSyncer,
final long applyingClusterStateVersion,
final Set<String> inSyncAllocationIds,
final IndexShardRoutingTable routingTable,
final Set<String> pre60AllocationIds) throws IOException {
final IndexShardRoutingTable routingTable) throws IOException {
final ShardRouting currentRouting;
synchronized (mutex) {
currentRouting = this.shardRouting;
@ -453,7 +452,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
}
if (newRouting.primary()) {
replicationTracker.updateFromMaster(applyingClusterStateVersion, inSyncAllocationIds, routingTable, pre60AllocationIds);
replicationTracker.updateFromMaster(applyingClusterStateVersion, inSyncAllocationIds, routingTable);
}
if (state == IndexShardState.POST_RECOVERY && newRouting.active()) {

@ -24,7 +24,6 @@ import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.lucene.store.LockObtainFailedException;
import org.elasticsearch.ResourceAlreadyExistsException;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.replication.ReplicationResponse;
import org.elasticsearch.cluster.ClusterChangedEvent;
@ -35,7 +34,6 @@ import org.elasticsearch.cluster.action.shard.ShardStateAction;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.AllocationId;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.RecoverySource.Type;
import org.elasticsearch.cluster.routing.RoutingNode;
@ -94,8 +92,6 @@ import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static org.elasticsearch.indices.cluster.IndicesClusterStateService.AllocatedIndices.IndexRemovalReason.CLOSED;
import static org.elasticsearch.indices.cluster.IndicesClusterStateService.AllocatedIndices.IndexRemovalReason.DELETED;
@ -630,21 +626,8 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple
primaryTerm = indexMetaData.primaryTerm(shard.shardId().id());
final Set<String> inSyncIds = indexMetaData.inSyncAllocationIds(shard.shardId().id());
final IndexShardRoutingTable indexShardRoutingTable = routingTable.shardRoutingTable(shardRouting.shardId());
final Set<String> pre60AllocationIds = indexShardRoutingTable.assignedShards()
.stream()
.flatMap(shr -> {
if (shr.relocating()) {
return Stream.of(shr, shr.getTargetRelocatingShard());
} else {
return Stream.of(shr);
}
})
.filter(shr -> nodes.get(shr.currentNodeId()).getVersion().before(Version.V_6_0_0_alpha1))
.map(ShardRouting::allocationId)
.map(AllocationId::getId)
.collect(Collectors.toSet());
shard.updateShardState(shardRouting, primaryTerm, primaryReplicaSyncer::resync, clusterState.version(),
inSyncIds, indexShardRoutingTable, pre60AllocationIds);
inSyncIds, indexShardRoutingTable);
} catch (Exception e) {
failAndRemoveShard(shardRouting, true, "failed updating shard routing entry", e, clusterState);
return;
@ -810,7 +793,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple
* - Updates and persists the new routing value.
* - Updates the primary term if this shard is a primary.
* - Updates the allocation ids that are tracked by the shard if it is a primary.
* See {@link ReplicationTracker#updateFromMaster(long, Set, IndexShardRoutingTable, Set)} for details.
* See {@link ReplicationTracker#updateFromMaster(long, Set, IndexShardRoutingTable)} for details.
*
* @param shardRouting the new routing entry
* @param primaryTerm the new primary term
@ -826,8 +809,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple
BiConsumer<IndexShard, ActionListener<ResyncTask>> primaryReplicaSyncer,
long applyingClusterStateVersion,
Set<String> inSyncAllocationIds,
IndexShardRoutingTable routingTable,
Set<String> pre60AllocationIds) throws IOException;
IndexShardRoutingTable routingTable) throws IOException;
}
public interface AllocatedIndex<T extends Shard> extends Iterable<T>, IndexComponent {

@ -2281,7 +2281,7 @@ public class InternalEngineTests extends EngineTestCase {
ReplicationTracker gcpTracker = (ReplicationTracker) initialEngine.config().getGlobalCheckpointSupplier();
gcpTracker.updateFromMaster(1L, new HashSet<>(Arrays.asList(primary.allocationId().getId(),
replica.allocationId().getId())),
new IndexShardRoutingTable.Builder(shardId).addShard(primary).addShard(replica).build(), Collections.emptySet());
new IndexShardRoutingTable.Builder(shardId).addShard(primary).addShard(replica).build());
gcpTracker.activatePrimaryMode(primarySeqNo);
for (int op = 0; op < opCount; op++) {
final String id;

@ -75,7 +75,7 @@ public class NoOpEngineTests extends EngineTestCase {
ShardRouting routing = TestShardRouting.newShardRouting("test", shardId.id(), "node",
null, true, ShardRoutingState.STARTED, allocationId);
IndexShardRoutingTable table = new IndexShardRoutingTable.Builder(shardId).addShard(routing).build();
tracker.updateFromMaster(1L, Collections.singleton(allocationId.getId()), table, Collections.emptySet());
tracker.updateFromMaster(1L, Collections.singleton(allocationId.getId()), table);
tracker.activatePrimaryMode(SequenceNumbers.NO_OPS_PERFORMED);
for (int i = 0; i < docs; i++) {
ParsedDocument doc = testParsedDocument("" + i, null, testDocumentWithTextField(), B_1, null);

@ -71,8 +71,7 @@ public class ReplicationTrackerRetentionLeaseTests extends ReplicationTrackerTes
replicationTracker.updateFromMaster(
randomNonNegativeLong(),
Collections.singleton(allocationId.getId()),
routingTable(Collections.emptySet(), allocationId),
Collections.emptySet());
routingTable(Collections.emptySet(), allocationId));
replicationTracker.activatePrimaryMode(SequenceNumbers.NO_OPS_PERFORMED);
final int length = randomIntBetween(0, 8);
final long[] minimumRetainingSequenceNumbers = new long[length];
@ -113,8 +112,7 @@ public class ReplicationTrackerRetentionLeaseTests extends ReplicationTrackerTes
replicationTracker.updateFromMaster(
randomNonNegativeLong(),
Collections.singleton(allocationId.getId()),
routingTable(Collections.emptySet(), allocationId),
Collections.emptySet());
routingTable(Collections.emptySet(), allocationId));
replicationTracker.activatePrimaryMode(SequenceNumbers.NO_OPS_PERFORMED);
final String id = randomAlphaOfLength(8);
final long retainingSequenceNumber = randomNonNegativeLong();
@ -142,8 +140,7 @@ public class ReplicationTrackerRetentionLeaseTests extends ReplicationTrackerTes
replicationTracker.updateFromMaster(
randomNonNegativeLong(),
Collections.singleton(allocationId.getId()),
routingTable(Collections.emptySet(), allocationId),
Collections.emptySet());
routingTable(Collections.emptySet(), allocationId));
replicationTracker.activatePrimaryMode(SequenceNumbers.NO_OPS_PERFORMED);
final String id = randomAlphaOfLength(8);
final RetentionLeaseNotFoundException e = expectThrows(
@ -179,8 +176,7 @@ public class ReplicationTrackerRetentionLeaseTests extends ReplicationTrackerTes
replicationTracker.updateFromMaster(
randomNonNegativeLong(),
Collections.singleton(allocationId.getId()),
routingTable(Collections.emptySet(), allocationId),
Collections.emptySet());
routingTable(Collections.emptySet(), allocationId));
replicationTracker.activatePrimaryMode(SequenceNumbers.NO_OPS_PERFORMED);
final int length = randomIntBetween(0, 8);
@ -214,8 +210,7 @@ public class ReplicationTrackerRetentionLeaseTests extends ReplicationTrackerTes
replicationTracker.updateFromMaster(
randomNonNegativeLong(),
Collections.singleton(allocationId.getId()),
routingTable(Collections.emptySet(), allocationId),
Collections.emptySet());
routingTable(Collections.emptySet(), allocationId));
replicationTracker.activatePrimaryMode(SequenceNumbers.NO_OPS_PERFORMED);
final int length = randomIntBetween(0, 8);
final long[] minimumRetainingSequenceNumbers = new long[length];
@ -265,8 +260,7 @@ public class ReplicationTrackerRetentionLeaseTests extends ReplicationTrackerTes
replicationTracker.updateFromMaster(
randomNonNegativeLong(),
Collections.singleton(allocationId.getId()),
routingTable(Collections.emptySet(), allocationId),
Collections.emptySet());
routingTable(Collections.emptySet(), allocationId));
replicationTracker.activatePrimaryMode(SequenceNumbers.NO_OPS_PERFORMED);
final String id = randomAlphaOfLength(8);
final RetentionLeaseNotFoundException e = expectThrows(
@ -302,8 +296,7 @@ public class ReplicationTrackerRetentionLeaseTests extends ReplicationTrackerTes
replicationTracker.updateFromMaster(
randomNonNegativeLong(),
Collections.singleton(allocationId.getId()),
routingTable(Collections.emptySet(), allocationId),
Collections.emptySet());
routingTable(Collections.emptySet(), allocationId));
replicationTracker.activatePrimaryMode(SequenceNumbers.NO_OPS_PERFORMED);
final int length = randomIntBetween(0, 8);
@ -354,8 +347,7 @@ public class ReplicationTrackerRetentionLeaseTests extends ReplicationTrackerTes
replicationTracker.updateFromMaster(
randomNonNegativeLong(),
Collections.singleton(allocationId.getId()),
routingTable(Collections.emptySet(), allocationId),
Collections.emptySet());
routingTable(Collections.emptySet(), allocationId));
if (primaryMode) {
replicationTracker.activatePrimaryMode(SequenceNumbers.NO_OPS_PERFORMED);
}
@ -427,8 +419,7 @@ public class ReplicationTrackerRetentionLeaseTests extends ReplicationTrackerTes
replicationTracker.updateFromMaster(
randomNonNegativeLong(),
Collections.singleton(allocationId.getId()),
routingTable(Collections.emptySet(), allocationId),
Collections.emptySet());
routingTable(Collections.emptySet(), allocationId));
final int length = randomIntBetween(0, 8);
final List<RetentionLeases> retentionLeasesCollection = new ArrayList<>(length);
long primaryTerm = 1;
@ -481,8 +472,7 @@ public class ReplicationTrackerRetentionLeaseTests extends ReplicationTrackerTes
replicationTracker.updateFromMaster(
randomNonNegativeLong(),
Collections.singleton(allocationId.getId()),
routingTable(Collections.emptySet(), allocationId),
Collections.emptySet());
routingTable(Collections.emptySet(), allocationId));
replicationTracker.activatePrimaryMode(SequenceNumbers.NO_OPS_PERFORMED);
final int length = randomIntBetween(0, 8);
for (int i = 0; i < length; i++) {
@ -515,8 +505,7 @@ public class ReplicationTrackerRetentionLeaseTests extends ReplicationTrackerTes
replicationTracker.updateFromMaster(
randomNonNegativeLong(),
Collections.singleton(allocationId.getId()),
routingTable(Collections.emptySet(), allocationId),
Collections.emptySet());
routingTable(Collections.emptySet(), allocationId));
replicationTracker.activatePrimaryMode(SequenceNumbers.NO_OPS_PERFORMED);
final int length = randomIntBetween(0, 8);
for (int i = 0; i < length; i++) {
@ -564,8 +553,7 @@ public class ReplicationTrackerRetentionLeaseTests extends ReplicationTrackerTes
replicationTracker.updateFromMaster(
randomNonNegativeLong(),
Collections.singleton(allocationId.getId()),
routingTable(Collections.emptySet(), allocationId),
Collections.emptySet());
routingTable(Collections.emptySet(), allocationId));
replicationTracker.activatePrimaryMode(SequenceNumbers.NO_OPS_PERFORMED);
final int length = randomIntBetween(0, 8);
for (int i = 0; i < length; i++) {

@ -120,7 +120,7 @@ public class ReplicationTrackerTests extends ReplicationTrackerTestCase {
logger.info(" - [{}], local checkpoint [{}], [{}]", aId, allocations.get(aId), type);
});
tracker.updateFromMaster(initialClusterStateVersion, ids(active), routingTable(initializing, primaryId), emptySet());
tracker.updateFromMaster(initialClusterStateVersion, ids(active), routingTable(initializing, primaryId));
tracker.activatePrimaryMode(NO_OPS_PERFORMED);
assertThat(tracker.getReplicationGroup().getReplicationTargets().size(), equalTo(1));
initializing.forEach(aId -> markAsTrackingAndInSyncQuietly(tracker, aId.getId(), NO_OPS_PERFORMED));
@ -147,7 +147,7 @@ public class ReplicationTrackerTests extends ReplicationTrackerTestCase {
Set<AllocationId> newInitializing = new HashSet<>(initializing);
newInitializing.add(extraId);
tracker.updateFromMaster(initialClusterStateVersion + 1, ids(active), routingTable(newInitializing, primaryId), emptySet());
tracker.updateFromMaster(initialClusterStateVersion + 1, ids(active), routingTable(newInitializing, primaryId));
tracker.initiateTracking(extraId.getId());
@ -187,7 +187,7 @@ public class ReplicationTrackerTests extends ReplicationTrackerTestCase {
final AllocationId primaryId = active.iterator().next();
final AllocationId replicaId = initializing.iterator().next();
final ReplicationTracker tracker = newTracker(primaryId);
tracker.updateFromMaster(initialClusterStateVersion, ids(active), routingTable(initializing, primaryId), emptySet());
tracker.updateFromMaster(initialClusterStateVersion, ids(active), routingTable(initializing, primaryId));
final long localCheckpoint = randomLongBetween(0, Long.MAX_VALUE - 1);
tracker.activatePrimaryMode(localCheckpoint);
tracker.initiateTracking(replicaId.getId());
@ -229,7 +229,7 @@ public class ReplicationTrackerTests extends ReplicationTrackerTestCase {
assigned.putAll(initializing);
AllocationId primaryId = active.keySet().iterator().next();
final ReplicationTracker tracker = newTracker(primaryId);
tracker.updateFromMaster(randomNonNegativeLong(), ids(active.keySet()), routingTable(initializing.keySet(), primaryId), emptySet());
tracker.updateFromMaster(randomNonNegativeLong(), ids(active.keySet()), routingTable(initializing.keySet(), primaryId));
tracker.activatePrimaryMode(NO_OPS_PERFORMED);
randomSubsetOf(initializing.keySet()).forEach(k -> markAsTrackingAndInSyncQuietly(tracker, k.getId(), NO_OPS_PERFORMED));
final AllocationId missingActiveID = randomFrom(active.keySet());
@ -256,7 +256,7 @@ public class ReplicationTrackerTests extends ReplicationTrackerTestCase {
AllocationId primaryId = active.keySet().iterator().next();
final ReplicationTracker tracker = newTracker(primaryId);
tracker.updateFromMaster(randomNonNegativeLong(), ids(active.keySet()), routingTable(initializing.keySet(), primaryId), emptySet());
tracker.updateFromMaster(randomNonNegativeLong(), ids(active.keySet()), routingTable(initializing.keySet(), primaryId));
tracker.activatePrimaryMode(NO_OPS_PERFORMED);
randomSubsetOf(randomIntBetween(1, initializing.size() - 1),
initializing.keySet()).forEach(aId -> markAsTrackingAndInSyncQuietly(tracker, aId.getId(), NO_OPS_PERFORMED));
@ -278,7 +278,7 @@ public class ReplicationTrackerTests extends ReplicationTrackerTestCase {
final Map<AllocationId, Long> nonApproved = randomAllocationsWithLocalCheckpoints(1, 5);
final AllocationId primaryId = active.keySet().iterator().next();
final ReplicationTracker tracker = newTracker(primaryId);
tracker.updateFromMaster(randomNonNegativeLong(), ids(active.keySet()), routingTable(initializing.keySet(), primaryId), emptySet());
tracker.updateFromMaster(randomNonNegativeLong(), ids(active.keySet()), routingTable(initializing.keySet(), primaryId));
tracker.activatePrimaryMode(NO_OPS_PERFORMED);
initializing.keySet().forEach(k -> markAsTrackingAndInSyncQuietly(tracker, k.getId(), NO_OPS_PERFORMED));
nonApproved.keySet().forEach(k ->
@ -313,7 +313,7 @@ public class ReplicationTrackerTests extends ReplicationTrackerTestCase {
allocations.putAll(initializingToBeRemoved);
}
final ReplicationTracker tracker = newTracker(primaryId);
tracker.updateFromMaster(initialClusterStateVersion, ids(active), routingTable(initializing, primaryId), emptySet());
tracker.updateFromMaster(initialClusterStateVersion, ids(active), routingTable(initializing, primaryId));
tracker.activatePrimaryMode(NO_OPS_PERFORMED);
if (randomBoolean()) {
initializingToStay.keySet().forEach(k -> markAsTrackingAndInSyncQuietly(tracker, k.getId(), NO_OPS_PERFORMED));
@ -329,16 +329,14 @@ public class ReplicationTrackerTests extends ReplicationTrackerTestCase {
tracker.updateFromMaster(
initialClusterStateVersion + 1,
ids(activeToStay.keySet()),
routingTable(initializingToStay.keySet(), primaryId),
emptySet());
routingTable(initializingToStay.keySet(), primaryId));
allocations.forEach((aid, ckp) -> updateLocalCheckpoint(tracker, aid.getId(), ckp + 10L));
} else {
allocations.forEach((aid, ckp) -> updateLocalCheckpoint(tracker, aid.getId(), ckp + 10L));
tracker.updateFromMaster(
initialClusterStateVersion + 2,
ids(activeToStay.keySet()),
routingTable(initializingToStay.keySet(), primaryId),
emptySet());
routingTable(initializingToStay.keySet(), primaryId));
}
final long checkpoint = Stream.concat(activeToStay.values().stream(), initializingToStay.values().stream())
@ -357,7 +355,7 @@ public class ReplicationTrackerTests extends ReplicationTrackerTestCase {
final ReplicationTracker tracker = newTracker(inSyncAllocationId);
final long clusterStateVersion = randomNonNegativeLong();
tracker.updateFromMaster(clusterStateVersion, Collections.singleton(inSyncAllocationId.getId()),
routingTable(Collections.singleton(trackingAllocationId), inSyncAllocationId), emptySet());
routingTable(Collections.singleton(trackingAllocationId), inSyncAllocationId));
tracker.activatePrimaryMode(globalCheckpoint);
final Thread thread = new Thread(() -> {
try {
@ -397,7 +395,7 @@ public class ReplicationTrackerTests extends ReplicationTrackerTestCase {
} else {
// master changes its mind and cancels the allocation
tracker.updateFromMaster(clusterStateVersion + 1, Collections.singleton(inSyncAllocationId.getId()),
routingTable(emptySet(), inSyncAllocationId), emptySet());
routingTable(emptySet(), inSyncAllocationId));
barrier.await();
assertTrue(complete.get());
assertNull(tracker.getTrackedLocalCheckpointForShard(trackingAllocationId.getId()));
@ -421,7 +419,7 @@ public class ReplicationTrackerTests extends ReplicationTrackerTestCase {
final AllocationId trackingAllocationId = AllocationId.newInitializing();
final ReplicationTracker tracker = newTracker(inSyncAllocationId);
tracker.updateFromMaster(randomNonNegativeLong(), Collections.singleton(inSyncAllocationId.getId()),
routingTable(Collections.singleton(trackingAllocationId), inSyncAllocationId), emptySet());
routingTable(Collections.singleton(trackingAllocationId), inSyncAllocationId));
tracker.activatePrimaryMode(globalCheckpoint);
final Thread thread = new Thread(() -> {
try {
@ -470,7 +468,7 @@ public class ReplicationTrackerTests extends ReplicationTrackerTestCase {
AllocationId primaryId = activeAllocationIds.iterator().next();
IndexShardRoutingTable routingTable = routingTable(initializingIds, primaryId);
final ReplicationTracker tracker = newTracker(primaryId);
tracker.updateFromMaster(initialClusterStateVersion, ids(activeAllocationIds), routingTable, emptySet());
tracker.updateFromMaster(initialClusterStateVersion, ids(activeAllocationIds), routingTable);
tracker.activatePrimaryMode(NO_OPS_PERFORMED);
assertThat(tracker.getReplicationGroup().getInSyncAllocationIds(), equalTo(ids(activeAllocationIds)));
assertThat(tracker.getReplicationGroup().getRoutingTable(), equalTo(routingTable));
@ -500,7 +498,7 @@ public class ReplicationTrackerTests extends ReplicationTrackerTestCase {
final Set<AllocationId> newInitializingAllocationIds =
initializingIds.stream().filter(a -> !removingInitializingAllocationIds.contains(a)).collect(Collectors.toSet());
routingTable = routingTable(newInitializingAllocationIds, primaryId);
tracker.updateFromMaster(initialClusterStateVersion + 1, ids(newActiveAllocationIds), routingTable, emptySet());
tracker.updateFromMaster(initialClusterStateVersion + 1, ids(newActiveAllocationIds), routingTable);
assertTrue(newActiveAllocationIds.stream().allMatch(a -> tracker.getTrackedLocalCheckpointForShard(a.getId()).inSync));
assertTrue(removingActiveAllocationIds.stream().allMatch(a -> tracker.getTrackedLocalCheckpointForShard(a.getId()) == null));
assertTrue(newInitializingAllocationIds.stream().noneMatch(a -> tracker.getTrackedLocalCheckpointForShard(a.getId()).inSync));
@ -517,8 +515,7 @@ public class ReplicationTrackerTests extends ReplicationTrackerTestCase {
tracker.updateFromMaster(
initialClusterStateVersion + 2,
ids(newActiveAllocationIds),
routingTable(newInitializingAllocationIds, primaryId),
emptySet());
routingTable(newInitializingAllocationIds, primaryId));
assertTrue(newActiveAllocationIds.stream().allMatch(a -> tracker.getTrackedLocalCheckpointForShard(a.getId()).inSync));
assertTrue(
newActiveAllocationIds
@ -565,8 +562,7 @@ public class ReplicationTrackerTests extends ReplicationTrackerTestCase {
tracker.updateFromMaster(
initialClusterStateVersion + 3,
ids(newActiveAllocationIds),
routingTable(newInitializingAllocationIds, primaryId),
emptySet());
routingTable(newInitializingAllocationIds, primaryId));
final CyclicBarrier barrier = new CyclicBarrier(2);
final Thread thread = new Thread(() -> {
try {
@ -604,8 +600,7 @@ public class ReplicationTrackerTests extends ReplicationTrackerTestCase {
tracker.updateFromMaster(
initialClusterStateVersion + 4,
ids(newActiveAllocationIds),
routingTable(newInitializingAllocationIds, primaryId),
emptySet());
routingTable(newInitializingAllocationIds, primaryId));
assertTrue(tracker.getTrackedLocalCheckpointForShard(newSyncingAllocationId.getId()).inSync);
assertFalse(tracker.pendingInSync.contains(newSyncingAllocationId.getId()));
}
@ -633,8 +628,7 @@ public class ReplicationTrackerTests extends ReplicationTrackerTestCase {
tracker.updateFromMaster(
randomNonNegativeLong(),
Collections.singleton(active.getId()),
routingTable(Collections.singleton(initializing), active),
emptySet());
routingTable(Collections.singleton(initializing), active));
tracker.activatePrimaryMode(activeLocalCheckpoint);
final int nextActiveLocalCheckpoint = randomIntBetween(activeLocalCheckpoint + 1, Integer.MAX_VALUE);
final Thread activeThread = new Thread(() -> {
@ -835,7 +829,7 @@ public class ReplicationTrackerTests extends ReplicationTrackerTestCase {
final AllocationId initializing = AllocationId.newInitializing();
final ReplicationTracker tracker = newTracker(active);
tracker.updateFromMaster(randomNonNegativeLong(), Collections.singleton(active.getId()),
routingTable(Collections.singleton(initializing), active), emptySet());
routingTable(Collections.singleton(initializing), active));
tracker.activatePrimaryMode(NO_OPS_PERFORMED);
expectThrows(IllegalStateException.class, () -> tracker.initiateTracking(randomAlphaOfLength(10)));
@ -863,7 +857,7 @@ public class ReplicationTrackerTests extends ReplicationTrackerTestCase {
}
public void apply(ReplicationTracker gcp) {
gcp.updateFromMaster(version, ids(inSyncIds), routingTable, Collections.emptySet());
gcp.updateFromMaster(version, ids(inSyncIds), routingTable);
}
}

@ -603,7 +603,7 @@ public class IndexShardTests extends IndexShardTestCase {
replicaRouting.allocationId());
indexShard.updateShardState(primaryRouting, newPrimaryTerm, (shard, listener) -> {},
0L, Collections.singleton(primaryRouting.allocationId().getId()),
new IndexShardRoutingTable.Builder(primaryRouting.shardId()).addShard(primaryRouting).build(), Collections.emptySet());
new IndexShardRoutingTable.Builder(primaryRouting.shardId()).addShard(primaryRouting).build());
/*
* This operation completing means that the delay operation executed as part of increasing the primary term has completed and the
@ -656,8 +656,8 @@ public class IndexShardTests extends IndexShardTestCase {
latch.countDown();
}, 0L,
Collections.singleton(indexShard.routingEntry().allocationId().getId()),
new IndexShardRoutingTable.Builder(indexShard.shardId()).addShard(primaryRouting).build(),
Collections.emptySet());
new IndexShardRoutingTable.Builder(indexShard.shardId()).addShard(primaryRouting).build()
);
latch.await();
assertThat(indexShard.getActiveOperationsCount(), isOneOf(0, IndexShard.OPERATIONS_BLOCKED));
if (randomBoolean()) {
@ -1208,8 +1208,7 @@ public class IndexShardTests extends IndexShardTestCase {
(s, r) -> resyncLatch.countDown(),
1L,
Collections.singleton(newRouting.allocationId().getId()),
new IndexShardRoutingTable.Builder(newRouting.shardId()).addShard(newRouting).build(),
Collections.emptySet());
new IndexShardRoutingTable.Builder(newRouting.shardId()).addShard(newRouting).build());
resyncLatch.await();
assertThat(indexShard.getLocalCheckpoint(), equalTo(maxSeqNo));
assertThat(indexShard.seqNoStats().getMaxSeqNo(), equalTo(maxSeqNo));

@ -99,7 +99,7 @@ public class PrimaryReplicaSyncerTests extends IndexShardTestCase {
String allocationId = shard.routingEntry().allocationId().getId();
shard.updateShardState(shard.routingEntry(), shard.getPendingPrimaryTerm(), null, 1000L, Collections.singleton(allocationId),
new IndexShardRoutingTable.Builder(shard.shardId()).addShard(shard.routingEntry()).build(), Collections.emptySet());
new IndexShardRoutingTable.Builder(shard.shardId()).addShard(shard.routingEntry()).build());
shard.updateLocalCheckpointForShard(allocationId, globalCheckPoint);
assertEquals(globalCheckPoint, shard.getGlobalCheckpoint());
@ -159,7 +159,7 @@ public class PrimaryReplicaSyncerTests extends IndexShardTestCase {
String allocationId = shard.routingEntry().allocationId().getId();
shard.updateShardState(shard.routingEntry(), shard.getPendingPrimaryTerm(), null, 1000L, Collections.singleton(allocationId),
new IndexShardRoutingTable.Builder(shard.shardId()).addShard(shard.routingEntry()).build(), Collections.emptySet());
new IndexShardRoutingTable.Builder(shard.shardId()).addShard(shard.routingEntry()).build());
CountDownLatch syncCalledLatch = new CountDownLatch(1);
PlainActionFuture<PrimaryReplicaSyncer.ResyncTask> fut = new PlainActionFuture<PrimaryReplicaSyncer.ResyncTask>() {

@ -355,8 +355,7 @@ public abstract class AbstractIndicesClusterStateServiceTestCase extends ESTestC
BiConsumer<IndexShard, ActionListener<ResyncTask>> primaryReplicaSyncer,
long applyingClusterStateVersion,
Set<String> inSyncAllocationIds,
IndexShardRoutingTable routingTable,
Set<String> pre60AllocationIds) throws IOException {
IndexShardRoutingTable routingTable) throws IOException {
failRandomly();
assertThat(this.shardId(), equalTo(shardRouting.shardId()));
assertTrue("current: " + this.shardRouting + ", got: " + shardRouting, this.shardRouting.isSameAllocation(shardRouting));

@ -293,7 +293,7 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase
ShardRouting startedRoutingEntry = ShardRoutingHelper.moveToStarted(primary.routingEntry());
IndexShardRoutingTable routingTable = routingTable(shr -> shr == primary.routingEntry() ? startedRoutingEntry : shr);
primary.updateShardState(startedRoutingEntry, primary.getPendingPrimaryTerm(), null,
currentClusterStateVersion.incrementAndGet(), activeIds, routingTable, Collections.emptySet());
currentClusterStateVersion.incrementAndGet(), activeIds, routingTable);
for (final IndexShard replica : replicas) {
recoverReplica(replica);
}
@ -385,7 +385,7 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase
IndexShardRoutingTable routingTable = routingTable(shr -> shr == replica.routingEntry() ? primaryRouting : shr);
primary.updateShardState(primaryRouting, newTerm, primaryReplicaSyncer, currentClusterStateVersion.incrementAndGet(),
activeIds(), routingTable, Collections.emptySet());
activeIds(), routingTable);
}
private synchronized Set<String> activeIds() {
@ -520,7 +520,7 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase
primary.updateShardState(primary.routingEntry(), primary.getPendingPrimaryTerm(), null,
currentClusterStateVersion.incrementAndGet(),
activeIds(), routingTable(Function.identity()), Collections.emptySet());
activeIds(), routingTable(Function.identity()));
}
private synchronized void computeReplicationTargets() {

@ -548,7 +548,7 @@ public abstract class IndexShardTestCase extends ESTestCase {
.addShard(shardRouting)
.build();
shard.updateShardState(shardRouting, shard.getPendingPrimaryTerm(), null, currentClusterStateVersion.incrementAndGet(),
inSyncIds, newRoutingTable, Collections.emptySet());
inSyncIds, newRoutingTable);
}
protected void recoveryEmptyReplica(IndexShard replica, boolean startReplica) throws IOException {
@ -633,7 +633,7 @@ public abstract class IndexShardTestCase extends ESTestCase {
new AsyncRecoveryTarget(recoveryTarget, threadPool.generic()),
request, Math.toIntExact(ByteSizeUnit.MB.toBytes(1)), between(1, 8));
primary.updateShardState(primary.routingEntry(), primary.getPendingPrimaryTerm(), null,
currentClusterStateVersion.incrementAndGet(), inSyncIds, routingTable, Collections.emptySet());
currentClusterStateVersion.incrementAndGet(), inSyncIds, routingTable);
PlainActionFuture<RecoveryResponse> future = new PlainActionFuture<>();
recovery.recoverToTarget(future);
@ -658,9 +658,9 @@ public abstract class IndexShardTestCase extends ESTestCase {
inSyncIdsWithReplica.add(replica.routingEntry().allocationId().getId());
// update both primary and replica shard state
primary.updateShardState(primary.routingEntry(), primary.getPendingPrimaryTerm(), null,
currentClusterStateVersion.incrementAndGet(), inSyncIdsWithReplica, newRoutingTable, Collections.emptySet());
currentClusterStateVersion.incrementAndGet(), inSyncIdsWithReplica, newRoutingTable);
replica.updateShardState(replica.routingEntry().moveToStarted(), replica.getPendingPrimaryTerm(), null,
currentClusterStateVersion.get(), inSyncIdsWithReplica, newRoutingTable, Collections.emptySet());
currentClusterStateVersion.get(), inSyncIdsWithReplica, newRoutingTable);
}
@ -685,7 +685,7 @@ public abstract class IndexShardTestCase extends ESTestCase {
(is, listener) ->
listener.onResponse(new PrimaryReplicaSyncer.ResyncTask(1, "type", "action", "desc", null, Collections.emptyMap())),
currentClusterStateVersion.incrementAndGet(),
inSyncIds, newRoutingTable, Collections.emptySet());
inSyncIds, newRoutingTable);
}
private Store.MetadataSnapshot getMetadataSnapshotOrEmpty(IndexShard replica) throws IOException {

@ -80,7 +80,7 @@ public class FollowEngineIndexShardTests extends IndexShardTestCase {
replicaRouting.allocationId());
indexShard.updateShardState(primaryRouting, indexShard.getOperationPrimaryTerm() + 1, (shard, listener) -> {},
0L, Collections.singleton(primaryRouting.allocationId().getId()),
new IndexShardRoutingTable.Builder(primaryRouting.shardId()).addShard(primaryRouting).build(), Collections.emptySet());
new IndexShardRoutingTable.Builder(primaryRouting.shardId()).addShard(primaryRouting).build());
final CountDownLatch latch = new CountDownLatch(1);
ActionListener<Releasable> actionListener = ActionListener.wrap(releasable -> {