ReplicationTracker.markAllocationIdAsInSync may hang if allocation is cancelled (#30316)

At the end of recovery, we mark the recovering shard as "in sync" on the primary. From this point on 
the primary will treat any replication failure on it as critical and will reach out to the master to fail the 
shard. To do so, we wait for the local checkpoint of the recovered shard to advance above the global 
checkpoint (in order to maintain the global checkpoint invariant).

If the master decides to cancel the allocation of the recovering shard while we wait, the method can 
currently hang and never return. It also ignores the interrupts that are triggered by the cancelled 
recovery when the primary closes.

Note that this is crucial as this method is called while holding a primary permit. Since the method 
never comes back, the permit is never released. The unreleased permit will then block any primary 
relocation, *and* while the primary is trying to relocate, all indexing will be blocked for 30 minutes 
as it waits to acquire the missing permit.
This commit is contained in:
Boaz Leskes 2018-05-02 19:40:29 +02:00 committed by GitHub
parent 0d7ac9a74c
commit 13917162ad
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 27 additions and 7 deletions

View File

@ -339,6 +339,11 @@ public class ReplicationTracker extends AbstractIndexShardComponent implements L
"shard copy " + entry.getKey() + " is in-sync but not tracked";
}
// all pending in sync shards are tracked
for (String aId : pendingInSync) {
assert checkpoints.get(aId) != null : "aId [" + aId + "] is pending in sync but isn't tracked";
}
return true;
}
@ -521,6 +526,9 @@ public class ReplicationTracker extends AbstractIndexShardComponent implements L
checkpoints.put(initializingId, new CheckpointState(localCheckpoint, globalCheckpoint, inSync, inSync));
}
}
if (removedEntries) {
pendingInSync.removeIf(aId -> checkpoints.containsKey(aId) == false);
}
} else {
for (String initializingId : initializingAllocationIds) {
if (shardAllocationId.equals(initializingId) == false) {
@ -549,6 +557,8 @@ public class ReplicationTracker extends AbstractIndexShardComponent implements L
replicationGroup = calculateReplicationGroup();
if (primaryMode && removedEntries) {
updateGlobalCheckpointOnPrimary();
// notify any waiter for local checkpoint advancement to recheck that their shard is still being tracked.
notifyAllWaiters();
}
}
assert invariant();

View File

@ -305,7 +305,8 @@ public class ReplicationTrackerTests extends ESTestCase {
final AllocationId inSyncAllocationId = AllocationId.newInitializing();
final AllocationId trackingAllocationId = AllocationId.newInitializing();
final ReplicationTracker tracker = newTracker(inSyncAllocationId);
tracker.updateFromMaster(randomNonNegativeLong(), Collections.singleton(inSyncAllocationId.getId()),
final long clusterStateVersion = randomNonNegativeLong();
tracker.updateFromMaster(clusterStateVersion, Collections.singleton(inSyncAllocationId.getId()),
routingTable(Collections.singleton(trackingAllocationId), inSyncAllocationId), emptySet());
tracker.activatePrimaryMode(globalCheckpoint);
final Thread thread = new Thread(() -> {
@ -336,13 +337,22 @@ public class ReplicationTrackerTests extends ESTestCase {
assertBusy(() -> assertTrue(tracker.pendingInSync.contains(trackingAllocationId.getId())));
}
tracker.updateLocalCheckpoint(trackingAllocationId.getId(), randomIntBetween(globalCheckpoint, 64));
// synchronize with the waiting thread to mark that it is complete
barrier.await();
assertTrue(complete.get());
assertTrue(tracker.getTrackedLocalCheckpointForShard(trackingAllocationId.getId()).inSync);
if (randomBoolean()) {
// normal path, shard catches up
tracker.updateLocalCheckpoint(trackingAllocationId.getId(), randomIntBetween(globalCheckpoint, 64));
// synchronize with the waiting thread to mark that it is complete
barrier.await();
assertTrue(complete.get());
assertTrue(tracker.getTrackedLocalCheckpointForShard(trackingAllocationId.getId()).inSync);
} else {
// master changes its mind and cancels the allocation
tracker.updateFromMaster(clusterStateVersion + 1, Collections.singleton(inSyncAllocationId.getId()),
routingTable(emptySet(), inSyncAllocationId), emptySet());
barrier.await();
assertTrue(complete.get());
assertNull(tracker.getTrackedLocalCheckpointForShard(trackingAllocationId.getId()));
}
assertFalse(tracker.pendingInSync.contains(trackingAllocationId.getId()));
thread.join();
}