diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseAction.java index 7d691717de1..22a0777f7bf 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseAction.java @@ -101,8 +101,8 @@ public class TransportVerifyShardBeforeCloseAction extends TransportReplicationA private void executeShardOperation(final ShardRequest request, final IndexShard indexShard) { final ShardId shardId = indexShard.shardId(); - if (indexShard.getActiveOperationsCount() != 0) { - throw new IllegalStateException("On-going operations in progress while checking index shard " + shardId + " before closing"); + if (indexShard.getActiveOperationsCount() != IndexShard.OPERATIONS_BLOCKED) { + throw new IllegalStateException("Index shard " + shardId + " is not blocking all operations during closing"); } final ClusterBlocks clusterBlocks = clusterService.state().blocks(); diff --git a/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java b/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java index 92687d4880e..80fd7162f3d 100644 --- a/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java +++ b/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java @@ -514,6 +514,7 @@ public abstract class TransportReplicationAction< @Override public void onResponse(Releasable releasable) { try { + assert replica.getActiveOperationsCount() != 0 : "must perform shard operation under a permit"; final ReplicaResult replicaResult = shardOperationOnReplica(replicaRequest.getRequest(), replica); releasable.close(); // release shard operation lock before responding to caller final TransportReplicationAction.ReplicaResponse response = @@ -913,6 +914,7 @@ public abstract class TransportReplicationAction< return result; }); } + assert indexShard.getActiveOperationsCount() != 0 : "must perform shard operation under a permit"; shardOperationOnPrimary(request, indexShard, listener); } diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index f9db0b15e7a..d1b5a25db6d 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -636,7 +636,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl indexShardOperationPermits.blockOperations(30, TimeUnit.MINUTES, () -> { forceRefreshes.close(); // no shard operation permits are being held here, move state from started to relocated - assert indexShardOperationPermits.getActiveOperationsCount() == 0 : + assert indexShardOperationPermits.getActiveOperationsCount() == OPERATIONS_BLOCKED : "in-flight operations in progress while moving shard state to relocated"; /* * We should not invoke the runnable under the mutex as the expected implementation is to handoff the primary context via a @@ -1553,7 +1553,8 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl assert assertReplicationTarget(); } else { assert origin == Engine.Operation.Origin.LOCAL_RESET; - assert getActiveOperationsCount() == 0 : "Ongoing writes [" + getActiveOperations() + "]"; + assert getActiveOperationsCount() == OPERATIONS_BLOCKED + : "locally resetting without blocking operations, active operations are [" + getActiveOperations() + "]"; } if (writeAllowedStates.contains(state) == false) { throw new IllegalIndexShardStateException(shardId, state, "operation only allowed when shard state is one of " + @@ -2747,8 +2748,15 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl return (opPrimaryTerm > pendingPrimaryTerm) || (allPermits && opPrimaryTerm > getOperationPrimaryTerm()); } + public static final int OPERATIONS_BLOCKED = -1; + + /** + * Obtain the active operation count, or {@link IndexShard#OPERATIONS_BLOCKED} if all permits are held (even if there are + * outstanding operations in flight). + * + * @return the active operation count, or {@link IndexShard#OPERATIONS_BLOCKED} when all permits are held. + */ public int getActiveOperationsCount() { - // refCount is incremented on successful acquire and decremented on close return indexShardOperationPermits.getActiveOperationsCount(); } @@ -3076,7 +3084,8 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl * Rollback the current engine to the safe commit, then replay local translog up to the global checkpoint. */ void resetEngineToGlobalCheckpoint() throws IOException { - assert getActiveOperationsCount() == 0 : "Ongoing writes [" + getActiveOperations() + "]"; + assert getActiveOperationsCount() == OPERATIONS_BLOCKED + : "resetting engine without blocking operations; active operations are [" + getActiveOperations() + ']'; sync(); // persist the global checkpoint to disk final SeqNoStats seqNoStats = seqNoStats(); final TranslogStats translogStats = translogStats(); diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShardOperationPermits.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShardOperationPermits.java index fe7a5392a08..672e69743d4 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShardOperationPermits.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShardOperationPermits.java @@ -293,19 +293,14 @@ final class IndexShardOperationPermits implements Closeable { } /** - * Obtain the active operation count, or zero if all permits are held (even if there are outstanding operations in flight). + * Obtain the active operation count, or {@link IndexShard#OPERATIONS_BLOCKED} if all permits are held. * - * @return the active operation count, or zero when all permits are held + * @return the active operation count, or {@link IndexShard#OPERATIONS_BLOCKED} when all permits are held. */ int getActiveOperationsCount() { int availablePermits = semaphore.availablePermits(); if (availablePermits == 0) { - /* - * This occurs when either doBlockOperations is holding all the permits or there are outstanding operations in flight and the - * remainder of the permits are held by doBlockOperations. We do not distinguish between these two cases and simply say that - * the active operations count is zero. - */ - return 0; + return IndexShard.OPERATIONS_BLOCKED; // This occurs when blockOperations() has acquired all the permits. } else { return TOTAL_PERMITS - availablePermits; } diff --git a/server/src/main/java/org/elasticsearch/indices/flush/SyncedFlushService.java b/server/src/main/java/org/elasticsearch/indices/flush/SyncedFlushService.java index 0423559aaf5..79a2d6c3c0a 100644 --- a/server/src/main/java/org/elasticsearch/indices/flush/SyncedFlushService.java +++ b/server/src/main/java/org/elasticsearch/indices/flush/SyncedFlushService.java @@ -538,7 +538,7 @@ public class SyncedFlushService implements IndexEventListener { throw new IllegalStateException("[" + request.shardId() +"] expected a primary shard"); } int opCount = indexShard.getActiveOperationsCount(); - return new InFlightOpsResponse(opCount); + return new InFlightOpsResponse(opCount == IndexShard.OPERATIONS_BLOCKED ? 0 : opCount); } public static final class PreShardSyncedFlushRequest extends TransportRequest { @@ -781,6 +781,7 @@ public class SyncedFlushService implements IndexEventListener { } InFlightOpsResponse(int opCount) { + assert opCount >= 0 : opCount; this.opCount = opCount; } diff --git a/server/src/test/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseActionTests.java b/server/src/test/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseActionTests.java index dcdfbb755ba..cd4d8ae6857 100644 --- a/server/src/test/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseActionTests.java @@ -100,7 +100,7 @@ public class TransportVerifyShardBeforeCloseActionTests extends ESTestCase { super.setUp(); indexShard = mock(IndexShard.class); - when(indexShard.getActiveOperationsCount()).thenReturn(0); + when(indexShard.getActiveOperationsCount()).thenReturn(IndexShard.OPERATIONS_BLOCKED); final ShardId shardId = new ShardId("index", "_na_", randomIntBetween(0, 3)); when(indexShard.shardId()).thenReturn(shardId); @@ -165,12 +165,12 @@ public class TransportVerifyShardBeforeCloseActionTests extends ESTestCase { assertThat(flushRequest.getValue().force(), is(true)); } - public void testOperationFailsWithOnGoingOps() { - when(indexShard.getActiveOperationsCount()).thenReturn(randomIntBetween(1, 10)); + public void testOperationFailsWhenNotBlocked() { + when(indexShard.getActiveOperationsCount()).thenReturn(randomIntBetween(0, 10)); IllegalStateException exception = expectThrows(IllegalStateException.class, this::executeOnPrimaryOrReplica); assertThat(exception.getMessage(), - equalTo("On-going operations in progress while checking index shard " + indexShard.shardId() + " before closing")); + equalTo("Index shard " + indexShard.shardId() + " is not blocking all operations during closing")); verify(indexShard, times(0)).flush(any(FlushRequest.class)); } diff --git a/server/src/test/java/org/elasticsearch/action/resync/TransportResyncReplicationActionTests.java b/server/src/test/java/org/elasticsearch/action/resync/TransportResyncReplicationActionTests.java index ed3663ed18d..167518f4fc9 100644 --- a/server/src/test/java/org/elasticsearch/action/resync/TransportResyncReplicationActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/resync/TransportResyncReplicationActionTests.java @@ -58,6 +58,7 @@ import java.nio.charset.Charset; import java.util.Collections; import java.util.HashSet; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import static java.util.Collections.emptyList; import static org.elasticsearch.action.support.replication.ClusterStateCreationUtils.state; @@ -118,13 +119,17 @@ public class TransportResyncReplicationActionTests extends ESTestCase { final String allocationId = primaryShardRouting.allocationId().getId(); final long primaryTerm = indexMetaData.primaryTerm(shardId.id()); + final AtomicInteger acquiredPermits = new AtomicInteger(); final IndexShard indexShard = mock(IndexShard.class); when(indexShard.shardId()).thenReturn(shardId); when(indexShard.routingEntry()).thenReturn(primaryShardRouting); when(indexShard.getPendingPrimaryTerm()).thenReturn(primaryTerm); + when(indexShard.getOperationPrimaryTerm()).thenReturn(primaryTerm); + when(indexShard.getActiveOperationsCount()).then(i -> acquiredPermits.get()); doAnswer(invocation -> { ActionListener callback = (ActionListener) invocation.getArguments()[0]; - callback.onResponse(() -> logger.trace("released")); + acquiredPermits.incrementAndGet(); + callback.onResponse(acquiredPermits::decrementAndGet); return null; }).when(indexShard).acquirePrimaryOperationPermit(any(ActionListener.class), anyString(), anyObject()); when(indexShard.getReplicationGroup()).thenReturn( diff --git a/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java b/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java index ccb23a9111a..12cc9097b65 100644 --- a/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java @@ -87,6 +87,7 @@ import org.elasticsearch.transport.nio.MockNioTransport; import org.hamcrest.Matcher; import org.junit.After; import org.junit.AfterClass; +import org.junit.Assert; import org.junit.Before; import org.junit.BeforeClass; @@ -678,16 +679,17 @@ public class TransportReplicationActionTests extends ESTestCase { }; TestAction.PrimaryShardReference primary = action.new PrimaryShardReference(shard, releasable); final Request request = new Request(NO_SHARD_ID); - primary.perform(request, ActionTestUtils.assertNoFailureListener(r -> { - final ElasticsearchException exception = new ElasticsearchException("testing"); - primary.failShard("test", exception); + shard.runUnderPrimaryPermit(() -> + primary.perform(request, ActionTestUtils.assertNoFailureListener(r -> { + final ElasticsearchException exception = new ElasticsearchException("testing"); + primary.failShard("test", exception); - verify(shard).failShard("test", exception); + verify(shard).failShard("test", exception); - primary.close(); + primary.close(); - assertTrue(closed.get()); - })); + assertTrue(closed.get()); + })), Assert::assertNotNull, null, null); } public void testReplicaProxy() throws InterruptedException, ExecutionException { @@ -775,10 +777,12 @@ public class TransportReplicationActionTests extends ESTestCase { inSyncIds, shardRoutingTable.getAllAllocationIds())); doAnswer(invocation -> { + count.incrementAndGet(); //noinspection unchecked - ((ActionListener)invocation.getArguments()[0]).onResponse(() -> {}); + ((ActionListener)invocation.getArguments()[0]).onResponse(count::decrementAndGet); return null; }).when(shard).acquirePrimaryOperationPermit(any(), anyString(), anyObject()); + when(shard.getActiveOperationsCount()).thenAnswer(i -> count.get()); final IndexService indexService = mock(IndexService.class); when(indexService.getShard(shard.shardId().id())).thenReturn(shard); @@ -1286,6 +1290,8 @@ public class TransportReplicationActionTests extends ESTestCase { return null; }).when(indexShard) .acquireReplicaOperationPermit(anyLong(), anyLong(), anyLong(), any(ActionListener.class), anyString(), anyObject()); + when(indexShard.getActiveOperationsCount()).thenAnswer(i -> count.get()); + when(indexShard.routingEntry()).thenAnswer(invocationOnMock -> { final ClusterState state = clusterService.state(); final RoutingNode node = state.getRoutingNodes().node(state.nodes().getLocalNodeId()); diff --git a/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationAllPermitsAcquisitionTests.java b/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationAllPermitsAcquisitionTests.java index 8463d66e98e..28373347b19 100644 --- a/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationAllPermitsAcquisitionTests.java +++ b/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationAllPermitsAcquisitionTests.java @@ -316,7 +316,8 @@ public class TransportReplicationAllPermitsAcquisitionTests extends IndexShardTe allPermitsAction.new AsyncPrimaryAction(primaryRequest, allPermitFuture, null) { @Override void runWithPrimaryShardReference(final TransportReplicationAction.PrimaryShardReference reference) { - assertEquals("All permits must be acquired", 0, reference.indexShard.getActiveOperationsCount()); + assertEquals("All permits must be acquired", + IndexShard.OPERATIONS_BLOCKED, reference.indexShard.getActiveOperationsCount()); assertSame(primary, reference.indexShard); final ClusterState clusterState = clusterService.state(); @@ -549,13 +550,13 @@ public class TransportReplicationAllPermitsAcquisitionTests extends IndexShardTe @Override protected void shardOperationOnPrimary(Request shardRequest, IndexShard shard, ActionListener> listener) { - assertEquals("All permits must be acquired", 0, shard.getActiveOperationsCount()); + assertEquals("All permits must be acquired", IndexShard.OPERATIONS_BLOCKED, shard.getActiveOperationsCount()); super.shardOperationOnPrimary(shardRequest, shard, listener); } @Override protected ReplicaResult shardOperationOnReplica(Request shardRequest, IndexShard shard) throws Exception { - assertEquals("All permits must be acquired", 0, shard.getActiveOperationsCount()); + assertEquals("All permits must be acquired", IndexShard.OPERATIONS_BLOCKED, shard.getActiveOperationsCount()); return super.shardOperationOnReplica(shardRequest, shard); } } diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java index c682cbf3c84..5990567c0a0 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java @@ -84,6 +84,7 @@ import org.elasticsearch.test.InternalSettingsPlugin; import org.elasticsearch.test.junit.annotations.TestLogging; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPoolStats; +import org.junit.Assert; import java.io.IOException; import java.io.UncheckedIOException; @@ -878,7 +879,18 @@ public class IndexShardIT extends ESSingleNodeTestCase { shard.refresh("test"); assertThat(client().search(countRequest).actionGet().getHits().getTotalHits().value, equalTo(numDocs)); assertThat(shard.getLocalCheckpoint(), equalTo(shard.seqNoStats().getMaxSeqNo())); - shard.resetEngineToGlobalCheckpoint(); + + final CountDownLatch engineResetLatch = new CountDownLatch(1); + shard.acquireAllPrimaryOperationsPermits(ActionListener.wrap(r -> { + try { + shard.resetEngineToGlobalCheckpoint(); + } finally { + r.close(); + engineResetLatch.countDown(); + } + }, Assert::assertNotNull), TimeValue.timeValueMinutes(1L)); + engineResetLatch.await(); + final long moreDocs = between(10, 20); for (int i = 0; i < moreDocs; i++) { client().prepareIndex("test", "_doc", Long.toString(i + numDocs)).setSource("{}", XContentType.JSON).get(); diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardOperationPermitsTests.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardOperationPermitsTests.java index a785c2c4d82..416e7170990 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardOperationPermitsTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardOperationPermitsTests.java @@ -523,8 +523,8 @@ public class IndexShardOperationPermitsTests extends ESTestCase { future2.get().close(); assertThat(permits.getActiveOperationsCount(), equalTo(0)); - try (Releasable releasable = blockAndWait()) { - assertThat(permits.getActiveOperationsCount(), equalTo(0)); + try (Releasable ignored = blockAndWait()) { + assertThat(permits.getActiveOperationsCount(), equalTo(IndexShard.OPERATIONS_BLOCKED)); } PlainActionFuture future3 = new PlainActionFuture<>(); diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index 05a81c6de3c..41b67369647 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -126,6 +126,7 @@ import org.elasticsearch.test.DummyShardLock; import org.elasticsearch.test.FieldMaskingReader; import org.elasticsearch.test.VersionUtils; import org.elasticsearch.threadpool.ThreadPool; +import org.junit.Assert; import java.io.IOException; import java.nio.charset.Charset; @@ -707,7 +708,7 @@ public class IndexShardTests extends IndexShardTestCase { if (singlePermit) { assertThat(indexShard.getActiveOperationsCount(), greaterThan(0)); } else { - assertThat(indexShard.getActiveOperationsCount(), equalTo(0)); + assertThat(indexShard.getActiveOperationsCount(), equalTo(IndexShard.OPERATIONS_BLOCKED)); } releasable.close(); super.onResponse(releasable); @@ -757,7 +758,7 @@ public class IndexShardTests extends IndexShardTestCase { indexShard.acquireAllPrimaryOperationsPermits(futureAllPermits, TimeValue.timeValueSeconds(30L)); allPermitsAcquired.await(); assertTrue(blocked.get()); - assertEquals(0, indexShard.getActiveOperationsCount()); + assertEquals(IndexShard.OPERATIONS_BLOCKED, indexShard.getActiveOperationsCount()); assertTrue("Expected no results, operations are blocked", results.asList().isEmpty()); futures.forEach(future -> assertFalse(future.isDone())); @@ -3666,7 +3667,17 @@ public class IndexShardTests extends IndexShardTestCase { }); thread.start(); latch.await(); - shard.resetEngineToGlobalCheckpoint(); + + final CountDownLatch engineResetLatch = new CountDownLatch(1); + shard.acquireAllReplicaOperationsPermits(shard.getOperationPrimaryTerm(), globalCheckpoint, 0L, ActionListener.wrap(r -> { + try { + shard.resetEngineToGlobalCheckpoint(); + } finally { + r.close(); + engineResetLatch.countDown(); + } + }, Assert::assertNotNull), TimeValue.timeValueMinutes(1L)); + engineResetLatch.await(); assertThat(getShardDocUIDs(shard), equalTo(docBelowGlobalCheckpoint)); assertThat(shard.seqNoStats().getMaxSeqNo(), equalTo(globalCheckpoint)); assertThat(shard.translogStats().estimatedNumberOfOperations(), equalTo(translogStats.estimatedNumberOfOperations()));