Expose whether or not the global checkpoint updated (#32659)

It will be useful for future efforts to know if the global checkpoint
was updated. To this end, we need to expose whether or not the global
checkpoint was updated when the state of the replication tracker
updates. For this, we add to the tracker a callback that is invoked
whenever the global checkpoint is updated. For primaries this will be
invoked when the computed global checkpoint is updated based on state
changes to the tracker. For replicas this will be invoked when the local
knowledge of the global checkpoint is advanced from the primary.
This commit is contained in:
Jason Tedor 2018-08-07 15:10:09 -04:00 committed by GitHub
parent 3d5e9114e3
commit dcc816427e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 117 additions and 28 deletions

View File

@ -39,6 +39,7 @@ import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.OptionalLong;
import java.util.Set;
import java.util.function.Function;
@ -127,6 +128,13 @@ public class ReplicationTracker extends AbstractIndexShardComponent implements L
*/
final Map<String, CheckpointState> checkpoints;
/**
* A callback invoked when the global checkpoint is updated. For primary mode this occurs if the computed global checkpoint advances on
* the basis of state changes tracked here. For non-primary mode this occurs if the local knowledge of the global checkpoint advances
* due to an update from the primary.
*/
private final LongConsumer onGlobalCheckpointUpdated;
/**
* This set contains allocation IDs for which there is a thread actively waiting for the local checkpoint to advance to at least the
* current global checkpoint.
@ -391,7 +399,8 @@ public class ReplicationTracker extends AbstractIndexShardComponent implements L
final ShardId shardId,
final String allocationId,
final IndexSettings indexSettings,
final long globalCheckpoint) {
final long globalCheckpoint,
final LongConsumer onGlobalCheckpointUpdated) {
super(shardId, indexSettings);
assert globalCheckpoint >= SequenceNumbers.UNASSIGNED_SEQ_NO : "illegal initial global checkpoint: " + globalCheckpoint;
this.shardAllocationId = allocationId;
@ -400,6 +409,7 @@ public class ReplicationTracker extends AbstractIndexShardComponent implements L
this.appliedClusterStateVersion = -1L;
this.checkpoints = new HashMap<>(1 + indexSettings.getNumberOfReplicas());
checkpoints.put(allocationId, new CheckpointState(SequenceNumbers.UNASSIGNED_SEQ_NO, globalCheckpoint, false, false));
this.onGlobalCheckpointUpdated = Objects.requireNonNull(onGlobalCheckpointUpdated);
this.pendingInSync = new HashSet<>();
this.routingTable = null;
this.replicationGroup = null;
@ -456,7 +466,10 @@ public class ReplicationTracker extends AbstractIndexShardComponent implements L
updateGlobalCheckpoint(
shardAllocationId,
globalCheckpoint,
current -> logger.trace("updating global checkpoint from [{}] to [{}] due to [{}]", current, globalCheckpoint, reason));
current -> {
logger.trace("updated global checkpoint from [{}] to [{}] due to [{}]", current, globalCheckpoint, reason);
onGlobalCheckpointUpdated.accept(globalCheckpoint);
});
assert invariant();
}
@ -474,7 +487,7 @@ public class ReplicationTracker extends AbstractIndexShardComponent implements L
allocationId,
globalCheckpoint,
current -> logger.trace(
"updating local knowledge for [{}] on the primary of the global checkpoint from [{}] to [{}]",
"updated local knowledge for [{}] on the primary of the global checkpoint from [{}] to [{}]",
allocationId,
current,
globalCheckpoint));
@ -485,8 +498,8 @@ public class ReplicationTracker extends AbstractIndexShardComponent implements L
final CheckpointState cps = checkpoints.get(allocationId);
assert !this.shardAllocationId.equals(allocationId) || cps != null;
if (cps != null && globalCheckpoint > cps.globalCheckpoint) {
ifUpdated.accept(cps.globalCheckpoint);
cps.globalCheckpoint = globalCheckpoint;
ifUpdated.accept(cps.globalCheckpoint);
}
}
@ -737,8 +750,9 @@ public class ReplicationTracker extends AbstractIndexShardComponent implements L
assert computedGlobalCheckpoint >= globalCheckpoint : "new global checkpoint [" + computedGlobalCheckpoint +
"] is lower than previous one [" + globalCheckpoint + "]";
if (globalCheckpoint != computedGlobalCheckpoint) {
logger.trace("global checkpoint updated to [{}]", computedGlobalCheckpoint);
cps.globalCheckpoint = computedGlobalCheckpoint;
logger.trace("updated global checkpoint to [{}]", computedGlobalCheckpoint);
onGlobalCheckpointUpdated.accept(computedGlobalCheckpoint);
}
}

View File

@ -297,8 +297,9 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
this.checkIndexOnStartup = indexSettings.getValue(IndexSettings.INDEX_CHECK_ON_STARTUP);
this.translogConfig = new TranslogConfig(shardId, shardPath().resolveTranslog(), indexSettings, bigArrays);
this.replicationTracker = new ReplicationTracker(shardId, shardRouting.allocationId().getId(), indexSettings,
SequenceNumbers.UNASSIGNED_SEQ_NO);
final String aId = shardRouting.allocationId().getId();
this.replicationTracker =
new ReplicationTracker(shardId, aId, indexSettings, SequenceNumbers.UNASSIGNED_SEQ_NO, globalCheckpoint -> {});
// the query cache is a node-level thing, however we want the most popular filters
// to be computed on a per-shard basis
if (IndexModule.INDEX_QUERY_CACHE_EVERYTHING_SETTING.get(settings)) {

View File

@ -47,7 +47,9 @@ import java.util.Set;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import java.util.function.LongConsumer;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
@ -99,6 +101,11 @@ public class ReplicationTrackerTests extends ESTestCase {
return allocationIds.stream().map(AllocationId::getId).collect(Collectors.toSet());
}
private void updateLocalCheckpoint(final ReplicationTracker tracker, final String allocationId, final long localCheckpoint) {
tracker.updateLocalCheckpoint(allocationId, localCheckpoint);
assertThat(updatedGlobalCheckpoint.get(), equalTo(tracker.getGlobalCheckpoint()));
}
public void testGlobalCheckpointUpdate() {
final long initialClusterStateVersion = randomNonNegativeLong();
Map<AllocationId, Long> allocations = new HashMap<>();
@ -137,14 +144,14 @@ public class ReplicationTrackerTests extends ESTestCase {
assertThat(tracker.getReplicationGroup().getReplicationTargets().size(), equalTo(1));
initializing.forEach(aId -> markAsTrackingAndInSyncQuietly(tracker, aId.getId(), NO_OPS_PERFORMED));
assertThat(tracker.getReplicationGroup().getReplicationTargets().size(), equalTo(1 + initializing.size()));
allocations.keySet().forEach(aId -> tracker.updateLocalCheckpoint(aId.getId(), allocations.get(aId)));
allocations.keySet().forEach(aId -> updateLocalCheckpoint(tracker, aId.getId(), allocations.get(aId)));
assertThat(tracker.getGlobalCheckpoint(), equalTo(minLocalCheckpoint));
// increment checkpoints
active.forEach(aId -> allocations.put(aId, allocations.get(aId) + 1 + randomInt(4)));
initializing.forEach(aId -> allocations.put(aId, allocations.get(aId) + 1 + randomInt(4)));
allocations.keySet().forEach(aId -> tracker.updateLocalCheckpoint(aId.getId(), allocations.get(aId)));
allocations.keySet().forEach(aId -> updateLocalCheckpoint(tracker, aId.getId(), allocations.get(aId)));
final long minLocalCheckpointAfterUpdates =
allocations.entrySet().stream().map(Map.Entry::getValue).min(Long::compareTo).orElse(UNASSIGNED_SEQ_NO);
@ -153,7 +160,7 @@ public class ReplicationTrackerTests extends ESTestCase {
final AllocationId extraId = AllocationId.newInitializing();
// first check that adding it without the master blessing doesn't change anything.
tracker.updateLocalCheckpoint(extraId.getId(), minLocalCheckpointAfterUpdates + 1 + randomInt(4));
updateLocalCheckpoint(tracker, extraId.getId(), minLocalCheckpointAfterUpdates + 1 + randomInt(4));
assertNull(tracker.checkpoints.get(extraId));
expectThrows(IllegalStateException.class, () -> tracker.initiateTracking(extraId.getId()));
@ -165,7 +172,7 @@ public class ReplicationTrackerTests extends ESTestCase {
// now notify for the new id
if (randomBoolean()) {
tracker.updateLocalCheckpoint(extraId.getId(), minLocalCheckpointAfterUpdates + 1 + randomInt(4));
updateLocalCheckpoint(tracker, extraId.getId(), minLocalCheckpointAfterUpdates + 1 + randomInt(4));
markAsTrackingAndInSyncQuietly(tracker, extraId.getId(), randomInt((int) minLocalCheckpointAfterUpdates));
} else {
markAsTrackingAndInSyncQuietly(tracker, extraId.getId(), minLocalCheckpointAfterUpdates + 1 + randomInt(4));
@ -175,6 +182,64 @@ public class ReplicationTrackerTests extends ESTestCase {
assertThat(tracker.getGlobalCheckpoint(), greaterThan(minLocalCheckpoint));
}
public void testUpdateGlobalCheckpointOnReplica() {
final AllocationId active = AllocationId.newInitializing();
final ReplicationTracker tracker = newTracker(active);
final long globalCheckpoint = randomLongBetween(NO_OPS_PERFORMED, Long.MAX_VALUE - 1);
tracker.updateGlobalCheckpointOnReplica(globalCheckpoint, "test");
assertThat(updatedGlobalCheckpoint.get(), equalTo(globalCheckpoint));
final long nonUpdate = randomLongBetween(NO_OPS_PERFORMED, globalCheckpoint);
updatedGlobalCheckpoint.set(UNASSIGNED_SEQ_NO);
tracker.updateGlobalCheckpointOnReplica(nonUpdate, "test");
assertThat(updatedGlobalCheckpoint.get(), equalTo(UNASSIGNED_SEQ_NO));
final long update = randomLongBetween(globalCheckpoint, Long.MAX_VALUE);
tracker.updateGlobalCheckpointOnReplica(update, "test");
assertThat(updatedGlobalCheckpoint.get(), equalTo(update));
}
public void testMarkAllocationIdAsInSync() throws BrokenBarrierException, InterruptedException {
final long initialClusterStateVersion = randomNonNegativeLong();
Map<AllocationId, Long> activeWithCheckpoints = randomAllocationsWithLocalCheckpoints(1, 1);
Set<AllocationId> active = new HashSet<>(activeWithCheckpoints.keySet());
Map<AllocationId, Long> initializingWithCheckpoints = randomAllocationsWithLocalCheckpoints(1, 1);
Set<AllocationId> initializing = new HashSet<>(initializingWithCheckpoints.keySet());
final AllocationId primaryId = active.iterator().next();
final AllocationId replicaId = initializing.iterator().next();
final ReplicationTracker tracker = newTracker(primaryId);
tracker.updateFromMaster(initialClusterStateVersion, ids(active), routingTable(initializing, primaryId), emptySet());
final long localCheckpoint = randomLongBetween(0, Long.MAX_VALUE - 1);
tracker.activatePrimaryMode(localCheckpoint);
tracker.initiateTracking(replicaId.getId());
final CyclicBarrier barrier = new CyclicBarrier(2);
final Thread thread = new Thread(() -> {
try {
barrier.await();
tracker.markAllocationIdAsInSync(
replicaId.getId(),
randomLongBetween(NO_OPS_PERFORMED, localCheckpoint - 1));
barrier.await();
} catch (BrokenBarrierException | InterruptedException e) {
throw new AssertionError(e);
}
});
thread.start();
barrier.await();
awaitBusy(tracker::pendingInSync);
final long updatedLocalCheckpoint = randomLongBetween(1 + localCheckpoint, Long.MAX_VALUE);
// there is a shard copy pending in sync, the global checkpoint can not advance
updatedGlobalCheckpoint.set(UNASSIGNED_SEQ_NO);
tracker.updateLocalCheckpoint(primaryId.getId(), updatedLocalCheckpoint);
assertThat(updatedGlobalCheckpoint.get(), equalTo(UNASSIGNED_SEQ_NO));
// we are implicitly marking the pending in sync copy as in sync with the current global checkpoint, no advancement should occur
tracker.updateLocalCheckpoint(replicaId.getId(), localCheckpoint);
assertThat(updatedGlobalCheckpoint.get(), equalTo(UNASSIGNED_SEQ_NO));
barrier.await();
thread.join();
// now we expect that the global checkpoint would advance
tracker.markAllocationIdAsInSync(replicaId.getId(), updatedLocalCheckpoint);
assertThat(updatedGlobalCheckpoint.get(), equalTo(updatedLocalCheckpoint));
}
public void testMissingActiveIdsPreventAdvance() {
final Map<AllocationId, Long> active = randomAllocationsWithLocalCheckpoints(2, 5);
final Map<AllocationId, Long> initializing = randomAllocationsWithLocalCheckpoints(0, 5);
@ -191,14 +256,16 @@ public class ReplicationTrackerTests extends ESTestCase {
.entrySet()
.stream()
.filter(e -> !e.getKey().equals(missingActiveID))
.forEach(e -> tracker.updateLocalCheckpoint(e.getKey().getId(), e.getValue()));
.forEach(e -> updateLocalCheckpoint(tracker, e.getKey().getId(), e.getValue()));
if (missingActiveID.equals(primaryId) == false) {
assertThat(tracker.getGlobalCheckpoint(), equalTo(UNASSIGNED_SEQ_NO));
assertThat(updatedGlobalCheckpoint.get(), equalTo(UNASSIGNED_SEQ_NO));
}
// now update all knowledge of all shards
assigned.forEach((aid, localCP) -> tracker.updateLocalCheckpoint(aid.getId(), localCP));
assigned.forEach((aid, localCP) -> updateLocalCheckpoint(tracker, aid.getId(), localCP));
assertThat(tracker.getGlobalCheckpoint(), not(equalTo(UNASSIGNED_SEQ_NO)));
assertThat(updatedGlobalCheckpoint.get(), not(equalTo(UNASSIGNED_SEQ_NO)));
}
public void testMissingInSyncIdsPreventAdvance() {
@ -213,13 +280,15 @@ public class ReplicationTrackerTests extends ESTestCase {
randomSubsetOf(randomIntBetween(1, initializing.size() - 1),
initializing.keySet()).forEach(aId -> markAsTrackingAndInSyncQuietly(tracker, aId.getId(), NO_OPS_PERFORMED));
active.forEach((aid, localCP) -> tracker.updateLocalCheckpoint(aid.getId(), localCP));
active.forEach((aid, localCP) -> updateLocalCheckpoint(tracker, aid.getId(), localCP));
assertThat(tracker.getGlobalCheckpoint(), equalTo(NO_OPS_PERFORMED));
assertThat(updatedGlobalCheckpoint.get(), equalTo(NO_OPS_PERFORMED));
// update again
initializing.forEach((aid, localCP) -> tracker.updateLocalCheckpoint(aid.getId(), localCP));
initializing.forEach((aid, localCP) -> updateLocalCheckpoint(tracker, aid.getId(), localCP));
assertThat(tracker.getGlobalCheckpoint(), not(equalTo(UNASSIGNED_SEQ_NO)));
assertThat(updatedGlobalCheckpoint.get(), not(equalTo(UNASSIGNED_SEQ_NO)));
}
public void testInSyncIdsAreIgnoredIfNotValidatedByMaster() {
@ -236,7 +305,7 @@ public class ReplicationTrackerTests extends ESTestCase {
List<Map<AllocationId, Long>> allocations = Arrays.asList(active, initializing, nonApproved);
Collections.shuffle(allocations, random());
allocations.forEach(a -> a.forEach((aid, localCP) -> tracker.updateLocalCheckpoint(aid.getId(), localCP)));
allocations.forEach(a -> a.forEach((aid, localCP) -> updateLocalCheckpoint(tracker, aid.getId(), localCP)));
assertThat(tracker.getGlobalCheckpoint(), not(equalTo(UNASSIGNED_SEQ_NO)));
}
@ -271,7 +340,7 @@ public class ReplicationTrackerTests extends ESTestCase {
initializing.forEach(k -> markAsTrackingAndInSyncQuietly(tracker, k.getId(), NO_OPS_PERFORMED));
}
if (randomBoolean()) {
allocations.forEach((aid, localCP) -> tracker.updateLocalCheckpoint(aid.getId(), localCP));
allocations.forEach((aid, localCP) -> updateLocalCheckpoint(tracker, aid.getId(), localCP));
}
// now remove shards
@ -281,9 +350,9 @@ public class ReplicationTrackerTests extends ESTestCase {
ids(activeToStay.keySet()),
routingTable(initializingToStay.keySet(), primaryId),
emptySet());
allocations.forEach((aid, ckp) -> tracker.updateLocalCheckpoint(aid.getId(), ckp + 10L));
allocations.forEach((aid, ckp) -> updateLocalCheckpoint(tracker, aid.getId(), ckp + 10L));
} else {
allocations.forEach((aid, ckp) -> tracker.updateLocalCheckpoint(aid.getId(), ckp + 10L));
allocations.forEach((aid, ckp) -> updateLocalCheckpoint(tracker, aid.getId(), ckp + 10L));
tracker.updateFromMaster(
initialClusterStateVersion + 2,
ids(activeToStay.keySet()),
@ -331,7 +400,7 @@ public class ReplicationTrackerTests extends ESTestCase {
final List<Integer> elements = IntStream.rangeClosed(0, globalCheckpoint - 1).boxed().collect(Collectors.toList());
Randomness.shuffle(elements);
for (int i = 0; i < elements.size(); i++) {
tracker.updateLocalCheckpoint(trackingAllocationId.getId(), elements.get(i));
updateLocalCheckpoint(tracker, trackingAllocationId.getId(), elements.get(i));
assertFalse(complete.get());
assertFalse(tracker.getTrackedLocalCheckpointForShard(trackingAllocationId.getId()).inSync);
assertBusy(() -> assertTrue(tracker.pendingInSync.contains(trackingAllocationId.getId())));
@ -339,7 +408,7 @@ public class ReplicationTrackerTests extends ESTestCase {
if (randomBoolean()) {
// normal path, shard catches up
tracker.updateLocalCheckpoint(trackingAllocationId.getId(), randomIntBetween(globalCheckpoint, 64));
updateLocalCheckpoint(tracker, trackingAllocationId.getId(), randomIntBetween(globalCheckpoint, 64));
// synchronize with the waiting thread to mark that it is complete
barrier.await();
assertTrue(complete.get());
@ -356,12 +425,15 @@ public class ReplicationTrackerTests extends ESTestCase {
thread.join();
}
private AtomicLong updatedGlobalCheckpoint = new AtomicLong(UNASSIGNED_SEQ_NO);
private ReplicationTracker newTracker(final AllocationId allocationId) {
return new ReplicationTracker(
new ShardId("test", "_na_", 0),
allocationId.getId(),
IndexSettingsModule.newIndexSettings("test", Settings.EMPTY),
UNASSIGNED_SEQ_NO);
UNASSIGNED_SEQ_NO,
updatedGlobalCheckpoint::set);
}
public void testWaitForAllocationIdToBeInSyncCanBeInterrupted() throws BrokenBarrierException, InterruptedException {
@ -488,10 +560,10 @@ public class ReplicationTrackerTests extends ESTestCase {
// the tracking allocation IDs should play no role in determining the global checkpoint
final Map<AllocationId, Integer> activeLocalCheckpoints =
newActiveAllocationIds.stream().collect(Collectors.toMap(Function.identity(), a -> randomIntBetween(1, 1024)));
activeLocalCheckpoints.forEach((a, l) -> tracker.updateLocalCheckpoint(a.getId(), l));
activeLocalCheckpoints.forEach((a, l) -> updateLocalCheckpoint(tracker, a.getId(), l));
final Map<AllocationId, Integer> initializingLocalCheckpoints =
newInitializingAllocationIds.stream().collect(Collectors.toMap(Function.identity(), a -> randomIntBetween(1, 1024)));
initializingLocalCheckpoints.forEach((a, l) -> tracker.updateLocalCheckpoint(a.getId(), l));
initializingLocalCheckpoints.forEach((a, l) -> updateLocalCheckpoint(tracker, a.getId(), l));
assertTrue(
activeLocalCheckpoints
.entrySet()
@ -504,6 +576,7 @@ public class ReplicationTrackerTests extends ESTestCase {
.allMatch(e -> tracker.getTrackedLocalCheckpointForShard(e.getKey().getId()).getLocalCheckpoint() == e.getValue()));
final long minimumActiveLocalCheckpoint = (long) activeLocalCheckpoints.values().stream().min(Integer::compareTo).get();
assertThat(tracker.getGlobalCheckpoint(), equalTo(minimumActiveLocalCheckpoint));
assertThat(updatedGlobalCheckpoint.get(), equalTo(minimumActiveLocalCheckpoint));
final long minimumInitailizingLocalCheckpoint = (long) initializingLocalCheckpoints.values().stream().min(Integer::compareTo).get();
// now we are going to add a new allocation ID and bring it in sync which should move it to the in-sync allocation IDs
@ -635,10 +708,11 @@ public class ReplicationTrackerTests extends ESTestCase {
FakeClusterState clusterState = initialState();
final AllocationId primaryAllocationId = clusterState.routingTable.primaryShard().allocationId();
final LongConsumer onUpdate = updatedGlobalCheckpoint -> {};
ReplicationTracker oldPrimary =
new ReplicationTracker(shardId, primaryAllocationId.getId(), indexSettings, UNASSIGNED_SEQ_NO);
new ReplicationTracker(shardId, primaryAllocationId.getId(), indexSettings, UNASSIGNED_SEQ_NO, onUpdate);
ReplicationTracker newPrimary =
new ReplicationTracker(shardId, primaryAllocationId.getRelocationId(), indexSettings, UNASSIGNED_SEQ_NO);
new ReplicationTracker(shardId, primaryAllocationId.getRelocationId(), indexSettings, UNASSIGNED_SEQ_NO, onUpdate);
Set<String> allocationIds = new HashSet<>(Arrays.asList(oldPrimary.shardAllocationId, newPrimary.shardAllocationId));

View File

@ -460,7 +460,7 @@ public abstract class EngineTestCase extends ESTestCase {
TimeValue.timeValueMinutes(5), refreshListenerList, Collections.emptyList(), indexSort, handler,
new NoneCircuitBreakerService(),
globalCheckpointSupplier == null ?
new ReplicationTracker(shardId, allocationId.getId(), indexSettings, SequenceNumbers.NO_OPS_PERFORMED) :
new ReplicationTracker(shardId, allocationId.getId(), indexSettings, SequenceNumbers.NO_OPS_PERFORMED, update -> {}) :
globalCheckpointSupplier, primaryTerm::get);
return config;
}