diff --git a/server/src/main/java/org/elasticsearch/action/support/replication/ReplicationOperation.java b/server/src/main/java/org/elasticsearch/action/support/replication/ReplicationOperation.java index 660081c5cfb..817be12a8e2 100644 --- a/server/src/main/java/org/elasticsearch/action/support/replication/ReplicationOperation.java +++ b/server/src/main/java/org/elasticsearch/action/support/replication/ReplicationOperation.java @@ -117,19 +117,17 @@ public class ReplicationOperation< // This would entail that some shards could learn about a global checkpoint that would be higher than its local checkpoint. final long globalCheckpoint = primary.globalCheckpoint(); final ReplicationGroup replicationGroup = primary.getReplicationGroup(); - markUnavailableShardsAsStale(replicaRequest, replicationGroup.getInSyncAllocationIds(), replicationGroup.getRoutingTable()); - performOnReplicas(replicaRequest, globalCheckpoint, replicationGroup.getRoutingTable()); + markUnavailableShardsAsStale(replicaRequest, replicationGroup); + performOnReplicas(replicaRequest, globalCheckpoint, replicationGroup); } successfulShards.incrementAndGet(); // mark primary as successful decPendingAndFinishIfNeeded(); } - private void markUnavailableShardsAsStale(ReplicaRequest replicaRequest, Set inSyncAllocationIds, - IndexShardRoutingTable indexShardRoutingTable) { + private void markUnavailableShardsAsStale(ReplicaRequest replicaRequest, ReplicationGroup replicationGroup) { // if inSyncAllocationIds contains allocation ids of shards that don't exist in RoutingTable, mark copies as stale - for (String allocationId : Sets.difference(inSyncAllocationIds, indexShardRoutingTable.getAllAllocationIds())) { - // mark copy as stale + for (String allocationId : replicationGroup.getUnavailableInSyncShards()) { pendingActions.incrementAndGet(); replicasProxy.markShardCopyAsStaleIfNeeded(replicaRequest.shardId(), allocationId, ReplicationOperation.this::decPendingAndFinishIfNeeded, @@ -140,23 +138,17 @@ public 
class ReplicationOperation< } private void performOnReplicas(final ReplicaRequest replicaRequest, final long globalCheckpoint, - final IndexShardRoutingTable indexShardRoutingTable) { - final String localNodeId = primary.routingEntry().currentNodeId(); - // If the index gets deleted after primary operation, we skip replication - for (final ShardRouting shard : indexShardRoutingTable) { - if (shard.unassigned()) { - assert shard.primary() == false : "primary shard should not be unassigned in a replication group: " + shard; - totalShards.incrementAndGet(); - continue; - } + final ReplicationGroup replicationGroup) { + // for total stats, add number of unassigned shards and + // number of initializing shards that are not ready yet to receive operations (recovery has not opened engine yet on the target) + totalShards.addAndGet(replicationGroup.getSkippedShards().size()); - if (shard.currentNodeId().equals(localNodeId) == false) { + final ShardRouting primaryRouting = primary.routingEntry(); + + for (final ShardRouting shard : replicationGroup.getReplicationTargets()) { + if (shard.isSameAllocation(primaryRouting) == false) { performOnReplica(shard, replicaRequest, globalCheckpoint); } - - if (shard.relocating() && shard.relocatingNodeId().equals(localNodeId) == false) { - performOnReplica(shard.getTargetRelocatingShard(), replicaRequest, globalCheckpoint); - } } } diff --git a/server/src/main/java/org/elasticsearch/index/seqno/GlobalCheckpointTracker.java b/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java similarity index 93% rename from server/src/main/java/org/elasticsearch/index/seqno/GlobalCheckpointTracker.java rename to server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java index ada7cf7fd0b..43d4c489149 100644 --- a/server/src/main/java/org/elasticsearch/index/seqno/GlobalCheckpointTracker.java +++ b/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java @@ -21,6 +21,7 @@ package 
org.elasticsearch.index.seqno; import com.carrotsearch.hppc.ObjectLongHashMap; import com.carrotsearch.hppc.ObjectLongMap; +import org.elasticsearch.Version; import org.elasticsearch.cluster.routing.AllocationId; import org.elasticsearch.cluster.routing.IndexShardRoutingTable; import org.elasticsearch.cluster.routing.ShardRouting; @@ -48,15 +49,17 @@ import java.util.stream.Collectors; import java.util.stream.LongStream; /** - * This class is responsible of tracking the global checkpoint. The global checkpoint is the highest sequence number for which all lower (or - * equal) sequence number have been processed on all shards that are currently active. Since shards count as "active" when the master starts + * This class is responsible for tracking the replication group with its progress and safety markers (local and global checkpoints). + * + * The global checkpoint is the highest sequence number for which all lower (or equal) sequence numbers have been processed + * on all shards that are currently active. Since shards count as "active" when the master starts * them, and before this primary shard has been notified of this fact, we also include shards that have completed recovery. These shards * have received all old operations via the recovery mechanism and are kept up to date by the various replications actions. The set of * shards that are taken into account for the global checkpoint calculation are called the "in-sync shards". *

* The global checkpoint is maintained by the primary shard and is replicated to all the replicas (via {@link GlobalCheckpointSyncAction}). */ -public class GlobalCheckpointTracker extends AbstractIndexShardComponent implements LongSupplier { +public class ReplicationTracker extends AbstractIndexShardComponent implements LongSupplier { /** * The allocation ID for the shard to which this tracker is a component of. @@ -146,16 +149,32 @@ public class GlobalCheckpointTracker extends AbstractIndexShardComponent impleme */ boolean inSync; - public CheckpointState(long localCheckpoint, long globalCheckpoint, boolean inSync) { + /** + * whether this shard is tracked in the replication group, i.e., should receive document updates from the primary. + */ + boolean tracked; + + public CheckpointState(long localCheckpoint, long globalCheckpoint, boolean inSync, boolean tracked) { this.localCheckpoint = localCheckpoint; this.globalCheckpoint = globalCheckpoint; this.inSync = inSync; + this.tracked = tracked; } public CheckpointState(StreamInput in) throws IOException { this.localCheckpoint = in.readZLong(); this.globalCheckpoint = in.readZLong(); this.inSync = in.readBoolean(); + if (in.getVersion().onOrAfter(Version.V_6_3_0)) { + this.tracked = in.readBoolean(); + } else { + // Every in-sync shard copy is also tracked (see invariant). This was the case even in earlier ES versions. + // Non in-sync shard copies might be tracked or not. As this information here is only serialized during relocation hand-off, + // after which replica recoveries cannot complete anymore (i.e. they cannot move from in-sync == false to in-sync == true), + // we can treat non in-sync replica shard copies as untracked. They will go through a fresh recovery against the new + // primary and will become tracked again under this primary before they are marked as in-sync. 
+ this.tracked = inSync; + } } @Override @@ -163,13 +182,16 @@ public class GlobalCheckpointTracker extends AbstractIndexShardComponent impleme out.writeZLong(localCheckpoint); out.writeZLong(globalCheckpoint); out.writeBoolean(inSync); + if (out.getVersion().onOrAfter(Version.V_6_3_0)) { + out.writeBoolean(tracked); + } } /** * Returns a full copy of this object */ public CheckpointState copy() { - return new CheckpointState(localCheckpoint, globalCheckpoint, inSync); + return new CheckpointState(localCheckpoint, globalCheckpoint, inSync, tracked); } public long getLocalCheckpoint() { @@ -186,6 +208,7 @@ public class GlobalCheckpointTracker extends AbstractIndexShardComponent impleme "localCheckpoint=" + localCheckpoint + ", globalCheckpoint=" + globalCheckpoint + ", inSync=" + inSync + + ", tracked=" + tracked + '}'; } @@ -198,7 +221,8 @@ public class GlobalCheckpointTracker extends AbstractIndexShardComponent impleme if (localCheckpoint != that.localCheckpoint) return false; if (globalCheckpoint != that.globalCheckpoint) return false; - return inSync == that.inSync; + if (inSync != that.inSync) return false; + return tracked == that.tracked; } @Override @@ -206,6 +230,7 @@ public class GlobalCheckpointTracker extends AbstractIndexShardComponent impleme int result = Long.hashCode(localCheckpoint); result = 31 * result + Long.hashCode(globalCheckpoint); result = 31 * result + Boolean.hashCode(inSync); + result = 31 * result + Boolean.hashCode(tracked); return result; } } @@ -301,6 +326,9 @@ public class GlobalCheckpointTracker extends AbstractIndexShardComponent impleme // blocking global checkpoint advancement only happens for shards that are not in-sync assert !pendingInSync.contains(entry.getKey()) || !entry.getValue().inSync : "shard copy " + entry.getKey() + " blocks global checkpoint advancement but is in-sync"; + // in-sync shard copies are tracked + assert !entry.getValue().inSync || entry.getValue().tracked : + "shard copy " + entry.getKey() + " is 
in-sync but not tracked"; } return true; @@ -330,7 +358,7 @@ public class GlobalCheckpointTracker extends AbstractIndexShardComponent impleme * @param indexSettings the index settings * @param globalCheckpoint the last known global checkpoint for this shard, or {@link SequenceNumbers#UNASSIGNED_SEQ_NO} */ - public GlobalCheckpointTracker( + public ReplicationTracker( final ShardId shardId, final String allocationId, final IndexSettings indexSettings, @@ -342,7 +370,7 @@ public class GlobalCheckpointTracker extends AbstractIndexShardComponent impleme this.handoffInProgress = false; this.appliedClusterStateVersion = -1L; this.checkpoints = new HashMap<>(1 + indexSettings.getNumberOfReplicas()); - checkpoints.put(allocationId, new CheckpointState(SequenceNumbers.UNASSIGNED_SEQ_NO, globalCheckpoint, false)); + checkpoints.put(allocationId, new CheckpointState(SequenceNumbers.UNASSIGNED_SEQ_NO, globalCheckpoint, false, false)); this.pendingInSync = new HashSet<>(); this.routingTable = null; this.replicationGroup = null; @@ -361,7 +389,8 @@ public class GlobalCheckpointTracker extends AbstractIndexShardComponent impleme private ReplicationGroup calculateReplicationGroup() { return new ReplicationGroup(routingTable, - checkpoints.entrySet().stream().filter(e -> e.getValue().inSync).map(Map.Entry::getKey).collect(Collectors.toSet())); + checkpoints.entrySet().stream().filter(e -> e.getValue().inSync).map(Map.Entry::getKey).collect(Collectors.toSet()), + checkpoints.entrySet().stream().filter(e -> e.getValue().tracked).map(Map.Entry::getKey).collect(Collectors.toSet())); } /** @@ -481,7 +510,7 @@ public class GlobalCheckpointTracker extends AbstractIndexShardComponent impleme final long localCheckpoint = pre60AllocationIds.contains(initializingId) ? 
SequenceNumbers.PRE_60_NODE_CHECKPOINT : SequenceNumbers.UNASSIGNED_SEQ_NO; final long globalCheckpoint = localCheckpoint; - checkpoints.put(initializingId, new CheckpointState(localCheckpoint, globalCheckpoint, inSync)); + checkpoints.put(initializingId, new CheckpointState(localCheckpoint, globalCheckpoint, inSync, inSync)); } } } else { @@ -490,18 +519,20 @@ public class GlobalCheckpointTracker extends AbstractIndexShardComponent impleme final long localCheckpoint = pre60AllocationIds.contains(initializingId) ? SequenceNumbers.PRE_60_NODE_CHECKPOINT : SequenceNumbers.UNASSIGNED_SEQ_NO; final long globalCheckpoint = localCheckpoint; - checkpoints.put(initializingId, new CheckpointState(localCheckpoint, globalCheckpoint, false)); + checkpoints.put(initializingId, new CheckpointState(localCheckpoint, globalCheckpoint, false, false)); } } for (String inSyncId : inSyncAllocationIds) { if (shardAllocationId.equals(inSyncId)) { // current shard is initially marked as not in-sync because we don't know better at that point - checkpoints.get(shardAllocationId).inSync = true; + CheckpointState checkpointState = checkpoints.get(shardAllocationId); + checkpointState.inSync = true; + checkpointState.tracked = true; } else { final long localCheckpoint = pre60AllocationIds.contains(inSyncId) ? SequenceNumbers.PRE_60_NODE_CHECKPOINT : SequenceNumbers.UNASSIGNED_SEQ_NO; final long globalCheckpoint = localCheckpoint; - checkpoints.put(inSyncId, new CheckpointState(localCheckpoint, globalCheckpoint, true)); + checkpoints.put(inSyncId, new CheckpointState(localCheckpoint, globalCheckpoint, true, true)); } } } @@ -516,19 +547,22 @@ public class GlobalCheckpointTracker extends AbstractIndexShardComponent impleme } /** - * Called when the recovery process for a shard is ready to open the engine on the target shard. Ensures that the right data structures - * have been set up locally to track local checkpoint information for the shard. 
+ * Called when the recovery process for a shard has opened the engine on the target shard. Ensures that the right data structures + * have been set up locally to track local checkpoint information for the shard and that the shard is added to the replication group. * * @param allocationId the allocation ID of the shard for which recovery was initiated */ public synchronized void initiateTracking(final String allocationId) { assert invariant(); assert primaryMode; + assert handoffInProgress == false; CheckpointState cps = checkpoints.get(allocationId); if (cps == null) { // can happen if replica was removed from cluster but recovery process is unaware of it yet throw new IllegalStateException("no local checkpoint tracking information available"); } + cps.tracked = true; + replicationGroup = calculateReplicationGroup(); assert invariant(); } @@ -551,6 +585,7 @@ public class GlobalCheckpointTracker extends AbstractIndexShardComponent impleme assert localCheckpoint >= SequenceNumbers.NO_OPS_PERFORMED : "expected known local checkpoint for " + allocationId + " but was " + localCheckpoint; assert pendingInSync.contains(allocationId) == false : "shard copy " + allocationId + " is already marked as pending in-sync"; + assert cps.tracked : "shard copy " + allocationId + " cannot be marked as in-sync as it's not tracked"; updateLocalCheckpoint(allocationId, cps, localCheckpoint); // if it was already in-sync (because of a previously failed recovery attempt), global checkpoint must have been // stuck from advancing diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 711fe68bf65..5245da668de 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -106,7 +106,7 @@ import org.elasticsearch.index.recovery.RecoveryStats; import org.elasticsearch.index.refresh.RefreshStats; import 
org.elasticsearch.index.search.stats.SearchStats; import org.elasticsearch.index.search.stats.ShardSearchStats; -import org.elasticsearch.index.seqno.GlobalCheckpointTracker; +import org.elasticsearch.index.seqno.ReplicationTracker; import org.elasticsearch.index.seqno.SeqNoStats; import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.index.shard.PrimaryReplicaSyncer.ResyncTask; @@ -190,7 +190,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl private final SearchOperationListener searchOperationListener; - private final GlobalCheckpointTracker globalCheckpointTracker; + private final ReplicationTracker replicationTracker; protected volatile ShardRouting shardRouting; protected volatile IndexShardState state; @@ -298,7 +298,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl this.checkIndexOnStartup = indexSettings.getValue(IndexSettings.INDEX_CHECK_ON_STARTUP); this.translogConfig = new TranslogConfig(shardId, shardPath().resolveTranslog(), indexSettings, bigArrays); - this.globalCheckpointTracker = new GlobalCheckpointTracker(shardId, shardRouting.allocationId().getId(), indexSettings, + this.replicationTracker = new ReplicationTracker(shardId, shardRouting.allocationId().getId(), indexSettings, SequenceNumbers.UNASSIGNED_SEQ_NO); // the query cache is a node-level thing, however we want the most popular filters // to be computed on a per-shard basis @@ -402,7 +402,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl } if (newRouting.primary()) { - globalCheckpointTracker.updateFromMaster(applyingClusterStateVersion, inSyncAllocationIds, routingTable, pre60AllocationIds); + replicationTracker.updateFromMaster(applyingClusterStateVersion, inSyncAllocationIds, routingTable, pre60AllocationIds); } if (state == IndexShardState.POST_RECOVERY && newRouting.active()) { @@ -415,7 +415,7 @@ public class IndexShard extends 
AbstractIndexShardComponent implements IndicesCl } if (newRouting.primary() && currentRouting.isRelocationTarget() == false) { - globalCheckpointTracker.activatePrimaryMode(getEngine().getLocalCheckpointTracker().getCheckpoint()); + replicationTracker.activatePrimaryMode(getEngine().getLocalCheckpointTracker().getCheckpoint()); } changeState(IndexShardState.STARTED, "global state is [" + newRouting.state() + "]"); @@ -490,7 +490,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl */ engine.rollTranslogGeneration(); engine.fillSeqNoGaps(newPrimaryTerm); - globalCheckpointTracker.updateLocalCheckpoint(currentRouting.allocationId().getId(), + replicationTracker.updateLocalCheckpoint(currentRouting.allocationId().getId(), getEngine().getLocalCheckpointTracker().getCheckpoint()); primaryReplicaSyncer.accept(this, new ActionListener() { @Override @@ -517,7 +517,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl } }, e -> failShard("exception during primary term transition", e)); - globalCheckpointTracker.activatePrimaryMode(getEngine().getLocalCheckpointTracker().getCheckpoint()); + replicationTracker.activatePrimaryMode(getEngine().getLocalCheckpointTracker().getCheckpoint()); primaryTerm = newPrimaryTerm; } } @@ -571,7 +571,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl * @throws InterruptedException if blocking operations is interrupted */ public void relocated( - final String reason, final Consumer consumer) throws IllegalIndexShardStateException, InterruptedException { + final String reason, final Consumer consumer) throws IllegalIndexShardStateException, InterruptedException { assert shardRouting.primary() : "only primaries can be marked as relocated: " + shardRouting; try { indexShardOperationPermits.blockOperations(30, TimeUnit.MINUTES, () -> { @@ -583,17 +583,17 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl * network 
operation. Doing this under the mutex can implicitly block the cluster state update thread on network operations. */ verifyRelocatingState(); - final GlobalCheckpointTracker.PrimaryContext primaryContext = globalCheckpointTracker.startRelocationHandoff(); + final ReplicationTracker.PrimaryContext primaryContext = replicationTracker.startRelocationHandoff(); try { consumer.accept(primaryContext); synchronized (mutex) { verifyRelocatingState(); changeState(IndexShardState.RELOCATED, reason); } - globalCheckpointTracker.completeRelocationHandoff(); + replicationTracker.completeRelocationHandoff(); } catch (final Exception e) { try { - globalCheckpointTracker.abortRelocationHandoff(); + replicationTracker.abortRelocationHandoff(); } catch (final Exception inner) { e.addSuppressed(inner); } @@ -910,7 +910,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl @Nullable public SeqNoStats seqNoStats() { Engine engine = getEngineOrNull(); - return engine == null ? null : engine.getLocalCheckpointTracker().getStats(globalCheckpointTracker.getGlobalCheckpoint()); + return engine == null ? null : engine.getLocalCheckpointTracker().getStats(replicationTracker.getGlobalCheckpoint()); } public IndexingStats indexingStats(String... 
types) { @@ -1285,7 +1285,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl final RecoveryState.Translog translogStats = recoveryState().getTranslog(); translogStats.totalOperations(0); translogStats.totalOperationsOnStart(0); - globalCheckpointTracker.updateGlobalCheckpointOnReplica(SequenceNumbers.NO_OPS_PERFORMED, "index created"); + replicationTracker.updateGlobalCheckpointOnReplica(SequenceNumbers.NO_OPS_PERFORMED, "index created"); innerOpenEngineAndTranslog(EngineConfig.OpenMode.CREATE_INDEX_AND_TRANSLOG, false); } @@ -1304,7 +1304,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl assert existingCommits.size() == 1 : "Open index create translog should have one commit, commits[" + existingCommits + "]"; } } - globalCheckpointTracker.updateGlobalCheckpointOnReplica(globalCheckpoint, "opening index with a new translog"); + replicationTracker.updateGlobalCheckpointOnReplica(globalCheckpoint, "opening index with a new translog"); innerOpenEngineAndTranslog(EngineConfig.OpenMode.OPEN_INDEX_CREATE_TRANSLOG, forceNewHistoryUUID); } @@ -1355,7 +1355,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl // we have to set it before we open an engine and recover from the translog because // acquiring a snapshot from the translog causes a sync which causes the global checkpoint to be pulled in, // and an engine can be forced to close in ctor which also causes the global checkpoint to be pulled in. 
- globalCheckpointTracker.updateGlobalCheckpointOnReplica(Translog.readGlobalCheckpoint(translogConfig.getTranslogPath()), + replicationTracker.updateGlobalCheckpointOnReplica(Translog.readGlobalCheckpoint(translogConfig.getTranslogPath()), "read from translog checkpoint"); } createNewEngine(config); @@ -1721,7 +1721,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl /** * Notifies the service to update the local checkpoint for the shard with the provided allocation ID. See - * {@link org.elasticsearch.index.seqno.GlobalCheckpointTracker#updateLocalCheckpoint(String, long)} for + * {@link ReplicationTracker#updateLocalCheckpoint(String, long)} for * details. * * @param allocationId the allocation ID of the shard to update the local checkpoint for @@ -1730,7 +1730,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl public void updateLocalCheckpointForShard(final String allocationId, final long checkpoint) { verifyPrimary(); verifyNotClosed(); - globalCheckpointTracker.updateLocalCheckpoint(allocationId, checkpoint); + replicationTracker.updateLocalCheckpoint(allocationId, checkpoint); } /** @@ -1742,7 +1742,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl public void updateGlobalCheckpointForShard(final String allocationId, final long globalCheckpoint) { verifyPrimary(); verifyNotClosed(); - globalCheckpointTracker.updateGlobalCheckpointForShard(allocationId, globalCheckpoint); + replicationTracker.updateGlobalCheckpointForShard(allocationId, globalCheckpoint); } /** @@ -1756,19 +1756,19 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl } /** - * Called when the recovery process for a shard is ready to open the engine on the target shard. - * See {@link GlobalCheckpointTracker#initiateTracking(String)} for details. + * Called when the recovery process for a shard has opened the engine on the target shard. 
Ensures that the right data structures + * have been set up locally to track local checkpoint information for the shard and that the shard is added to the replication group. * * @param allocationId the allocation ID of the shard for which recovery was initiated */ public void initiateTracking(final String allocationId) { verifyPrimary(); - globalCheckpointTracker.initiateTracking(allocationId); + replicationTracker.initiateTracking(allocationId); } /** * Marks the shard with the provided allocation ID as in-sync with the primary shard. See - * {@link org.elasticsearch.index.seqno.GlobalCheckpointTracker#markAllocationIdAsInSync(String, long)} + * {@link ReplicationTracker#markAllocationIdAsInSync(String, long)} * for additional details. * * @param allocationId the allocation ID of the shard to mark as in-sync @@ -1776,7 +1776,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl */ public void markAllocationIdAsInSync(final String allocationId, final long localCheckpoint) throws InterruptedException { verifyPrimary(); - globalCheckpointTracker.markAllocationIdAsInSync(allocationId, localCheckpoint); + replicationTracker.markAllocationIdAsInSync(allocationId, localCheckpoint); } /** @@ -1794,7 +1794,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl * @return the global checkpoint */ public long getGlobalCheckpoint() { - return globalCheckpointTracker.getGlobalCheckpoint(); + return replicationTracker.getGlobalCheckpoint(); } /** @@ -1805,7 +1805,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl public ObjectLongMap getInSyncGlobalCheckpoints() { verifyPrimary(); verifyNotClosed(); - return globalCheckpointTracker.getInSyncGlobalCheckpoints(); + return replicationTracker.getInSyncGlobalCheckpoints(); } /** @@ -1819,7 +1819,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl return; } // only sync if there are not operations in 
flight - final SeqNoStats stats = getEngine().getLocalCheckpointTracker().getStats(globalCheckpointTracker.getGlobalCheckpoint()); + final SeqNoStats stats = getEngine().getLocalCheckpointTracker().getStats(replicationTracker.getGlobalCheckpoint()); if (stats.getMaxSeqNo() == stats.getGlobalCheckpoint()) { final ObjectLongMap globalCheckpoints = getInSyncGlobalCheckpoints(); final String allocationId = routingEntry().allocationId().getId(); @@ -1845,7 +1845,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl public ReplicationGroup getReplicationGroup() { verifyPrimary(); verifyNotClosed(); - return globalCheckpointTracker.getReplicationGroup(); + return replicationTracker.getReplicationGroup(); } /** @@ -1873,7 +1873,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl "that is higher than its local checkpoint [" + localCheckpoint + "]"; return; } - globalCheckpointTracker.updateGlobalCheckpointOnReplica(globalCheckpoint, reason); + replicationTracker.updateGlobalCheckpointOnReplica(globalCheckpoint, reason); } /** @@ -1881,13 +1881,13 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl * * @param primaryContext the sequence number context */ - public void activateWithPrimaryContext(final GlobalCheckpointTracker.PrimaryContext primaryContext) { + public void activateWithPrimaryContext(final ReplicationTracker.PrimaryContext primaryContext) { verifyPrimary(); assert shardRouting.isRelocationTarget() : "only relocation target can update allocation IDs from primary context: " + shardRouting; assert primaryContext.getCheckpointStates().containsKey(routingEntry().allocationId().getId()) && getEngine().getLocalCheckpointTracker().getCheckpoint() == primaryContext.getCheckpointStates().get(routingEntry().allocationId().getId()).getLocalCheckpoint(); - globalCheckpointTracker.activateWithPrimaryContext(primaryContext); + 
replicationTracker.activateWithPrimaryContext(primaryContext); } /** @@ -1897,7 +1897,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl */ public boolean pendingInSync() { verifyPrimary(); - return globalCheckpointTracker.pendingInSync(); + return replicationTracker.pendingInSync(); } /** @@ -2191,7 +2191,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl IndexingMemoryController.SHARD_INACTIVE_TIME_SETTING.get(indexSettings.getSettings()), Collections.singletonList(refreshListeners), Collections.singletonList(new RefreshMetricUpdater(refreshMetric)), - indexSort, this::runTranslogRecovery, circuitBreakerService, globalCheckpointTracker); + indexSort, this::runTranslogRecovery, circuitBreakerService, replicationTracker); } /** @@ -2458,8 +2458,8 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl } // for tests - GlobalCheckpointTracker getGlobalCheckpointTracker() { - return globalCheckpointTracker; + ReplicationTracker getReplicationTracker() { + return replicationTracker; } /** diff --git a/server/src/main/java/org/elasticsearch/index/shard/ReplicationGroup.java b/server/src/main/java/org/elasticsearch/index/shard/ReplicationGroup.java index bb90af8cdd8..4f63d746f51 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/ReplicationGroup.java +++ b/server/src/main/java/org/elasticsearch/index/shard/ReplicationGroup.java @@ -20,7 +20,11 @@ package org.elasticsearch.index.shard; import org.elasticsearch.cluster.routing.IndexShardRoutingTable; +import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.common.util.set.Sets; +import java.util.ArrayList; +import java.util.List; import java.util.Set; /** @@ -29,10 +33,44 @@ import java.util.Set; public class ReplicationGroup { private final IndexShardRoutingTable routingTable; private final Set inSyncAllocationIds; + private final Set trackedAllocationIds; - public 
ReplicationGroup(IndexShardRoutingTable routingTable, Set inSyncAllocationIds) { + private final Set unavailableInSyncShards; // derived from the other fields + private final List replicationTargets; // derived from the other fields + private final List skippedShards; // derived from the other fields + + public ReplicationGroup(IndexShardRoutingTable routingTable, Set inSyncAllocationIds, Set trackedAllocationIds) { this.routingTable = routingTable; this.inSyncAllocationIds = inSyncAllocationIds; + this.trackedAllocationIds = trackedAllocationIds; + + this.unavailableInSyncShards = Sets.difference(inSyncAllocationIds, routingTable.getAllAllocationIds()); + this.replicationTargets = new ArrayList<>(); + this.skippedShards = new ArrayList<>(); + for (final ShardRouting shard : routingTable) { + if (shard.unassigned()) { + assert shard.primary() == false : "primary shard should not be unassigned in a replication group: " + shard; + skippedShards.add(shard); + } else { + if (trackedAllocationIds.contains(shard.allocationId().getId())) { + replicationTargets.add(shard); + } else { + assert inSyncAllocationIds.contains(shard.allocationId().getId()) == false : + "in-sync shard copy but not tracked: " + shard; + skippedShards.add(shard); + } + if (shard.relocating()) { + ShardRouting relocationTarget = shard.getTargetRelocatingShard(); + if (trackedAllocationIds.contains(relocationTarget.allocationId().getId())) { + replicationTargets.add(relocationTarget); + } else { + skippedShards.add(relocationTarget); + assert inSyncAllocationIds.contains(relocationTarget.allocationId().getId()) == false : + "in-sync shard copy but not tracked: " + shard; + } + } + } + } } public IndexShardRoutingTable getRoutingTable() { @@ -43,6 +81,29 @@ public class ReplicationGroup { return inSyncAllocationIds; } + /** + * Returns the set of shard allocation ids that are in the in-sync set but have no assigned routing entry + */ + public Set getUnavailableInSyncShards() { + return 
unavailableInSyncShards; + } + + /** + * Returns the subset of shards in the routing table that should be replicated to. Includes relocation targets. + */ + public List getReplicationTargets() { + return replicationTargets; + } + + /** + * Returns the subset of shards in the routing table that are unassigned or initializing and not ready yet to receive operations + * (i.e. engine not opened yet). Includes relocation targets. + */ + public List getSkippedShards() { + return skippedShards; + } + + @Override public boolean equals(Object o) { if (this == o) return true; @@ -51,13 +112,15 @@ public class ReplicationGroup { ReplicationGroup that = (ReplicationGroup) o; if (!routingTable.equals(that.routingTable)) return false; - return inSyncAllocationIds.equals(that.inSyncAllocationIds); + if (!inSyncAllocationIds.equals(that.inSyncAllocationIds)) return false; + return trackedAllocationIds.equals(that.trackedAllocationIds); } @Override public int hashCode() { int result = routingTable.hashCode(); result = 31 * result + inSyncAllocationIds.hashCode(); + result = 31 * result + trackedAllocationIds.hashCode(); return result; } @@ -66,6 +129,7 @@ public class ReplicationGroup { return "ReplicationGroup{" + "routingTable=" + routingTable + ", inSyncAllocationIds=" + inSyncAllocationIds + + ", trackedAllocationIds=" + trackedAllocationIds + '}'; } diff --git a/server/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java b/server/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java index 5aa8b5f3ee1..d17740ed600 100644 --- a/server/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java +++ b/server/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java @@ -55,7 +55,7 @@ import org.elasticsearch.index.IndexComponent; import org.elasticsearch.index.IndexService; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.seqno.GlobalCheckpointSyncAction; 
-import org.elasticsearch.index.seqno.GlobalCheckpointTracker; +import org.elasticsearch.index.seqno.ReplicationTracker; import org.elasticsearch.index.shard.IndexEventListener; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.IndexShardRelocatedException; @@ -742,7 +742,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple * - Updates and persists the new routing value. * - Updates the primary term if this shard is a primary. * - Updates the allocation ids that are tracked by the shard if it is a primary. - * See {@link GlobalCheckpointTracker#updateFromMaster(long, Set, IndexShardRoutingTable, Set)} for details. + * See {@link ReplicationTracker#updateFromMaster(long, Set, IndexShardRoutingTable, Set)} for details. * * @param shardRouting the new routing entry * @param primaryTerm the new primary term diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryHandoffPrimaryContextRequest.java b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryHandoffPrimaryContextRequest.java index 3a472c447b2..a4a87cf2d60 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryHandoffPrimaryContextRequest.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryHandoffPrimaryContextRequest.java @@ -21,7 +21,7 @@ package org.elasticsearch.indices.recovery; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; -import org.elasticsearch.index.seqno.GlobalCheckpointTracker; +import org.elasticsearch.index.seqno.ReplicationTracker; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.transport.TransportRequest; @@ -34,7 +34,7 @@ class RecoveryHandoffPrimaryContextRequest extends TransportRequest { private long recoveryId; private ShardId shardId; - private GlobalCheckpointTracker.PrimaryContext primaryContext; + private ReplicationTracker.PrimaryContext primaryContext; 
/** * Initialize an empty request (used to serialize into when reading from a stream). @@ -50,7 +50,7 @@ class RecoveryHandoffPrimaryContextRequest extends TransportRequest { * @param primaryContext the primary context */ RecoveryHandoffPrimaryContextRequest(final long recoveryId, final ShardId shardId, - final GlobalCheckpointTracker.PrimaryContext primaryContext) { + final ReplicationTracker.PrimaryContext primaryContext) { this.recoveryId = recoveryId; this.shardId = shardId; this.primaryContext = primaryContext; @@ -64,7 +64,7 @@ class RecoveryHandoffPrimaryContextRequest extends TransportRequest { return shardId; } - GlobalCheckpointTracker.PrimaryContext primaryContext() { + ReplicationTracker.PrimaryContext primaryContext() { return primaryContext; } @@ -73,7 +73,7 @@ class RecoveryHandoffPrimaryContextRequest extends TransportRequest { super.readFrom(in); recoveryId = in.readLong(); shardId = ShardId.readShardId(in); - primaryContext = new GlobalCheckpointTracker.PrimaryContext(in); + primaryContext = new ReplicationTracker.PrimaryContext(in); } @Override diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java b/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java index 5a0ee1cf44d..47c3d073f10 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java @@ -185,8 +185,6 @@ public class RecoverySourceHandler { assert requiredSeqNoRangeStart >= startingSeqNo : "requiredSeqNoRangeStart [" + requiredSeqNoRangeStart + "] is lower than [" + startingSeqNo + "]"; - runUnderPrimaryPermit(() -> shard.initiateTracking(request.targetAllocationId())); - try { // For a sequence based recovery, the target can keep its local translog prepareTargetForTranslog(isSequenceNumberBasedRecovery == false, translog.estimateTotalOperationsFromMinSeq(startingSeqNo)); @@ -194,6 +192,14 @@ 
public class RecoverySourceHandler { throw new RecoveryEngineException(shard.shardId(), 1, "prepare target for translog failed", e); } + /* + * add shard to replication group (shard will receive replication requests from this point on) now that engine is open. + * This means that any document indexed into the primary after this will be replicated to this replica as well + * make sure to do this before sampling the max sequence number in the next step, to ensure that we send + * all documents up to maxSeqNo in phase2. + */ + runUnderPrimaryPermit(() -> shard.initiateTracking(request.targetAllocationId())); + final long endingSeqNo = shard.seqNoStats().getMaxSeqNo(); /* * We need to wait for all operations up to the current max to complete, otherwise we can not guarantee that all diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java index f4c823c0e96..75afaa140f2 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java @@ -41,7 +41,7 @@ import org.elasticsearch.common.util.concurrent.AbstractRefCounted; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.mapper.MapperException; -import org.elasticsearch.index.seqno.GlobalCheckpointTracker; +import org.elasticsearch.index.seqno.ReplicationTracker; import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.IndexShardNotRecoveringException; @@ -387,7 +387,7 @@ public class RecoveryTarget extends AbstractRefCounted implements RecoveryTarget } @Override - public void handoffPrimaryContext(final GlobalCheckpointTracker.PrimaryContext primaryContext) { + public void handoffPrimaryContext(final ReplicationTracker.PrimaryContext 
primaryContext) { indexShard.activateWithPrimaryContext(primaryContext); } diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTargetHandler.java b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTargetHandler.java index 736d6020446..9cedfa8039a 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTargetHandler.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTargetHandler.java @@ -19,7 +19,7 @@ package org.elasticsearch.indices.recovery; import org.elasticsearch.common.bytes.BytesReference; -import org.elasticsearch.index.seqno.GlobalCheckpointTracker; +import org.elasticsearch.index.seqno.ReplicationTracker; import org.elasticsearch.index.store.Store; import org.elasticsearch.index.store.StoreFileMetaData; import org.elasticsearch.index.translog.Translog; @@ -55,7 +55,7 @@ public interface RecoveryTargetHandler { * * @param primaryContext the primary context from the relocation source */ - void handoffPrimaryContext(GlobalCheckpointTracker.PrimaryContext primaryContext); + void handoffPrimaryContext(ReplicationTracker.PrimaryContext primaryContext); /** * Index a set of translog operations on the target diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RemoteRecoveryTargetHandler.java b/server/src/main/java/org/elasticsearch/indices/recovery/RemoteRecoveryTargetHandler.java index 4ea2be0e726..966ed426d48 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/RemoteRecoveryTargetHandler.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/RemoteRecoveryTargetHandler.java @@ -23,7 +23,7 @@ import org.apache.lucene.store.RateLimiter; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.bytes.BytesReference; -import org.elasticsearch.index.seqno.GlobalCheckpointTracker; +import org.elasticsearch.index.seqno.ReplicationTracker; import 
org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.store.Store; import org.elasticsearch.index.store.StoreFileMetaData; @@ -100,7 +100,7 @@ public class RemoteRecoveryTargetHandler implements RecoveryTargetHandler { } @Override - public void handoffPrimaryContext(final GlobalCheckpointTracker.PrimaryContext primaryContext) { + public void handoffPrimaryContext(final ReplicationTracker.PrimaryContext primaryContext) { transportService.submitRequest( targetNode, PeerRecoveryTargetService.Actions.HANDOFF_PRIMARY_CONTEXT, diff --git a/server/src/test/java/org/elasticsearch/action/support/replication/ReplicationOperationTests.java b/server/src/test/java/org/elasticsearch/action/support/replication/ReplicationOperationTests.java index 07dd1ae9ed1..858cbcce199 100644 --- a/server/src/test/java/org/elasticsearch/action/support/replication/ReplicationOperationTests.java +++ b/server/src/test/java/org/elasticsearch/action/support/replication/ReplicationOperationTests.java @@ -54,6 +54,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; import java.util.function.Supplier; +import java.util.stream.Collectors; import static org.elasticsearch.action.support.replication.ClusterStateCreationUtils.state; import static org.elasticsearch.action.support.replication.ClusterStateCreationUtils.stateWithActivePrimary; @@ -69,23 +70,30 @@ public class ReplicationOperationTests extends ESTestCase { final String index = "test"; final ShardId shardId = new ShardId(index, "_na_", 0); - ClusterState state = stateWithActivePrimary(index, true, randomInt(5)); - IndexMetaData indexMetaData = state.getMetaData().index(index); + ClusterState initialState = stateWithActivePrimary(index, true, randomInt(5)); + IndexMetaData indexMetaData = initialState.getMetaData().index(index); final long primaryTerm = indexMetaData.primaryTerm(0); - final IndexShardRoutingTable indexShardRoutingTable = 
state.getRoutingTable().shardRoutingTable(shardId); + final IndexShardRoutingTable indexShardRoutingTable = initialState.getRoutingTable().shardRoutingTable(shardId); ShardRouting primaryShard = indexShardRoutingTable.primaryShard(); if (primaryShard.relocating() && randomBoolean()) { // simulate execution of the replication phase on the relocation target node after relocation source was marked as relocated - state = ClusterState.builder(state) - .nodes(DiscoveryNodes.builder(state.nodes()).localNodeId(primaryShard.relocatingNodeId())).build(); + initialState = ClusterState.builder(initialState) + .nodes(DiscoveryNodes.builder(initialState.nodes()).localNodeId(primaryShard.relocatingNodeId())).build(); primaryShard = primaryShard.getTargetRelocatingShard(); } // add a few in-sync allocation ids that don't have corresponding routing entries - Set staleAllocationIds = Sets.newHashSet(generateRandomStringArray(4, 10, false)); - state = ClusterState.builder(state).metaData(MetaData.builder(state.metaData()).put(IndexMetaData.builder(indexMetaData) - .putInSyncAllocationIds(0, Sets.union(indexMetaData.inSyncAllocationIds(0), staleAllocationIds)))).build(); + final Set staleAllocationIds = Sets.newHashSet(generateRandomStringArray(4, 10, false)); - final Set expectedReplicas = getExpectedReplicas(shardId, state); + final Set inSyncAllocationIds = Sets.union(indexMetaData.inSyncAllocationIds(0), staleAllocationIds); + + final Set trackedShards = new HashSet<>(); + final Set untrackedShards = new HashSet<>(); + addTrackingInfo(indexShardRoutingTable, primaryShard, trackedShards, untrackedShards); + trackedShards.addAll(staleAllocationIds); + + final ReplicationGroup replicationGroup = new ReplicationGroup(indexShardRoutingTable, inSyncAllocationIds, trackedShards); + + final Set expectedReplicas = getExpectedReplicas(shardId, initialState, trackedShards); final Map expectedFailures = new HashMap<>(); final Set expectedFailedShards = new HashSet<>(); @@ -109,8 +117,8 @@ 
public class ReplicationOperationTests extends ESTestCase { Request request = new Request(shardId); PlainActionFuture listener = new PlainActionFuture<>(); final TestReplicaProxy replicasProxy = new TestReplicaProxy(primaryTerm, expectedFailures); - final ClusterState finalState = state; - final TestPrimary primary = new TestPrimary(primaryShard, () -> finalState); + + final TestPrimary primary = new TestPrimary(primaryShard, () -> replicationGroup); final TestReplicationOperation op = new TestReplicationOperation(request, primary, listener, replicasProxy); op.execute(); @@ -126,42 +134,73 @@ public class ReplicationOperationTests extends ESTestCase { assertThat(shardInfo.getSuccessful(), equalTo(1 + expectedReplicas.size() - expectedFailures.size())); final List unassignedShards = indexShardRoutingTable.shardsWithState(ShardRoutingState.UNASSIGNED); - final int totalShards = 1 + expectedReplicas.size() + unassignedShards.size(); - assertThat(shardInfo.getTotal(), equalTo(totalShards)); + final int totalShards = 1 + expectedReplicas.size() + unassignedShards.size() + untrackedShards.size(); + assertThat(replicationGroup.toString(), shardInfo.getTotal(), equalTo(totalShards)); assertThat(primary.knownLocalCheckpoints.remove(primaryShard.allocationId().getId()), equalTo(primary.localCheckpoint)); assertThat(primary.knownLocalCheckpoints, equalTo(replicasProxy.generatedLocalCheckpoints)); assertThat(primary.knownGlobalCheckpoints, equalTo(replicasProxy.generatedGlobalCheckpoints)); } + private void addTrackingInfo(IndexShardRoutingTable indexShardRoutingTable, ShardRouting primaryShard, Set trackedShards, + Set untrackedShards) { + for (ShardRouting shr : indexShardRoutingTable.shards()) { + if (shr.unassigned() == false) { + if (shr.initializing()) { + if (randomBoolean()) { + trackedShards.add(shr.allocationId().getId()); + } else { + untrackedShards.add(shr.allocationId().getId()); + } + } else { + trackedShards.add(shr.allocationId().getId()); + if 
(shr.relocating()) { + if (primaryShard == shr.getTargetRelocatingShard() || randomBoolean()) { + trackedShards.add(shr.getTargetRelocatingShard().allocationId().getId()); + } else { + untrackedShards.add(shr.getTargetRelocatingShard().allocationId().getId()); + } + } + } + } + } + } + public void testDemotedPrimary() throws Exception { final String index = "test"; final ShardId shardId = new ShardId(index, "_na_", 0); - ClusterState state = stateWithActivePrimary(index, true, 1 + randomInt(2), randomInt(2)); - IndexMetaData indexMetaData = state.getMetaData().index(index); + ClusterState initialState = stateWithActivePrimary(index, true, 1 + randomInt(2), randomInt(2)); + IndexMetaData indexMetaData = initialState.getMetaData().index(index); final long primaryTerm = indexMetaData.primaryTerm(0); - ShardRouting primaryShard = state.getRoutingTable().shardRoutingTable(shardId).primaryShard(); + final IndexShardRoutingTable indexShardRoutingTable = initialState.getRoutingTable().shardRoutingTable(shardId); + ShardRouting primaryShard = indexShardRoutingTable.primaryShard(); if (primaryShard.relocating() && randomBoolean()) { // simulate execution of the replication phase on the relocation target node after relocation source was marked as relocated - state = ClusterState.builder(state) - .nodes(DiscoveryNodes.builder(state.nodes()).localNodeId(primaryShard.relocatingNodeId())).build(); + initialState = ClusterState.builder(initialState) + .nodes(DiscoveryNodes.builder(initialState.nodes()).localNodeId(primaryShard.relocatingNodeId())).build(); primaryShard = primaryShard.getTargetRelocatingShard(); } - // add in-sync allocation id that doesn't have a corresponding routing entry - state = ClusterState.builder(state).metaData(MetaData.builder(state.metaData()).put(IndexMetaData.builder(indexMetaData) - .putInSyncAllocationIds(0, Sets.union(indexMetaData.inSyncAllocationIds(0), Sets.newHashSet(randomAlphaOfLength(10)))))) - .build(); + // add an in-sync allocation id 
that doesn't have a corresponding routing entry + final Set staleAllocationIds = Sets.newHashSet(randomAlphaOfLength(10)); + final Set inSyncAllocationIds = Sets.union(indexMetaData.inSyncAllocationIds(0), staleAllocationIds); + final Set trackedShards = new HashSet<>(); + addTrackingInfo(indexShardRoutingTable, primaryShard, trackedShards, new HashSet<>()); + trackedShards.addAll(staleAllocationIds); - final Set expectedReplicas = getExpectedReplicas(shardId, state); + final ReplicationGroup replicationGroup = new ReplicationGroup(indexShardRoutingTable, inSyncAllocationIds, trackedShards); + + final Set expectedReplicas = getExpectedReplicas(shardId, initialState, trackedShards); final Map expectedFailures = new HashMap<>(); + if (expectedReplicas.isEmpty()) { + return; + } final ShardRouting failedReplica = randomFrom(new ArrayList<>(expectedReplicas)); expectedFailures.put(failedReplica, new CorruptIndexException("simulated", (String) null)); Request request = new Request(shardId); PlainActionFuture listener = new PlainActionFuture<>(); - final ClusterState finalState = state; final boolean testPrimaryDemotedOnStaleShardCopies = randomBoolean(); final TestReplicaProxy replicasProxy = new TestReplicaProxy(primaryTerm, expectedFailures) { @Override @@ -187,7 +226,7 @@ public class ReplicationOperationTests extends ESTestCase { } }; AtomicBoolean primaryFailed = new AtomicBoolean(); - final TestPrimary primary = new TestPrimary(primaryShard, () -> finalState) { + final TestPrimary primary = new TestPrimary(primaryShard, () -> replicationGroup) { @Override public void failShard(String message, Exception exception) { assertTrue(primaryFailed.compareAndSet(false, true)); @@ -207,6 +246,12 @@ public class ReplicationOperationTests extends ESTestCase { final String index = "test"; final ShardId shardId = new ShardId(index, "_na_", 0); final ClusterState initialState = stateWithActivePrimary(index, true, 0); + Set inSyncAllocationIds = 
initialState.metaData().index(index).inSyncAllocationIds(0); + IndexShardRoutingTable shardRoutingTable = initialState.getRoutingTable().shardRoutingTable(shardId); + Set trackedShards = new HashSet<>(); + addTrackingInfo(shardRoutingTable, null, trackedShards, new HashSet<>()); + ReplicationGroup initialReplicationGroup = new ReplicationGroup(shardRoutingTable, inSyncAllocationIds, trackedShards); + final ClusterState stateWithAddedReplicas; if (randomBoolean()) { stateWithAddedReplicas = state(index, true, ShardRoutingState.STARTED, @@ -214,16 +259,24 @@ public class ReplicationOperationTests extends ESTestCase { } else { stateWithAddedReplicas = state(index, true, ShardRoutingState.RELOCATING); } - AtomicReference state = new AtomicReference<>(initialState); - logger.debug("--> using initial state:\n{}", state.get()); + + inSyncAllocationIds = stateWithAddedReplicas.metaData().index(index).inSyncAllocationIds(0); + shardRoutingTable = stateWithAddedReplicas.getRoutingTable().shardRoutingTable(shardId); + trackedShards = new HashSet<>(); + addTrackingInfo(shardRoutingTable, null, trackedShards, new HashSet<>()); + + ReplicationGroup updatedReplicationGroup = new ReplicationGroup(shardRoutingTable, inSyncAllocationIds, trackedShards); + + final AtomicReference replicationGroup = new AtomicReference<>(initialReplicationGroup); + logger.debug("--> using initial replicationGroup:\n{}", replicationGroup.get()); final long primaryTerm = initialState.getMetaData().index(shardId.getIndexName()).primaryTerm(shardId.id()); - final ShardRouting primaryShard = state.get().routingTable().shardRoutingTable(shardId).primaryShard(); - final TestPrimary primary = new TestPrimary(primaryShard, state::get) { + final ShardRouting primaryShard = updatedReplicationGroup.getRoutingTable().primaryShard(); + final TestPrimary primary = new TestPrimary(primaryShard, replicationGroup::get) { @Override public Result perform(Request request) throws Exception { Result result = 
super.perform(request); - state.set(stateWithAddedReplicas); - logger.debug("--> state after primary operation:\n{}", state.get()); + replicationGroup.set(updatedReplicationGroup); + logger.debug("--> state after primary operation:\n{}", replicationGroup.get()); return result; } }; @@ -235,7 +288,7 @@ public class ReplicationOperationTests extends ESTestCase { op.execute(); assertThat("request was not processed on primary", request.processedOnPrimary.get(), equalTo(true)); - Set expectedReplicas = getExpectedReplicas(shardId, state.get()); + Set expectedReplicas = getExpectedReplicas(shardId, stateWithAddedReplicas, trackedShards); assertThat(request.processedOnReplicas, equalTo(expectedReplicas)); } @@ -265,10 +318,16 @@ public class ReplicationOperationTests extends ESTestCase { passesActiveShardCheck ? "succeed" : "retry", state); final long primaryTerm = state.metaData().index(index).primaryTerm(shardId.id()); final IndexShardRoutingTable shardRoutingTable = state.routingTable().index(index).shard(shardId.id()); + + final Set inSyncAllocationIds = state.metaData().index(index).inSyncAllocationIds(0); + Set trackedShards = new HashSet<>(); + addTrackingInfo(shardRoutingTable, null, trackedShards, new HashSet<>()); + final ReplicationGroup initialReplicationGroup = new ReplicationGroup(shardRoutingTable, inSyncAllocationIds, trackedShards); + PlainActionFuture listener = new PlainActionFuture<>(); final ShardRouting primaryShard = shardRoutingTable.primaryShard(); final TestReplicationOperation op = new TestReplicationOperation(request, - new TestPrimary(primaryShard, () -> state), + new TestPrimary(primaryShard, () -> initialReplicationGroup), listener, new TestReplicaProxy(primaryTerm), logger, "test"); if (passesActiveShardCheck) { @@ -296,10 +355,15 @@ public class ReplicationOperationTests extends ESTestCase { final long primaryTerm = indexMetaData.primaryTerm(0); final ShardRouting primaryRouting = 
state.getRoutingTable().shardRoutingTable(shardId).primaryShard(); + final Set inSyncAllocationIds = indexMetaData.inSyncAllocationIds(0); + final IndexShardRoutingTable shardRoutingTable = state.routingTable().index(index).shard(shardId.id()); + final Set trackedShards = shardRoutingTable.getAllAllocationIds(); + final ReplicationGroup initialReplicationGroup = new ReplicationGroup(shardRoutingTable, inSyncAllocationIds, trackedShards); + final boolean fatal = randomBoolean(); final AtomicBoolean primaryFailed = new AtomicBoolean(); final ReplicationOperation.Primary primary = - new TestPrimary(primaryRouting, () -> state) { + new TestPrimary(primaryRouting, () -> initialReplicationGroup) { @Override public void failShard(String message, Exception exception) { @@ -330,10 +394,10 @@ public class ReplicationOperationTests extends ESTestCase { final ShardInfo shardInfo = listener.actionGet().getShardInfo(); assertThat(shardInfo.getFailed(), equalTo(0)); assertThat(shardInfo.getFailures(), arrayWithSize(0)); - assertThat(shardInfo.getSuccessful(), equalTo(1 + getExpectedReplicas(shardId, state).size())); + assertThat(shardInfo.getSuccessful(), equalTo(1 + getExpectedReplicas(shardId, state, trackedShards).size())); } - private Set getExpectedReplicas(ShardId shardId, ClusterState state) { + private Set getExpectedReplicas(ShardId shardId, ClusterState state, Set trackedShards) { Set expectedReplicas = new HashSet<>(); String localNodeId = state.nodes().getLocalNodeId(); if (state.routingTable().hasIndex(shardId.getIndexName())) { @@ -342,11 +406,15 @@ public class ReplicationOperationTests extends ESTestCase { continue; } if (localNodeId.equals(shardRouting.currentNodeId()) == false) { - expectedReplicas.add(shardRouting); + if (trackedShards.contains(shardRouting.allocationId().getId())) { + expectedReplicas.add(shardRouting); + } } if (shardRouting.relocating() && localNodeId.equals(shardRouting.relocatingNodeId()) == false) { - 
expectedReplicas.add(shardRouting.getTargetRelocatingShard()); + if (trackedShards.contains(shardRouting.getTargetRelocatingShard().allocationId().getId())) { + expectedReplicas.add(shardRouting.getTargetRelocatingShard()); + } } } } @@ -379,13 +447,13 @@ public class ReplicationOperationTests extends ESTestCase { final ShardRouting routing; final long localCheckpoint; final long globalCheckpoint; - final Supplier clusterStateSupplier; + final Supplier replicationGroupSupplier; final Map knownLocalCheckpoints = new HashMap<>(); final Map knownGlobalCheckpoints = new HashMap<>(); - TestPrimary(ShardRouting routing, Supplier clusterStateSupplier) { + TestPrimary(ShardRouting routing, Supplier replicationGroupSupplier) { this.routing = routing; - this.clusterStateSupplier = clusterStateSupplier; + this.replicationGroupSupplier = replicationGroupSupplier; this.localCheckpoint = random().nextLong(); this.globalCheckpoint = randomNonNegativeLong(); } @@ -453,9 +521,7 @@ public class ReplicationOperationTests extends ESTestCase { @Override public ReplicationGroup getReplicationGroup() { - ClusterState clusterState = clusterStateSupplier.get(); - return new ReplicationGroup(clusterState.routingTable().shardRoutingTable(routing.shardId()), - clusterState.metaData().index(routing.index()).inSyncAllocationIds(routing.id())); + return replicationGroupSupplier.get(); } } diff --git a/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java b/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java index 9356fd12a3a..2112a231d37 100644 --- a/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java @@ -95,6 +95,7 @@ import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Locale; +import 
java.util.Set; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -683,9 +684,13 @@ public class TransportReplicationActionTests extends ESTestCase { final IndexShard shard = mock(IndexShard.class); when(shard.getPrimaryTerm()).thenReturn(primaryTerm); when(shard.routingEntry()).thenReturn(routingEntry); + IndexShardRoutingTable shardRoutingTable = clusterService.state().routingTable().shardRoutingTable(shardId); + Set inSyncIds = randomBoolean() ? Collections.singleton(routingEntry.allocationId().getId()) : + clusterService.state().metaData().index(index).inSyncAllocationIds(0); when(shard.getReplicationGroup()).thenReturn( - new ReplicationGroup(clusterService.state().routingTable().shardRoutingTable(shardId), - clusterService.state().metaData().index(index).inSyncAllocationIds(0))); + new ReplicationGroup(shardRoutingTable, + inSyncIds, + shardRoutingTable.getAllAllocationIds())); doAnswer(invocation -> { ((ActionListener)invocation.getArguments()[0]).onResponse(() -> {}); return null; diff --git a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index 74e94406142..9b6daba22ea 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -109,7 +109,7 @@ import org.elasticsearch.index.mapper.ParsedDocument; import org.elasticsearch.index.mapper.RootObjectMapper; import org.elasticsearch.index.mapper.SeqNoFieldMapper; import org.elasticsearch.index.mapper.SourceFieldMapper; -import org.elasticsearch.index.seqno.GlobalCheckpointTracker; +import org.elasticsearch.index.seqno.ReplicationTracker; import org.elasticsearch.index.seqno.LocalCheckpointTracker; import org.elasticsearch.index.seqno.SequenceNumbers; import 
org.elasticsearch.index.shard.IndexSearcherWrapper; @@ -1968,7 +1968,7 @@ public class InternalEngineTests extends EngineTestCase { final ShardRouting primary = TestShardRouting.newShardRouting("test", shardId.id(), "node1", null, true, ShardRoutingState.STARTED, allocationId); final ShardRouting replica = TestShardRouting.newShardRouting(shardId, "node2", false, ShardRoutingState.STARTED); - GlobalCheckpointTracker gcpTracker = (GlobalCheckpointTracker) initialEngine.config().getGlobalCheckpointSupplier(); + ReplicationTracker gcpTracker = (ReplicationTracker) initialEngine.config().getGlobalCheckpointSupplier(); gcpTracker.updateFromMaster(1L, new HashSet<>(Arrays.asList(primary.allocationId().getId(), replica.allocationId().getId())), new IndexShardRoutingTable.Builder(shardId).addShard(primary).addShard(replica).build(), Collections.emptySet()); diff --git a/server/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java b/server/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java index a091cd44c4a..a67e843e468 100644 --- a/server/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java +++ b/server/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java @@ -499,8 +499,7 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase @Override public org.elasticsearch.index.shard.ReplicationGroup getReplicationGroup() { - return new org.elasticsearch.index.shard.ReplicationGroup(replicationGroup.routingTable(Function.identity()), - replicationGroup.activeIds()); + return replicationGroup.primary.getReplicationGroup(); } } diff --git a/server/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java b/server/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java index cd948ed9f90..4fb5a3e82c6 100644 --- 
a/server/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java +++ b/server/src/test/java/org/elasticsearch/index/replication/RecoveryDuringReplicationTests.java @@ -247,7 +247,7 @@ public class RecoveryDuringReplicationTests extends ESIndexLevelReplicationTestC // check that local checkpoint of new primary is properly tracked after primary promotion assertThat(newPrimary.getLocalCheckpoint(), equalTo(totalDocs - 1L)); - assertThat(IndexShardTestCase.getGlobalCheckpointTracker(newPrimary) + assertThat(IndexShardTestCase.getReplicationTracker(newPrimary) .getTrackedLocalCheckpointForShard(newPrimary.routingEntry().allocationId().getId()).getLocalCheckpoint(), equalTo(totalDocs - 1L)); diff --git a/server/src/test/java/org/elasticsearch/index/seqno/GlobalCheckpointTrackerTests.java b/server/src/test/java/org/elasticsearch/index/seqno/ReplicationTrackerTests.java similarity index 91% rename from server/src/test/java/org/elasticsearch/index/seqno/GlobalCheckpointTrackerTests.java rename to server/src/test/java/org/elasticsearch/index/seqno/ReplicationTrackerTests.java index dcaab38be5c..d89e4289e1a 100644 --- a/server/src/test/java/org/elasticsearch/index/seqno/GlobalCheckpointTrackerTests.java +++ b/server/src/test/java/org/elasticsearch/index/seqno/ReplicationTrackerTests.java @@ -59,10 +59,10 @@ import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.not; -public class GlobalCheckpointTrackerTests extends ESTestCase { +public class ReplicationTrackerTests extends ESTestCase { public void testEmptyShards() { - final GlobalCheckpointTracker tracker = newTracker(AllocationId.newInitializing()); + final ReplicationTracker tracker = newTracker(AllocationId.newInitializing()); assertThat(tracker.getGlobalCheckpoint(), equalTo(UNASSIGNED_SEQ_NO)); } @@ -116,7 +116,7 @@ public class GlobalCheckpointTrackerTests extends ESTestCase { final AllocationId primaryId = 
active.iterator().next(); - final GlobalCheckpointTracker tracker = newTracker(primaryId); + final ReplicationTracker tracker = newTracker(primaryId); assertThat(tracker.getGlobalCheckpoint(), equalTo(UNASSIGNED_SEQ_NO)); logger.info("--> using allocations"); @@ -134,7 +134,9 @@ public class GlobalCheckpointTrackerTests extends ESTestCase { tracker.updateFromMaster(initialClusterStateVersion, ids(active), routingTable(initializing, primaryId), emptySet()); tracker.activatePrimaryMode(NO_OPS_PERFORMED); - initializing.forEach(aId -> markAllocationIdAsInSyncQuietly(tracker, aId.getId(), NO_OPS_PERFORMED)); + assertThat(tracker.getReplicationGroup().getReplicationTargets().size(), equalTo(1)); + initializing.forEach(aId -> markAsTrackingAndInSyncQuietly(tracker, aId.getId(), NO_OPS_PERFORMED)); + assertThat(tracker.getReplicationGroup().getReplicationTargets().size(), equalTo(1 + initializing.size())); allocations.keySet().forEach(aId -> tracker.updateLocalCheckpoint(aId.getId(), allocations.get(aId))); assertThat(tracker.getGlobalCheckpoint(), equalTo(minLocalCheckpoint)); @@ -164,9 +166,9 @@ public class GlobalCheckpointTrackerTests extends ESTestCase { // now notify for the new id if (randomBoolean()) { tracker.updateLocalCheckpoint(extraId.getId(), minLocalCheckpointAfterUpdates + 1 + randomInt(4)); - markAllocationIdAsInSyncQuietly(tracker, extraId.getId(), randomInt((int) minLocalCheckpointAfterUpdates)); + markAsTrackingAndInSyncQuietly(tracker, extraId.getId(), randomInt((int) minLocalCheckpointAfterUpdates)); } else { - markAllocationIdAsInSyncQuietly(tracker, extraId.getId(), minLocalCheckpointAfterUpdates + 1 + randomInt(4)); + markAsTrackingAndInSyncQuietly(tracker, extraId.getId(), minLocalCheckpointAfterUpdates + 1 + randomInt(4)); } // now it should be incremented @@ -180,10 +182,10 @@ public class GlobalCheckpointTrackerTests extends ESTestCase { assigned.putAll(active); assigned.putAll(initializing); AllocationId primaryId = 
active.keySet().iterator().next(); - final GlobalCheckpointTracker tracker = newTracker(primaryId); + final ReplicationTracker tracker = newTracker(primaryId); tracker.updateFromMaster(randomNonNegativeLong(), ids(active.keySet()), routingTable(initializing.keySet(), primaryId), emptySet()); tracker.activatePrimaryMode(NO_OPS_PERFORMED); - randomSubsetOf(initializing.keySet()).forEach(k -> markAllocationIdAsInSyncQuietly(tracker, k.getId(), NO_OPS_PERFORMED)); + randomSubsetOf(initializing.keySet()).forEach(k -> markAsTrackingAndInSyncQuietly(tracker, k.getId(), NO_OPS_PERFORMED)); final AllocationId missingActiveID = randomFrom(active.keySet()); assigned .entrySet() @@ -205,11 +207,11 @@ public class GlobalCheckpointTrackerTests extends ESTestCase { logger.info("active: {}, initializing: {}", active, initializing); AllocationId primaryId = active.keySet().iterator().next(); - final GlobalCheckpointTracker tracker = newTracker(primaryId); + final ReplicationTracker tracker = newTracker(primaryId); tracker.updateFromMaster(randomNonNegativeLong(), ids(active.keySet()), routingTable(initializing.keySet(), primaryId), emptySet()); tracker.activatePrimaryMode(NO_OPS_PERFORMED); randomSubsetOf(randomIntBetween(1, initializing.size() - 1), - initializing.keySet()).forEach(aId -> markAllocationIdAsInSyncQuietly(tracker, aId.getId(), NO_OPS_PERFORMED)); + initializing.keySet()).forEach(aId -> markAsTrackingAndInSyncQuietly(tracker, aId.getId(), NO_OPS_PERFORMED)); active.forEach((aid, localCP) -> tracker.updateLocalCheckpoint(aid.getId(), localCP)); @@ -225,12 +227,12 @@ public class GlobalCheckpointTrackerTests extends ESTestCase { final Map initializing = randomAllocationsWithLocalCheckpoints(1, 5); final Map nonApproved = randomAllocationsWithLocalCheckpoints(1, 5); final AllocationId primaryId = active.keySet().iterator().next(); - final GlobalCheckpointTracker tracker = newTracker(primaryId); + final ReplicationTracker tracker = newTracker(primaryId); 
tracker.updateFromMaster(randomNonNegativeLong(), ids(active.keySet()), routingTable(initializing.keySet(), primaryId), emptySet()); tracker.activatePrimaryMode(NO_OPS_PERFORMED); - initializing.keySet().forEach(k -> markAllocationIdAsInSyncQuietly(tracker, k.getId(), NO_OPS_PERFORMED)); + initializing.keySet().forEach(k -> markAsTrackingAndInSyncQuietly(tracker, k.getId(), NO_OPS_PERFORMED)); nonApproved.keySet().forEach(k -> - expectThrows(IllegalStateException.class, () -> markAllocationIdAsInSyncQuietly(tracker, k.getId(), NO_OPS_PERFORMED))); + expectThrows(IllegalStateException.class, () -> markAsTrackingAndInSyncQuietly(tracker, k.getId(), NO_OPS_PERFORMED))); List> allocations = Arrays.asList(active, initializing, nonApproved); Collections.shuffle(allocations, random()); @@ -260,13 +262,13 @@ public class GlobalCheckpointTrackerTests extends ESTestCase { if (randomBoolean()) { allocations.putAll(initializingToBeRemoved); } - final GlobalCheckpointTracker tracker = newTracker(primaryId); + final ReplicationTracker tracker = newTracker(primaryId); tracker.updateFromMaster(initialClusterStateVersion, ids(active), routingTable(initializing, primaryId), emptySet()); tracker.activatePrimaryMode(NO_OPS_PERFORMED); if (randomBoolean()) { - initializingToStay.keySet().forEach(k -> markAllocationIdAsInSyncQuietly(tracker, k.getId(), NO_OPS_PERFORMED)); + initializingToStay.keySet().forEach(k -> markAsTrackingAndInSyncQuietly(tracker, k.getId(), NO_OPS_PERFORMED)); } else { - initializing.forEach(k -> markAllocationIdAsInSyncQuietly(tracker, k.getId(), NO_OPS_PERFORMED)); + initializing.forEach(k -> markAsTrackingAndInSyncQuietly(tracker, k.getId(), NO_OPS_PERFORMED)); } if (randomBoolean()) { allocations.forEach((aid, localCP) -> tracker.updateLocalCheckpoint(aid.getId(), localCP)); @@ -302,7 +304,7 @@ public class GlobalCheckpointTrackerTests extends ESTestCase { final AtomicBoolean complete = new AtomicBoolean(); final AllocationId inSyncAllocationId = 
AllocationId.newInitializing(); final AllocationId trackingAllocationId = AllocationId.newInitializing(); - final GlobalCheckpointTracker tracker = newTracker(inSyncAllocationId); + final ReplicationTracker tracker = newTracker(inSyncAllocationId); tracker.updateFromMaster(randomNonNegativeLong(), Collections.singleton(inSyncAllocationId.getId()), routingTable(Collections.singleton(trackingAllocationId), inSyncAllocationId), emptySet()); tracker.activatePrimaryMode(globalCheckpoint); @@ -310,6 +312,7 @@ public class GlobalCheckpointTrackerTests extends ESTestCase { try { // synchronize starting with the test thread barrier.await(); + tracker.initiateTracking(trackingAllocationId.getId()); tracker.markAllocationIdAsInSync(trackingAllocationId.getId(), localCheckpoint); complete.set(true); // synchronize with the test thread checking if we are no longer waiting @@ -343,8 +346,8 @@ public class GlobalCheckpointTrackerTests extends ESTestCase { thread.join(); } - private GlobalCheckpointTracker newTracker(final AllocationId allocationId) { - return new GlobalCheckpointTracker( + private ReplicationTracker newTracker(final AllocationId allocationId) { + return new ReplicationTracker( new ShardId("test", "_na_", 0), allocationId.getId(), IndexSettingsModule.newIndexSettings("test", Settings.EMPTY), @@ -358,7 +361,7 @@ public class GlobalCheckpointTrackerTests extends ESTestCase { final AtomicBoolean interrupted = new AtomicBoolean(); final AllocationId inSyncAllocationId = AllocationId.newInitializing(); final AllocationId trackingAllocationId = AllocationId.newInitializing(); - final GlobalCheckpointTracker tracker = newTracker(inSyncAllocationId); + final ReplicationTracker tracker = newTracker(inSyncAllocationId); tracker.updateFromMaster(randomNonNegativeLong(), Collections.singleton(inSyncAllocationId.getId()), routingTable(Collections.singleton(trackingAllocationId), inSyncAllocationId), emptySet()); tracker.activatePrimaryMode(globalCheckpoint); @@ -370,6 +373,7 
@@ public class GlobalCheckpointTrackerTests extends ESTestCase { throw new RuntimeException(e); } try { + tracker.initiateTracking(trackingAllocationId.getId()); tracker.markAllocationIdAsInSync(trackingAllocationId.getId(), localCheckpoint); } catch (final InterruptedException e) { interrupted.set(true); @@ -407,7 +411,7 @@ public class GlobalCheckpointTrackerTests extends ESTestCase { final Set initializingIds = activeAndInitializingAllocationIds.v2(); AllocationId primaryId = activeAllocationIds.iterator().next(); IndexShardRoutingTable routingTable = routingTable(initializingIds, primaryId); - final GlobalCheckpointTracker tracker = newTracker(primaryId); + final ReplicationTracker tracker = newTracker(primaryId); tracker.updateFromMaster(initialClusterStateVersion, ids(activeAllocationIds), routingTable, emptySet()); tracker.activatePrimaryMode(NO_OPS_PERFORMED); assertThat(tracker.getReplicationGroup().getInSyncAllocationIds(), equalTo(ids(activeAllocationIds))); @@ -508,6 +512,7 @@ public class GlobalCheckpointTrackerTests extends ESTestCase { final Thread thread = new Thread(() -> { try { barrier.await(); + tracker.initiateTracking(newSyncingAllocationId.getId()); tracker.markAllocationIdAsInSync(newSyncingAllocationId.getId(), localCheckpoint); barrier.await(); } catch (final BrokenBarrierException | InterruptedException e) { @@ -547,13 +552,13 @@ public class GlobalCheckpointTrackerTests extends ESTestCase { } /** - * If we do not update the global checkpoint in {@link GlobalCheckpointTracker#markAllocationIdAsInSync(String, long)} after adding the + * If we do not update the global checkpoint in {@link ReplicationTracker#markAllocationIdAsInSync(String, long)} after adding the * allocation ID to the in-sync set and removing it from pending, the local checkpoint update that freed the thread waiting for the * local checkpoint to advance could miss updating the global checkpoint in a race if the waiting thread did not add the allocation * ID to the in-sync 
set and remove it from the pending set before the local checkpoint updating thread executed the global checkpoint - * update. This test fails without an additional call to {@link GlobalCheckpointTracker#updateGlobalCheckpointOnPrimary()} after - * removing the allocation ID from the pending set in {@link GlobalCheckpointTracker#markAllocationIdAsInSync(String, long)} (even if a - * call is added after notifying all waiters in {@link GlobalCheckpointTracker#updateLocalCheckpoint(String, long)}). + * update. This test fails without an additional call to {@link ReplicationTracker#updateGlobalCheckpointOnPrimary()} after + * removing the allocation ID from the pending set in {@link ReplicationTracker#markAllocationIdAsInSync(String, long)} (even if a + * call is added after notifying all waiters in {@link ReplicationTracker#updateLocalCheckpoint(String, long)}). * * @throws InterruptedException if the main test thread was interrupted while waiting * @throws BrokenBarrierException if the barrier was broken while the main test thread was waiting @@ -565,7 +570,7 @@ public class GlobalCheckpointTrackerTests extends ESTestCase { final CyclicBarrier barrier = new CyclicBarrier(4); final int activeLocalCheckpoint = randomIntBetween(0, Integer.MAX_VALUE - 1); - final GlobalCheckpointTracker tracker = newTracker(active); + final ReplicationTracker tracker = newTracker(active); tracker.updateFromMaster( randomNonNegativeLong(), Collections.singleton(active.getId()), @@ -595,6 +600,7 @@ public class GlobalCheckpointTrackerTests extends ESTestCase { final Thread markingThread = new Thread(() -> { try { barrier.await(); + tracker.initiateTracking(initializing.getId()); tracker.markAllocationIdAsInSync(initializing.getId(), initializingLocalCheckpoint - 1); } catch (final BrokenBarrierException | InterruptedException e) { throw new RuntimeException(e); @@ -619,10 +625,10 @@ public class GlobalCheckpointTrackerTests extends ESTestCase { FakeClusterState clusterState = 
initialState(); final AllocationId primaryAllocationId = clusterState.routingTable.primaryShard().allocationId(); - GlobalCheckpointTracker oldPrimary = - new GlobalCheckpointTracker(shardId, primaryAllocationId.getId(), indexSettings, UNASSIGNED_SEQ_NO); - GlobalCheckpointTracker newPrimary = - new GlobalCheckpointTracker(shardId, primaryAllocationId.getRelocationId(), indexSettings, UNASSIGNED_SEQ_NO); + ReplicationTracker oldPrimary = + new ReplicationTracker(shardId, primaryAllocationId.getId(), indexSettings, UNASSIGNED_SEQ_NO); + ReplicationTracker newPrimary = + new ReplicationTracker(shardId, primaryAllocationId.getRelocationId(), indexSettings, UNASSIGNED_SEQ_NO); Set allocationIds = new HashSet<>(Arrays.asList(oldPrimary.shardAllocationId, newPrimary.shardAllocationId)); @@ -647,12 +653,12 @@ public class GlobalCheckpointTrackerTests extends ESTestCase { } // simulate transferring the global checkpoint to the new primary after finalizing recovery before the handoff - markAllocationIdAsInSyncQuietly( + markAsTrackingAndInSyncQuietly( oldPrimary, newPrimary.shardAllocationId, Math.max(SequenceNumbers.NO_OPS_PERFORMED, oldPrimary.getGlobalCheckpoint() + randomInt(5))); oldPrimary.updateGlobalCheckpointForShard(newPrimary.shardAllocationId, oldPrimary.getGlobalCheckpoint()); - GlobalCheckpointTracker.PrimaryContext primaryContext = oldPrimary.startRelocationHandoff(); + ReplicationTracker.PrimaryContext primaryContext = oldPrimary.startRelocationHandoff(); if (randomBoolean()) { // cluster state update after primary context handoff @@ -685,7 +691,7 @@ public class GlobalCheckpointTrackerTests extends ESTestCase { BytesStreamOutput output = new BytesStreamOutput(); primaryContext.writeTo(output); StreamInput streamInput = output.bytes().streamInput(); - primaryContext = new GlobalCheckpointTracker.PrimaryContext(streamInput); + primaryContext = new ReplicationTracker.PrimaryContext(streamInput); switch (randomInt(3)) { case 0: { // apply cluster state update 
on old primary while primary context is being transferred @@ -730,10 +736,10 @@ public class GlobalCheckpointTrackerTests extends ESTestCase { * will update its global checkpoint state without the old primary learning of it, and the old primary could have updated its * global checkpoint state after the primary context was transferred. */ - Map oldPrimaryCheckpointsCopy = new HashMap<>(oldPrimary.checkpoints); + Map oldPrimaryCheckpointsCopy = new HashMap<>(oldPrimary.checkpoints); oldPrimaryCheckpointsCopy.remove(oldPrimary.shardAllocationId); oldPrimaryCheckpointsCopy.remove(newPrimary.shardAllocationId); - Map newPrimaryCheckpointsCopy = new HashMap<>(newPrimary.checkpoints); + Map newPrimaryCheckpointsCopy = new HashMap<>(newPrimary.checkpoints); newPrimaryCheckpointsCopy.remove(oldPrimary.shardAllocationId); newPrimaryCheckpointsCopy.remove(newPrimary.shardAllocationId); assertThat(newPrimaryCheckpointsCopy, equalTo(oldPrimaryCheckpointsCopy)); @@ -761,7 +767,7 @@ public class GlobalCheckpointTrackerTests extends ESTestCase { public void testIllegalStateExceptionIfUnknownAllocationId() { final AllocationId active = AllocationId.newInitializing(); final AllocationId initializing = AllocationId.newInitializing(); - final GlobalCheckpointTracker tracker = newTracker(active); + final ReplicationTracker tracker = newTracker(active); tracker.updateFromMaster(randomNonNegativeLong(), Collections.singleton(active.getId()), routingTable(Collections.singleton(initializing), active), emptySet()); tracker.activatePrimaryMode(NO_OPS_PERFORMED); @@ -790,7 +796,7 @@ public class GlobalCheckpointTrackerTests extends ESTestCase { .map(ShardRouting::allocationId).collect(Collectors.toSet()); } - public void apply(GlobalCheckpointTracker gcp) { + public void apply(ReplicationTracker gcp) { gcp.updateFromMaster(version, ids(inSyncIds), routingTable, Collections.emptySet()); } } @@ -818,20 +824,20 @@ public class GlobalCheckpointTrackerTests extends ESTestCase { 
routingTable(initializingAllocationIds, primaryShard)); } - private static void activatePrimary(GlobalCheckpointTracker gcp) { + private static void activatePrimary(ReplicationTracker gcp) { gcp.activatePrimaryMode(randomIntBetween(Math.toIntExact(NO_OPS_PERFORMED), 10)); } - private static void randomLocalCheckpointUpdate(GlobalCheckpointTracker gcp) { + private static void randomLocalCheckpointUpdate(ReplicationTracker gcp) { String allocationId = randomFrom(gcp.checkpoints.keySet()); long currentLocalCheckpoint = gcp.checkpoints.get(allocationId).getLocalCheckpoint(); gcp.updateLocalCheckpoint(allocationId, Math.max(SequenceNumbers.NO_OPS_PERFORMED, currentLocalCheckpoint + randomInt(5))); } - private static void randomMarkInSync(GlobalCheckpointTracker gcp) { + private static void randomMarkInSync(ReplicationTracker gcp) { String allocationId = randomFrom(gcp.checkpoints.keySet()); long newLocalCheckpoint = Math.max(NO_OPS_PERFORMED, gcp.getGlobalCheckpoint() + randomInt(5)); - markAllocationIdAsInSyncQuietly(gcp, allocationId, newLocalCheckpoint); + markAsTrackingAndInSyncQuietly(gcp, allocationId, newLocalCheckpoint); } private static FakeClusterState randomUpdateClusterState(Set allocationIds, FakeClusterState clusterState) { @@ -876,9 +882,10 @@ public class GlobalCheckpointTrackerTests extends ESTestCase { }).collect(Collectors.toSet()); } - private static void markAllocationIdAsInSyncQuietly( - final GlobalCheckpointTracker tracker, final String allocationId, final long localCheckpoint) { + private static void markAsTrackingAndInSyncQuietly( + final ReplicationTracker tracker, final String allocationId, final long localCheckpoint) { try { + tracker.initiateTracking(allocationId); tracker.markAllocationIdAsInSync(allocationId, localCheckpoint); } catch (final InterruptedException e) { throw new RuntimeException(e); diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java 
b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index cd75c7a08fb..9d854331b90 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -419,7 +419,7 @@ public class IndexShardTests extends IndexShardTestCase { * This test makes sure that people can use the shard routing entry to check whether a shard was already promoted to * a primary. Concretely this means, that when we publish the routing entry via {@link IndexShard#routingEntry()} the following * should have happened - * 1) Internal state (ala GlobalCheckpointTracker) have been updated + * 1) Internal state (ala ReplicationTracker) have been updated * 2) Primary term is set to the new term */ public void testPublishingOrderOnPromotion() throws IOException, BrokenBarrierException, InterruptedException { @@ -1583,7 +1583,7 @@ public class IndexShardTests extends IndexShardTestCase { IndexShardTestCase.updateRoutingEntry(newShard, newShard.routingEntry().moveToStarted()); // check that local checkpoint of new primary is properly tracked after recovery assertThat(newShard.getLocalCheckpoint(), equalTo(totalOps - 1L)); - assertThat(newShard.getGlobalCheckpointTracker().getTrackedLocalCheckpointForShard(newShard.routingEntry().allocationId().getId()) + assertThat(newShard.getReplicationTracker().getTrackedLocalCheckpointForShard(newShard.routingEntry().allocationId().getId()) .getLocalCheckpoint(), equalTo(totalOps - 1L)); assertDocCount(newShard, totalOps); closeShards(newShard); @@ -1602,7 +1602,7 @@ public class IndexShardTests extends IndexShardTestCase { // check that local checkpoint of new primary is properly tracked after primary relocation assertThat(primaryTarget.getLocalCheckpoint(), equalTo(totalOps - 1L)); - assertThat(primaryTarget.getGlobalCheckpointTracker().getTrackedLocalCheckpointForShard( + 
assertThat(primaryTarget.getReplicationTracker().getTrackedLocalCheckpointForShard( primaryTarget.routingEntry().allocationId().getId()).getLocalCheckpoint(), equalTo(totalOps - 1L)); assertDocCount(primaryTarget, totalOps); closeShards(primarySource, primaryTarget); @@ -1813,9 +1813,9 @@ public class IndexShardTests extends IndexShardTestCase { })); assertThat(target.getLocalCheckpoint(), equalTo(0L)); assertThat(target.seqNoStats().getMaxSeqNo(), equalTo(0L)); - assertThat(target.getGlobalCheckpointTracker().getGlobalCheckpoint(), equalTo(0L)); + assertThat(target.getReplicationTracker().getGlobalCheckpoint(), equalTo(0L)); IndexShardTestCase.updateRoutingEntry(target, routing.moveToStarted()); - assertThat(target.getGlobalCheckpointTracker().getTrackedLocalCheckpointForShard( + assertThat(target.getReplicationTracker().getTrackedLocalCheckpointForShard( target.routingEntry().allocationId().getId()).getLocalCheckpoint(), equalTo(0L)); assertDocs(target, "0"); @@ -2259,9 +2259,9 @@ public class IndexShardTests extends IndexShardTestCase { } // check that local checkpoint of new primary is properly tracked after recovery assertThat(targetShard.getLocalCheckpoint(), equalTo(1L)); - assertThat(targetShard.getGlobalCheckpointTracker().getGlobalCheckpoint(), equalTo(1L)); + assertThat(targetShard.getReplicationTracker().getGlobalCheckpoint(), equalTo(1L)); IndexShardTestCase.updateRoutingEntry(targetShard, ShardRoutingHelper.moveToStarted(targetShard.routingEntry())); - assertThat(targetShard.getGlobalCheckpointTracker().getTrackedLocalCheckpointForShard( + assertThat(targetShard.getReplicationTracker().getTrackedLocalCheckpointForShard( targetShard.routingEntry().allocationId().getId()).getLocalCheckpoint(), equalTo(1L)); assertDocCount(targetShard, 2); } diff --git a/server/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java b/server/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java index 6024daf45cc..166d3692819 100644 --- 
a/server/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java @@ -48,7 +48,6 @@ import org.elasticsearch.index.mapper.IdFieldMapper; import org.elasticsearch.index.mapper.ParseContext.Document; import org.elasticsearch.index.mapper.ParsedDocument; import org.elasticsearch.index.mapper.SeqNoFieldMapper; -import org.elasticsearch.index.seqno.GlobalCheckpointTracker; import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.index.store.DirectoryService; import org.elasticsearch.index.store.Store; diff --git a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java index 2b25ef160b4..5481a486185 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java @@ -60,7 +60,7 @@ import org.elasticsearch.index.mapper.ParsedDocument; import org.elasticsearch.index.mapper.SeqNoFieldMapper; import org.elasticsearch.index.mapper.SourceFieldMapper; import org.elasticsearch.index.mapper.Uid; -import org.elasticsearch.index.seqno.GlobalCheckpointTracker; +import org.elasticsearch.index.seqno.ReplicationTracker; import org.elasticsearch.index.seqno.LocalCheckpointTracker; import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.index.shard.ShardId; @@ -425,7 +425,7 @@ public abstract class EngineTestCase extends ESTestCase { TimeValue.timeValueMinutes(5), refreshListenerList, Collections.emptyList(), indexSort, handler, new NoneCircuitBreakerService(), globalCheckpointSupplier == null ? 
- new GlobalCheckpointTracker(shardId, allocationId.getId(), indexSettings, + new ReplicationTracker(shardId, allocationId.getId(), indexSettings, SequenceNumbers.UNASSIGNED_SEQ_NO) : globalCheckpointSupplier); return config; } diff --git a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java index 1ce28e16d57..265fdc56501 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java @@ -60,7 +60,7 @@ import org.elasticsearch.index.mapper.MapperService; import org.elasticsearch.index.mapper.Mapping; import org.elasticsearch.index.mapper.SourceToParse; import org.elasticsearch.index.mapper.Uid; -import org.elasticsearch.index.seqno.GlobalCheckpointTracker; +import org.elasticsearch.index.seqno.ReplicationTracker; import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.index.similarity.SimilarityService; import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus; @@ -640,7 +640,7 @@ public abstract class IndexShardTestCase extends ESTestCase { return indexShard.getEngine(); } - public static GlobalCheckpointTracker getGlobalCheckpointTracker(IndexShard indexShard) { - return indexShard.getGlobalCheckpointTracker(); + public static ReplicationTracker getReplicationTracker(IndexShard indexShard) { + return indexShard.getReplicationTracker(); } }