diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 9131055bcd9..3aa281da101 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -1446,10 +1446,10 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl } } else { if (origin == Engine.Operation.Origin.PRIMARY) { - verifyPrimary(); + assert assertPrimaryMode(); } else { assert origin == Engine.Operation.Origin.REPLICA; - verifyReplicationTarget(); + assert assertReplicationTarget(); } if (writeAllowedStates.contains(state) == false) { throw new IllegalIndexShardStateException(shardId, state, "operation only allowed when shard state is one of " + writeAllowedStates + ", origin [" + origin + "]"); @@ -1457,19 +1457,14 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl } } - private void verifyPrimary() { - if (shardRouting.primary() == false) { - throw new IllegalStateException("shard " + shardRouting + " is not a primary"); - } + private boolean assertPrimaryMode() { + assert shardRouting.primary() && replicationTracker.isPrimaryMode() : "shard " + shardRouting + " is not a primary shard in primary mode"; + return true; } - private void verifyReplicationTarget() { - final IndexShardState state = state(); - if (shardRouting.primary() && shardRouting.active() && replicationTracker.isPrimaryMode()) { - // must use exception that is not ignored by replication logic. See TransportActions.isShardNotAvailableException - throw new IllegalStateException("active primary shard " + shardRouting + " cannot be a replication target before " + - "relocation hand off, state is [" + state + "]"); - } + private boolean assertReplicationTarget() { + assert replicationTracker.isPrimaryMode() == false : "shard " + shardRouting + " in primary mode cannot be a replication target"; + return true; } private void verifyNotClosed() throws IllegalIndexShardStateException { @@ -1716,7 +1711,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl * @param checkpoint the local checkpoint for the shard */ public void updateLocalCheckpointForShard(final String allocationId, final long checkpoint) { - verifyPrimary(); + assert assertPrimaryMode(); verifyNotClosed(); replicationTracker.updateLocalCheckpoint(allocationId, checkpoint); } @@ -1728,7 +1723,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl * @param globalCheckpoint the global checkpoint */ public void updateGlobalCheckpointForShard(final String allocationId, final long globalCheckpoint) { - verifyPrimary(); + assert assertPrimaryMode(); verifyNotClosed(); replicationTracker.updateGlobalCheckpointForShard(allocationId, globalCheckpoint); } @@ -1750,7 +1745,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl * @param allocationId the allocation ID of the shard for which recovery was initiated */ public void initiateTracking(final String allocationId) { - verifyPrimary(); + assert assertPrimaryMode(); replicationTracker.initiateTracking(allocationId); } @@ -1763,7 +1758,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl * @param localCheckpoint the current local checkpoint on the shard */ public void markAllocationIdAsInSync(final String allocationId, final long localCheckpoint) throws InterruptedException { - verifyPrimary(); + assert assertPrimaryMode(); replicationTracker.markAllocationIdAsInSync(allocationId, localCheckpoint); } @@ -1798,7 +1793,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl * @return a map from allocation ID to the local knowledge of the global checkpoint for that allocation ID */ public ObjectLongMap getInSyncGlobalCheckpoints() { - verifyPrimary(); + assert assertPrimaryMode(); verifyNotClosed(); return replicationTracker.getInSyncGlobalCheckpoints(); } @@ -1808,11 +1803,12 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl * primary. */ public void maybeSyncGlobalCheckpoint(final String reason) { - verifyPrimary(); verifyNotClosed(); + assert shardRouting.primary() : "only call maybeSyncGlobalCheckpoint on primary shard"; if (replicationTracker.isPrimaryMode() == false) { return; } + assert assertPrimaryMode(); // only sync if there are not operations in flight final SeqNoStats stats = getEngine().getSeqNoStats(replicationTracker.getGlobalCheckpoint()); if (stats.getMaxSeqNo() == stats.getGlobalCheckpoint()) { @@ -1838,7 +1834,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl * @return the replication group */ public ReplicationGroup getReplicationGroup() { - verifyPrimary(); + assert assertPrimaryMode(); verifyNotClosed(); return replicationTracker.getReplicationGroup(); } @@ -1850,7 +1846,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl * @param reason the reason the global checkpoint was updated */ public void updateGlobalCheckpointOnReplica(final long globalCheckpoint, final String reason) { - verifyReplicationTarget(); + assert assertReplicationTarget(); final long localCheckpoint = getLocalCheckpoint(); if (globalCheckpoint > localCheckpoint) { /* @@ -1877,8 +1873,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl * @param primaryContext the sequence number context */ public void activateWithPrimaryContext(final ReplicationTracker.PrimaryContext primaryContext) { - verifyPrimary(); - assert shardRouting.isRelocationTarget() : "only relocation target can update allocation IDs from primary context: " + shardRouting; + assert shardRouting.primary() && shardRouting.isRelocationTarget() : "only primary relocation target can update allocation IDs from primary context: " + shardRouting; assert primaryContext.getCheckpointStates().containsKey(routingEntry().allocationId().getId()) && getLocalCheckpoint() == primaryContext.getCheckpointStates().get(routingEntry().allocationId().getId()).getLocalCheckpoint(); synchronized (mutex) { @@ -1892,7 +1887,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl * @return {@code true} if there is at least one shard pending in-sync, otherwise false */ public boolean pendingInSync() { - verifyPrimary(); + assert assertPrimaryMode(); return replicationTracker.pendingInSync(); } @@ -2209,7 +2204,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl */ public void acquirePrimaryOperationPermit(ActionListener onPermitAcquired, String executorOnDelay, Object debugInfo) { verifyNotClosed(); - verifyPrimary(); + assert shardRouting.primary() : "acquirePrimaryOperationPermit should only be called on primary shard: " + shardRouting; indexShardOperationPermits.acquire(onPermitAcquired, executorOnDelay, false, debugInfo); } @@ -2259,7 +2254,6 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl final ActionListener onPermitAcquired, final String executorOnDelay, final Object debugInfo) { verifyNotClosed(); - verifyReplicationTarget(); if (opPrimaryTerm > pendingPrimaryTerm) { synchronized (mutex) { if (opPrimaryTerm > pendingPrimaryTerm) { @@ -2312,6 +2306,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl operationPrimaryTerm); onPermitAcquired.onFailure(new IllegalStateException(message)); } else { + assert assertReplicationTarget(); try { updateGlobalCheckpointOnReplica(globalCheckpoint, "operation"); } catch (Exception e) { diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index 03442be7f06..67a28c9b336 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -31,6 +31,7 @@ import org.apache.lucene.store.Directory; import org.apache.lucene.store.FilterDirectory; import org.apache.lucene.store.IOContext; import org.apache.lucene.util.Constants; +import org.elasticsearch.Assertions; import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.indices.flush.FlushRequest; @@ -560,28 +561,20 @@ public class IndexShardTests extends IndexShardTestCase { ShardRouting primaryRouting = newShardRouting(replicaRouting.shardId(), replicaRouting.currentNodeId(), null, true, ShardRoutingState.STARTED, replicaRouting.allocationId()); final long newPrimaryTerm = indexShard.getPendingPrimaryTerm() + between(1, 1000); + CountDownLatch latch = new CountDownLatch(1); indexShard.updateShardState(primaryRouting, newPrimaryTerm, (shard, listener) -> { assertThat(TestTranslog.getCurrentTerm(getTranslog(indexShard)), equalTo(newPrimaryTerm)); + latch.countDown(); }, 0L, Collections.singleton(indexShard.routingEntry().allocationId().getId()), new IndexShardRoutingTable.Builder(indexShard.shardId()).addShard(primaryRouting).build(), Collections.emptySet()); + latch.await(); } else { indexShard = newStartedShard(true); } final long primaryTerm = indexShard.getPendingPrimaryTerm(); assertEquals(0, indexShard.getActiveOperationsCount()); - if (indexShard.routingEntry().isRelocationTarget() == false) { - try { - final PlainActionFuture permitAcquiredFuture = new PlainActionFuture<>(); - indexShard.acquireReplicaOperationPermit(primaryTerm, indexShard.getGlobalCheckpoint(), permitAcquiredFuture, - ThreadPool.Names.WRITE, ""); - permitAcquiredFuture.actionGet(); - fail("shard shouldn't accept operations as replica"); - } catch (IllegalStateException ignored) { - - } - } Releasable operation1 = acquirePrimaryOperationPermitBlockingly(indexShard); assertEquals(1, indexShard.getActiveOperationsCount()); Releasable operation2 = acquirePrimaryOperationPermitBlockingly(indexShard); @@ -590,6 +583,22 @@ public class IndexShardTests extends IndexShardTestCase { Releasables.close(operation1, operation2); assertEquals(0, indexShard.getActiveOperationsCount()); + if (Assertions.ENABLED && indexShard.routingEntry().isRelocationTarget() == false) { + assertThat(expectThrows(AssertionError.class, () -> indexShard.acquireReplicaOperationPermit(primaryTerm, + indexShard.getGlobalCheckpoint(), new ActionListener() { + @Override + public void onResponse(Releasable releasable) { + fail(); + } + + @Override + public void onFailure(Exception e) { + fail(); + } + }, + ThreadPool.Names.WRITE, "")).getMessage(), containsString("in primary mode cannot be a replication target")); + } + closeShards(indexShard); } @@ -647,11 +656,11 @@ public class IndexShardTests extends IndexShardTestCase { logger.info("shard routing to {}", shardRouting); assertEquals(0, indexShard.getActiveOperationsCount()); - if (shardRouting.primary() == false) { - final IllegalStateException e = - expectThrows(IllegalStateException.class, + if (shardRouting.primary() == false && Assertions.ENABLED) { + final AssertionError e = + expectThrows(AssertionError.class, () -> indexShard.acquirePrimaryOperationPermit(null, ThreadPool.Names.WRITE, "")); - assertThat(e, hasToString(containsString("shard " + shardRouting + " is not a primary"))); + assertThat(e, hasToString(containsString("acquirePrimaryOperationPermit should only be called on primary shard"))); } final long primaryTerm = indexShard.getPendingPrimaryTerm();