From dcc816427ef62a60690d80ff14f4caec4b471c72 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Tue, 7 Aug 2018 15:10:09 -0400 Subject: [PATCH] Expose whether or not the global checkpoint updated (#32659) It will be useful for future efforts to know if the global checkpoint was updated. To this end, we need to expose whether or not the global checkpoint was updated when the state of the replication tracker updates. For this, we add to the tracker a callback that is invoked whenever the global checkpoint is updated. For primaries this will be invoked when the computed global checkpoint is updated based on state changes to the tracker. For replicas this will be invoked when the local knowledge of the global checkpoint is advanced from the primary. --- .../index/seqno/ReplicationTracker.java | 24 +++- .../elasticsearch/index/shard/IndexShard.java | 5 +- .../index/seqno/ReplicationTrackerTests.java | 114 +++++++++++++++--- .../index/engine/EngineTestCase.java | 2 +- 4 files changed, 117 insertions(+), 28 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java b/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java index e868da5e82a..b406621e978 100644 --- a/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java +++ b/server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java @@ -39,6 +39,7 @@ import java.util.Collection; import java.util.HashMap; import java.util.HashSet; import java.util.Map; +import java.util.Objects; import java.util.OptionalLong; import java.util.Set; import java.util.function.Function; @@ -127,6 +128,13 @@ public class ReplicationTracker extends AbstractIndexShardComponent implements L */ final Map checkpoints; + /** + * A callback invoked when the global checkpoint is updated. For primary mode this occurs if the computed global checkpoint advances on + * the basis of state changes tracked here. For non-primary mode this occurs if the local knowledge of the global checkpoint advances + * due to an update from the primary. + */ + private final LongConsumer onGlobalCheckpointUpdated; + /** * This set contains allocation IDs for which there is a thread actively waiting for the local checkpoint to advance to at least the * current global checkpoint. @@ -391,7 +399,8 @@ public class ReplicationTracker extends AbstractIndexShardComponent implements L final ShardId shardId, final String allocationId, final IndexSettings indexSettings, - final long globalCheckpoint) { + final long globalCheckpoint, + final LongConsumer onGlobalCheckpointUpdated) { super(shardId, indexSettings); assert globalCheckpoint >= SequenceNumbers.UNASSIGNED_SEQ_NO : "illegal initial global checkpoint: " + globalCheckpoint; this.shardAllocationId = allocationId; @@ -400,6 +409,7 @@ public class ReplicationTracker extends AbstractIndexShardComponent implements L this.appliedClusterStateVersion = -1L; this.checkpoints = new HashMap<>(1 + indexSettings.getNumberOfReplicas()); checkpoints.put(allocationId, new CheckpointState(SequenceNumbers.UNASSIGNED_SEQ_NO, globalCheckpoint, false, false)); + this.onGlobalCheckpointUpdated = Objects.requireNonNull(onGlobalCheckpointUpdated); this.pendingInSync = new HashSet<>(); this.routingTable = null; this.replicationGroup = null; @@ -456,7 +466,10 @@ public class ReplicationTracker extends AbstractIndexShardComponent implements L updateGlobalCheckpoint( shardAllocationId, globalCheckpoint, - current -> logger.trace("updating global checkpoint from [{}] to [{}] due to [{}]", current, globalCheckpoint, reason)); + current -> { + logger.trace("updated global checkpoint from [{}] to [{}] due to [{}]", current, globalCheckpoint, reason); + onGlobalCheckpointUpdated.accept(globalCheckpoint); + }); assert invariant(); } @@ -474,7 +487,7 @@ public class ReplicationTracker extends AbstractIndexShardComponent implements L allocationId, globalCheckpoint, current -> logger.trace( - "updating local knowledge for [{}] on the primary of the global checkpoint from [{}] to [{}]", + "updated local knowledge for [{}] on the primary of the global checkpoint from [{}] to [{}]", allocationId, current, globalCheckpoint)); @@ -485,8 +498,8 @@ public class ReplicationTracker extends AbstractIndexShardComponent implements L final CheckpointState cps = checkpoints.get(allocationId); assert !this.shardAllocationId.equals(allocationId) || cps != null; if (cps != null && globalCheckpoint > cps.globalCheckpoint) { - ifUpdated.accept(cps.globalCheckpoint); cps.globalCheckpoint = globalCheckpoint; + ifUpdated.accept(cps.globalCheckpoint); } } @@ -737,8 +750,9 @@ public class ReplicationTracker extends AbstractIndexShardComponent implements L assert computedGlobalCheckpoint >= globalCheckpoint : "new global checkpoint [" + computedGlobalCheckpoint + "] is lower than previous one [" + globalCheckpoint + "]"; if (globalCheckpoint != computedGlobalCheckpoint) { - logger.trace("global checkpoint updated to [{}]", computedGlobalCheckpoint); cps.globalCheckpoint = computedGlobalCheckpoint; + logger.trace("updated global checkpoint to [{}]", computedGlobalCheckpoint); + onGlobalCheckpointUpdated.accept(computedGlobalCheckpoint); } } diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 3aa281da101..08a0111fb4d 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -297,8 +297,9 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl this.checkIndexOnStartup = indexSettings.getValue(IndexSettings.INDEX_CHECK_ON_STARTUP); this.translogConfig = new TranslogConfig(shardId, shardPath().resolveTranslog(), indexSettings, bigArrays); - this.replicationTracker = new ReplicationTracker(shardId, shardRouting.allocationId().getId(), indexSettings, - SequenceNumbers.UNASSIGNED_SEQ_NO); + final String aId = shardRouting.allocationId().getId(); + this.replicationTracker = + new ReplicationTracker(shardId, aId, indexSettings, SequenceNumbers.UNASSIGNED_SEQ_NO, globalCheckpoint -> {}); // the query cache is a node-level thing, however we want the most popular filters // to be computed on a per-shard basis if (IndexModule.INDEX_QUERY_CACHE_EVERYTHING_SETTING.get(settings)) { diff --git a/server/src/test/java/org/elasticsearch/index/seqno/ReplicationTrackerTests.java b/server/src/test/java/org/elasticsearch/index/seqno/ReplicationTrackerTests.java index e001f82809b..3948da9c111 100644 --- a/server/src/test/java/org/elasticsearch/index/seqno/ReplicationTrackerTests.java +++ b/server/src/test/java/org/elasticsearch/index/seqno/ReplicationTrackerTests.java @@ -47,7 +47,9 @@ import java.util.Set; import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; import java.util.function.Function; +import java.util.function.LongConsumer; import java.util.stream.Collectors; import java.util.stream.IntStream; import java.util.stream.Stream; @@ -60,7 +62,7 @@ import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.not; public class ReplicationTrackerTests extends ESTestCase { - + public void testEmptyShards() { final ReplicationTracker tracker = newTracker(AllocationId.newInitializing()); assertThat(tracker.getGlobalCheckpoint(), equalTo(UNASSIGNED_SEQ_NO)); @@ -99,6 +101,11 @@ public class ReplicationTrackerTests extends ESTestCase { return allocationIds.stream().map(AllocationId::getId).collect(Collectors.toSet()); } + private void updateLocalCheckpoint(final ReplicationTracker tracker, final String allocationId, final long localCheckpoint) { + tracker.updateLocalCheckpoint(allocationId, localCheckpoint); + assertThat(updatedGlobalCheckpoint.get(), equalTo(tracker.getGlobalCheckpoint())); + } + public void testGlobalCheckpointUpdate() { final long initialClusterStateVersion = randomNonNegativeLong(); Map allocations = new HashMap<>(); @@ -137,14 +144,14 @@ public class ReplicationTrackerTests extends ESTestCase { assertThat(tracker.getReplicationGroup().getReplicationTargets().size(), equalTo(1)); initializing.forEach(aId -> markAsTrackingAndInSyncQuietly(tracker, aId.getId(), NO_OPS_PERFORMED)); assertThat(tracker.getReplicationGroup().getReplicationTargets().size(), equalTo(1 + initializing.size())); - allocations.keySet().forEach(aId -> tracker.updateLocalCheckpoint(aId.getId(), allocations.get(aId))); + allocations.keySet().forEach(aId -> updateLocalCheckpoint(tracker, aId.getId(), allocations.get(aId))); assertThat(tracker.getGlobalCheckpoint(), equalTo(minLocalCheckpoint)); // increment checkpoints active.forEach(aId -> allocations.put(aId, allocations.get(aId) + 1 + randomInt(4))); initializing.forEach(aId -> allocations.put(aId, allocations.get(aId) + 1 + randomInt(4))); - allocations.keySet().forEach(aId -> tracker.updateLocalCheckpoint(aId.getId(), allocations.get(aId))); + allocations.keySet().forEach(aId -> updateLocalCheckpoint(tracker, aId.getId(), allocations.get(aId))); final long minLocalCheckpointAfterUpdates = allocations.entrySet().stream().map(Map.Entry::getValue).min(Long::compareTo).orElse(UNASSIGNED_SEQ_NO); @@ -153,7 +160,7 @@ public class ReplicationTrackerTests extends ESTestCase { final AllocationId extraId = AllocationId.newInitializing(); // first check that adding it without the master blessing doesn't change anything. - tracker.updateLocalCheckpoint(extraId.getId(), minLocalCheckpointAfterUpdates + 1 + randomInt(4)); + updateLocalCheckpoint(tracker, extraId.getId(), minLocalCheckpointAfterUpdates + 1 + randomInt(4)); assertNull(tracker.checkpoints.get(extraId)); expectThrows(IllegalStateException.class, () -> tracker.initiateTracking(extraId.getId())); @@ -165,7 +172,7 @@ public class ReplicationTrackerTests extends ESTestCase { // now notify for the new id if (randomBoolean()) { - tracker.updateLocalCheckpoint(extraId.getId(), minLocalCheckpointAfterUpdates + 1 + randomInt(4)); + updateLocalCheckpoint(tracker, extraId.getId(), minLocalCheckpointAfterUpdates + 1 + randomInt(4)); markAsTrackingAndInSyncQuietly(tracker, extraId.getId(), randomInt((int) minLocalCheckpointAfterUpdates)); } else { markAsTrackingAndInSyncQuietly(tracker, extraId.getId(), minLocalCheckpointAfterUpdates + 1 + randomInt(4)); @@ -175,6 +182,64 @@ public class ReplicationTrackerTests extends ESTestCase { assertThat(tracker.getGlobalCheckpoint(), greaterThan(minLocalCheckpoint)); } + public void testUpdateGlobalCheckpointOnReplica() { + final AllocationId active = AllocationId.newInitializing(); + final ReplicationTracker tracker = newTracker(active); + final long globalCheckpoint = randomLongBetween(NO_OPS_PERFORMED, Long.MAX_VALUE - 1); + tracker.updateGlobalCheckpointOnReplica(globalCheckpoint, "test"); + assertThat(updatedGlobalCheckpoint.get(), equalTo(globalCheckpoint)); + final long nonUpdate = randomLongBetween(NO_OPS_PERFORMED, globalCheckpoint); + updatedGlobalCheckpoint.set(UNASSIGNED_SEQ_NO); + tracker.updateGlobalCheckpointOnReplica(nonUpdate, "test"); + assertThat(updatedGlobalCheckpoint.get(), equalTo(UNASSIGNED_SEQ_NO)); + final long update = randomLongBetween(globalCheckpoint, Long.MAX_VALUE); + tracker.updateGlobalCheckpointOnReplica(update, "test"); + assertThat(updatedGlobalCheckpoint.get(), equalTo(update)); + } + + public void testMarkAllocationIdAsInSync() throws BrokenBarrierException, InterruptedException { + final long initialClusterStateVersion = randomNonNegativeLong(); + Map activeWithCheckpoints = randomAllocationsWithLocalCheckpoints(1, 1); + Set active = new HashSet<>(activeWithCheckpoints.keySet()); + Map initializingWithCheckpoints = randomAllocationsWithLocalCheckpoints(1, 1); + Set initializing = new HashSet<>(initializingWithCheckpoints.keySet()); + final AllocationId primaryId = active.iterator().next(); + final AllocationId replicaId = initializing.iterator().next(); + final ReplicationTracker tracker = newTracker(primaryId); + tracker.updateFromMaster(initialClusterStateVersion, ids(active), routingTable(initializing, primaryId), emptySet()); + final long localCheckpoint = randomLongBetween(0, Long.MAX_VALUE - 1); + tracker.activatePrimaryMode(localCheckpoint); + tracker.initiateTracking(replicaId.getId()); + final CyclicBarrier barrier = new CyclicBarrier(2); + final Thread thread = new Thread(() -> { + try { + barrier.await(); + tracker.markAllocationIdAsInSync( + replicaId.getId(), + randomLongBetween(NO_OPS_PERFORMED, localCheckpoint - 1)); + barrier.await(); + } catch (BrokenBarrierException | InterruptedException e) { + throw new AssertionError(e); + } + }); + thread.start(); + barrier.await(); + awaitBusy(tracker::pendingInSync); + final long updatedLocalCheckpoint = randomLongBetween(1 + localCheckpoint, Long.MAX_VALUE); + // there is a shard copy pending in sync, the global checkpoint can not advance + updatedGlobalCheckpoint.set(UNASSIGNED_SEQ_NO); + tracker.updateLocalCheckpoint(primaryId.getId(), updatedLocalCheckpoint); + assertThat(updatedGlobalCheckpoint.get(), equalTo(UNASSIGNED_SEQ_NO)); + // we are implicitly marking the pending in sync copy as in sync with the current global checkpoint, no advancement should occur + tracker.updateLocalCheckpoint(replicaId.getId(), localCheckpoint); + assertThat(updatedGlobalCheckpoint.get(), equalTo(UNASSIGNED_SEQ_NO)); + barrier.await(); + thread.join(); + // now we expect that the global checkpoint would advance + tracker.markAllocationIdAsInSync(replicaId.getId(), updatedLocalCheckpoint); + assertThat(updatedGlobalCheckpoint.get(), equalTo(updatedLocalCheckpoint)); + } + public void testMissingActiveIdsPreventAdvance() { final Map active = randomAllocationsWithLocalCheckpoints(2, 5); final Map initializing = randomAllocationsWithLocalCheckpoints(0, 5); @@ -191,14 +256,16 @@ public class ReplicationTrackerTests extends ESTestCase { .entrySet() .stream() .filter(e -> !e.getKey().equals(missingActiveID)) - .forEach(e -> tracker.updateLocalCheckpoint(e.getKey().getId(), e.getValue())); + .forEach(e -> updateLocalCheckpoint(tracker, e.getKey().getId(), e.getValue())); if (missingActiveID.equals(primaryId) == false) { assertThat(tracker.getGlobalCheckpoint(), equalTo(UNASSIGNED_SEQ_NO)); + assertThat(updatedGlobalCheckpoint.get(), equalTo(UNASSIGNED_SEQ_NO)); } // now update all knowledge of all shards - assigned.forEach((aid, localCP) -> tracker.updateLocalCheckpoint(aid.getId(), localCP)); + assigned.forEach((aid, localCP) -> updateLocalCheckpoint(tracker, aid.getId(), localCP)); assertThat(tracker.getGlobalCheckpoint(), not(equalTo(UNASSIGNED_SEQ_NO))); + assertThat(updatedGlobalCheckpoint.get(), not(equalTo(UNASSIGNED_SEQ_NO))); } public void testMissingInSyncIdsPreventAdvance() { @@ -213,13 +280,15 @@ public class ReplicationTrackerTests extends ESTestCase { randomSubsetOf(randomIntBetween(1, initializing.size() - 1), initializing.keySet()).forEach(aId -> markAsTrackingAndInSyncQuietly(tracker, aId.getId(), NO_OPS_PERFORMED)); - active.forEach((aid, localCP) -> tracker.updateLocalCheckpoint(aid.getId(), localCP)); + active.forEach((aid, localCP) -> updateLocalCheckpoint(tracker, aid.getId(), localCP)); assertThat(tracker.getGlobalCheckpoint(), equalTo(NO_OPS_PERFORMED)); + assertThat(updatedGlobalCheckpoint.get(), equalTo(NO_OPS_PERFORMED)); // update again - initializing.forEach((aid, localCP) -> tracker.updateLocalCheckpoint(aid.getId(), localCP)); + initializing.forEach((aid, localCP) -> updateLocalCheckpoint(tracker, aid.getId(), localCP)); assertThat(tracker.getGlobalCheckpoint(), not(equalTo(UNASSIGNED_SEQ_NO))); + assertThat(updatedGlobalCheckpoint.get(), not(equalTo(UNASSIGNED_SEQ_NO))); } public void testInSyncIdsAreIgnoredIfNotValidatedByMaster() { @@ -236,7 +305,7 @@ public class ReplicationTrackerTests extends ESTestCase { List> allocations = Arrays.asList(active, initializing, nonApproved); Collections.shuffle(allocations, random()); - allocations.forEach(a -> a.forEach((aid, localCP) -> tracker.updateLocalCheckpoint(aid.getId(), localCP))); + allocations.forEach(a -> a.forEach((aid, localCP) -> updateLocalCheckpoint(tracker, aid.getId(), localCP))); assertThat(tracker.getGlobalCheckpoint(), not(equalTo(UNASSIGNED_SEQ_NO))); } @@ -271,7 +340,7 @@ public class ReplicationTrackerTests extends ESTestCase { initializing.forEach(k -> markAsTrackingAndInSyncQuietly(tracker, k.getId(), NO_OPS_PERFORMED)); } if (randomBoolean()) { - allocations.forEach((aid, localCP) -> tracker.updateLocalCheckpoint(aid.getId(), localCP)); + allocations.forEach((aid, localCP) -> updateLocalCheckpoint(tracker, aid.getId(), localCP)); } // now remove shards @@ -281,9 +350,9 @@ public class ReplicationTrackerTests extends ESTestCase { ids(activeToStay.keySet()), routingTable(initializingToStay.keySet(), primaryId), emptySet()); - allocations.forEach((aid, ckp) -> tracker.updateLocalCheckpoint(aid.getId(), ckp + 10L)); + allocations.forEach((aid, ckp) -> updateLocalCheckpoint(tracker, aid.getId(), ckp + 10L)); } else { - allocations.forEach((aid, ckp) -> tracker.updateLocalCheckpoint(aid.getId(), ckp + 10L)); + allocations.forEach((aid, ckp) -> updateLocalCheckpoint(tracker, aid.getId(), ckp + 10L)); tracker.updateFromMaster( initialClusterStateVersion + 2, ids(activeToStay.keySet()), @@ -331,7 +400,7 @@ public class ReplicationTrackerTests extends ESTestCase { final List elements = IntStream.rangeClosed(0, globalCheckpoint - 1).boxed().collect(Collectors.toList()); Randomness.shuffle(elements); for (int i = 0; i < elements.size(); i++) { - tracker.updateLocalCheckpoint(trackingAllocationId.getId(), elements.get(i)); + updateLocalCheckpoint(tracker, trackingAllocationId.getId(), elements.get(i)); assertFalse(complete.get()); assertFalse(tracker.getTrackedLocalCheckpointForShard(trackingAllocationId.getId()).inSync); assertBusy(() -> assertTrue(tracker.pendingInSync.contains(trackingAllocationId.getId()))); @@ -339,7 +408,7 @@ public class ReplicationTrackerTests extends ESTestCase { if (randomBoolean()) { // normal path, shard catches up - tracker.updateLocalCheckpoint(trackingAllocationId.getId(), randomIntBetween(globalCheckpoint, 64)); + updateLocalCheckpoint(tracker, trackingAllocationId.getId(), randomIntBetween(globalCheckpoint, 64)); // synchronize with the waiting thread to mark that it is complete barrier.await(); assertTrue(complete.get()); @@ -355,13 +424,16 @@ public class ReplicationTrackerTests extends ESTestCase { assertFalse(tracker.pendingInSync.contains(trackingAllocationId.getId())); thread.join(); } + + private AtomicLong updatedGlobalCheckpoint = new AtomicLong(UNASSIGNED_SEQ_NO); private ReplicationTracker newTracker(final AllocationId allocationId) { return new ReplicationTracker( new ShardId("test", "_na_", 0), allocationId.getId(), IndexSettingsModule.newIndexSettings("test", Settings.EMPTY), - UNASSIGNED_SEQ_NO); + UNASSIGNED_SEQ_NO, + updatedGlobalCheckpoint::set); } public void testWaitForAllocationIdToBeInSyncCanBeInterrupted() throws BrokenBarrierException, InterruptedException { @@ -488,10 +560,10 @@ public class ReplicationTrackerTests extends ESTestCase { // the tracking allocation IDs should play no role in determining the global checkpoint final Map activeLocalCheckpoints = newActiveAllocationIds.stream().collect(Collectors.toMap(Function.identity(), a -> randomIntBetween(1, 1024))); - activeLocalCheckpoints.forEach((a, l) -> tracker.updateLocalCheckpoint(a.getId(), l)); + activeLocalCheckpoints.forEach((a, l) -> updateLocalCheckpoint(tracker, a.getId(), l)); final Map initializingLocalCheckpoints = newInitializingAllocationIds.stream().collect(Collectors.toMap(Function.identity(), a -> randomIntBetween(1, 1024))); - initializingLocalCheckpoints.forEach((a, l) -> tracker.updateLocalCheckpoint(a.getId(), l)); + initializingLocalCheckpoints.forEach((a, l) -> updateLocalCheckpoint(tracker, a.getId(), l)); assertTrue( activeLocalCheckpoints .entrySet() @@ -504,6 +576,7 @@ public class ReplicationTrackerTests extends ESTestCase { .allMatch(e -> tracker.getTrackedLocalCheckpointForShard(e.getKey().getId()).getLocalCheckpoint() == e.getValue())); final long minimumActiveLocalCheckpoint = (long) activeLocalCheckpoints.values().stream().min(Integer::compareTo).get(); assertThat(tracker.getGlobalCheckpoint(), equalTo(minimumActiveLocalCheckpoint)); + assertThat(updatedGlobalCheckpoint.get(), equalTo(minimumActiveLocalCheckpoint)); final long minimumInitailizingLocalCheckpoint = (long) initializingLocalCheckpoints.values().stream().min(Integer::compareTo).get(); // now we are going to add a new allocation ID and bring it in sync which should move it to the in-sync allocation IDs @@ -635,10 +708,11 @@ public class ReplicationTrackerTests extends ESTestCase { FakeClusterState clusterState = initialState(); final AllocationId primaryAllocationId = clusterState.routingTable.primaryShard().allocationId(); + final LongConsumer onUpdate = updatedGlobalCheckpoint -> {}; ReplicationTracker oldPrimary = - new ReplicationTracker(shardId, primaryAllocationId.getId(), indexSettings, UNASSIGNED_SEQ_NO); + new ReplicationTracker(shardId, primaryAllocationId.getId(), indexSettings, UNASSIGNED_SEQ_NO, onUpdate); ReplicationTracker newPrimary = - new ReplicationTracker(shardId, primaryAllocationId.getRelocationId(), indexSettings, UNASSIGNED_SEQ_NO); + new ReplicationTracker(shardId, primaryAllocationId.getRelocationId(), indexSettings, UNASSIGNED_SEQ_NO, onUpdate); Set allocationIds = new HashSet<>(Arrays.asList(oldPrimary.shardAllocationId, newPrimary.shardAllocationId)); diff --git a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java index f2652224549..2a84a8f4246 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java @@ -460,7 +460,7 @@ public abstract class EngineTestCase extends ESTestCase { TimeValue.timeValueMinutes(5), refreshListenerList, Collections.emptyList(), indexSort, handler, new NoneCircuitBreakerService(), globalCheckpointSupplier == null ? - new ReplicationTracker(shardId, allocationId.getId(), indexSettings, SequenceNumbers.NO_OPS_PERFORMED) : + new ReplicationTracker(shardId, allocationId.getId(), indexSettings, SequenceNumbers.NO_OPS_PERFORMED, update -> {}) : globalCheckpointSupplier, primaryTerm::get); return config; }