Verify primary mode usage with assertions (#32667)

Primary terms were introduced as part of the sequence-number effort (#10708) and added in ES
5.0. Subsequent work introduced the replication tracker which lets the primary own its replication
group (#25692) to coordinate recovery and replication. The replication tracker explicitly exposes
whether it is operating in primary mode or replica mode, independent of the ShardRouting object
that's associated with a shard. During a primary relocation, for example, the primary mode is
transferred between the primary relocation source and the primary relocation target. After
transferring this so-called primary context, the old primary becomes a replication target and the
new primary the replication source, reflected in the replication tracker on both nodes. With the
most recent PR in this area (#32442), we finally have a clean transition between a shard that's
operating as a primary and issuing sequence numbers and a shard that's serving as a replication
target. The transition from one state to the other is enforced through the operation-permit system,
where we block permit acquisition during such changes and perform the transition under this
operation block, ensuring that there are no operations in progress while the transition is being
performed. This finally allows us to turn the best-effort checks that were put in place to prevent
shards from being used in the wrong way (i.e. primary as replica, or replica as primary) into hard
assertions, making it easier to catch any bugs in this area.
This commit is contained in:
Yannick Welsch 2018-08-07 15:02:37 +02:00 committed by GitHub
parent 3ce984d746
commit 45066b5e89
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 45 additions and 41 deletions

View File

@ -1446,10 +1446,10 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
}
} else {
if (origin == Engine.Operation.Origin.PRIMARY) {
verifyPrimary();
assert assertPrimaryMode();
} else {
assert origin == Engine.Operation.Origin.REPLICA;
verifyReplicationTarget();
assert assertReplicationTarget();
}
if (writeAllowedStates.contains(state) == false) {
throw new IllegalIndexShardStateException(shardId, state, "operation only allowed when shard state is one of " + writeAllowedStates + ", origin [" + origin + "]");
@ -1457,19 +1457,14 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
}
}
private void verifyPrimary() {
if (shardRouting.primary() == false) {
throw new IllegalStateException("shard " + shardRouting + " is not a primary");
}
private boolean assertPrimaryMode() {
assert shardRouting.primary() && replicationTracker.isPrimaryMode() : "shard " + shardRouting + " is not a primary shard in primary mode";
return true;
}
private void verifyReplicationTarget() {
final IndexShardState state = state();
if (shardRouting.primary() && shardRouting.active() && replicationTracker.isPrimaryMode()) {
// must use exception that is not ignored by replication logic. See TransportActions.isShardNotAvailableException
throw new IllegalStateException("active primary shard " + shardRouting + " cannot be a replication target before " +
"relocation hand off, state is [" + state + "]");
}
private boolean assertReplicationTarget() {
assert replicationTracker.isPrimaryMode() == false : "shard " + shardRouting + " in primary mode cannot be a replication target";
return true;
}
private void verifyNotClosed() throws IllegalIndexShardStateException {
@ -1716,7 +1711,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
* @param checkpoint the local checkpoint for the shard
*/
public void updateLocalCheckpointForShard(final String allocationId, final long checkpoint) {
verifyPrimary();
assert assertPrimaryMode();
verifyNotClosed();
replicationTracker.updateLocalCheckpoint(allocationId, checkpoint);
}
@ -1728,7 +1723,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
* @param globalCheckpoint the global checkpoint
*/
public void updateGlobalCheckpointForShard(final String allocationId, final long globalCheckpoint) {
verifyPrimary();
assert assertPrimaryMode();
verifyNotClosed();
replicationTracker.updateGlobalCheckpointForShard(allocationId, globalCheckpoint);
}
@ -1750,7 +1745,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
* @param allocationId the allocation ID of the shard for which recovery was initiated
*/
public void initiateTracking(final String allocationId) {
verifyPrimary();
assert assertPrimaryMode();
replicationTracker.initiateTracking(allocationId);
}
@ -1763,7 +1758,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
* @param localCheckpoint the current local checkpoint on the shard
*/
public void markAllocationIdAsInSync(final String allocationId, final long localCheckpoint) throws InterruptedException {
verifyPrimary();
assert assertPrimaryMode();
replicationTracker.markAllocationIdAsInSync(allocationId, localCheckpoint);
}
@ -1798,7 +1793,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
* @return a map from allocation ID to the local knowledge of the global checkpoint for that allocation ID
*/
public ObjectLongMap<String> getInSyncGlobalCheckpoints() {
verifyPrimary();
assert assertPrimaryMode();
verifyNotClosed();
return replicationTracker.getInSyncGlobalCheckpoints();
}
@ -1808,11 +1803,12 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
* primary.
*/
public void maybeSyncGlobalCheckpoint(final String reason) {
verifyPrimary();
verifyNotClosed();
assert shardRouting.primary() : "only call maybeSyncGlobalCheckpoint on primary shard";
if (replicationTracker.isPrimaryMode() == false) {
return;
}
assert assertPrimaryMode();
// only sync if there are not operations in flight
final SeqNoStats stats = getEngine().getSeqNoStats(replicationTracker.getGlobalCheckpoint());
if (stats.getMaxSeqNo() == stats.getGlobalCheckpoint()) {
@ -1838,7 +1834,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
* @return the replication group
*/
public ReplicationGroup getReplicationGroup() {
verifyPrimary();
assert assertPrimaryMode();
verifyNotClosed();
return replicationTracker.getReplicationGroup();
}
@ -1850,7 +1846,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
* @param reason the reason the global checkpoint was updated
*/
public void updateGlobalCheckpointOnReplica(final long globalCheckpoint, final String reason) {
verifyReplicationTarget();
assert assertReplicationTarget();
final long localCheckpoint = getLocalCheckpoint();
if (globalCheckpoint > localCheckpoint) {
/*
@ -1877,8 +1873,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
* @param primaryContext the sequence number context
*/
public void activateWithPrimaryContext(final ReplicationTracker.PrimaryContext primaryContext) {
verifyPrimary();
assert shardRouting.isRelocationTarget() : "only relocation target can update allocation IDs from primary context: " + shardRouting;
assert shardRouting.primary() && shardRouting.isRelocationTarget() : "only primary relocation target can update allocation IDs from primary context: " + shardRouting;
assert primaryContext.getCheckpointStates().containsKey(routingEntry().allocationId().getId()) &&
getLocalCheckpoint() == primaryContext.getCheckpointStates().get(routingEntry().allocationId().getId()).getLocalCheckpoint();
synchronized (mutex) {
@ -1892,7 +1887,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
* @return {@code true} if there is at least one shard pending in-sync, otherwise false
*/
public boolean pendingInSync() {
verifyPrimary();
assert assertPrimaryMode();
return replicationTracker.pendingInSync();
}
@ -2209,7 +2204,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
*/
public void acquirePrimaryOperationPermit(ActionListener<Releasable> onPermitAcquired, String executorOnDelay, Object debugInfo) {
verifyNotClosed();
verifyPrimary();
assert shardRouting.primary() : "acquirePrimaryOperationPermit should only be called on primary shard: " + shardRouting;
indexShardOperationPermits.acquire(onPermitAcquired, executorOnDelay, false, debugInfo);
}
@ -2259,7 +2254,6 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
final ActionListener<Releasable> onPermitAcquired, final String executorOnDelay,
final Object debugInfo) {
verifyNotClosed();
verifyReplicationTarget();
if (opPrimaryTerm > pendingPrimaryTerm) {
synchronized (mutex) {
if (opPrimaryTerm > pendingPrimaryTerm) {
@ -2312,6 +2306,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
operationPrimaryTerm);
onPermitAcquired.onFailure(new IllegalStateException(message));
} else {
assert assertReplicationTarget();
try {
updateGlobalCheckpointOnReplica(globalCheckpoint, "operation");
} catch (Exception e) {

View File

@ -31,6 +31,7 @@ import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FilterDirectory;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.util.Constants;
import org.elasticsearch.Assertions;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.flush.FlushRequest;
@ -560,28 +561,20 @@ public class IndexShardTests extends IndexShardTestCase {
ShardRouting primaryRouting = newShardRouting(replicaRouting.shardId(), replicaRouting.currentNodeId(), null,
true, ShardRoutingState.STARTED, replicaRouting.allocationId());
final long newPrimaryTerm = indexShard.getPendingPrimaryTerm() + between(1, 1000);
CountDownLatch latch = new CountDownLatch(1);
indexShard.updateShardState(primaryRouting, newPrimaryTerm, (shard, listener) -> {
assertThat(TestTranslog.getCurrentTerm(getTranslog(indexShard)), equalTo(newPrimaryTerm));
latch.countDown();
}, 0L,
Collections.singleton(indexShard.routingEntry().allocationId().getId()),
new IndexShardRoutingTable.Builder(indexShard.shardId()).addShard(primaryRouting).build(),
Collections.emptySet());
latch.await();
} else {
indexShard = newStartedShard(true);
}
final long primaryTerm = indexShard.getPendingPrimaryTerm();
assertEquals(0, indexShard.getActiveOperationsCount());
if (indexShard.routingEntry().isRelocationTarget() == false) {
try {
final PlainActionFuture<Releasable> permitAcquiredFuture = new PlainActionFuture<>();
indexShard.acquireReplicaOperationPermit(primaryTerm, indexShard.getGlobalCheckpoint(), permitAcquiredFuture,
ThreadPool.Names.WRITE, "");
permitAcquiredFuture.actionGet();
fail("shard shouldn't accept operations as replica");
} catch (IllegalStateException ignored) {
}
}
Releasable operation1 = acquirePrimaryOperationPermitBlockingly(indexShard);
assertEquals(1, indexShard.getActiveOperationsCount());
Releasable operation2 = acquirePrimaryOperationPermitBlockingly(indexShard);
@ -590,6 +583,22 @@ public class IndexShardTests extends IndexShardTestCase {
Releasables.close(operation1, operation2);
assertEquals(0, indexShard.getActiveOperationsCount());
if (Assertions.ENABLED && indexShard.routingEntry().isRelocationTarget() == false) {
assertThat(expectThrows(AssertionError.class, () -> indexShard.acquireReplicaOperationPermit(primaryTerm,
indexShard.getGlobalCheckpoint(), new ActionListener<Releasable>() {
@Override
public void onResponse(Releasable releasable) {
fail();
}
@Override
public void onFailure(Exception e) {
fail();
}
},
ThreadPool.Names.WRITE, "")).getMessage(), containsString("in primary mode cannot be a replication target"));
}
closeShards(indexShard);
}
@ -647,11 +656,11 @@ public class IndexShardTests extends IndexShardTestCase {
logger.info("shard routing to {}", shardRouting);
assertEquals(0, indexShard.getActiveOperationsCount());
if (shardRouting.primary() == false) {
final IllegalStateException e =
expectThrows(IllegalStateException.class,
if (shardRouting.primary() == false && Assertions.ENABLED) {
final AssertionError e =
expectThrows(AssertionError.class,
() -> indexShard.acquirePrimaryOperationPermit(null, ThreadPool.Names.WRITE, ""));
assertThat(e, hasToString(containsString("shard " + shardRouting + " is not a primary")));
assertThat(e, hasToString(containsString("acquirePrimaryOperationPermit should only be called on primary shard")));
}
final long primaryTerm = indexShard.getPendingPrimaryTerm();