Assert TransportReplicationActions acquire permits (#41271)

Today we do not distinguish "no operations in flight" from "operations are
blocked", since both return `0` from `IndexShard#getActiveOperationsCount()`.
We therefore cannot assert that every `TransportReplicationAction` performs its
actions under permit(s). This commit fixes this by returning
`IndexShard#OPERATIONS_BLOCKED` if operations are blocked, allowing these two
cases to be distinguished.
This commit is contained in:
David Turner 2019-04-17 19:54:55 +01:00 committed by David Turner
parent 66366d0307
commit 946baf87d3
12 changed files with 79 additions and 37 deletions

View File

@ -101,8 +101,8 @@ public class TransportVerifyShardBeforeCloseAction extends TransportReplicationA
private void executeShardOperation(final ShardRequest request, final IndexShard indexShard) { private void executeShardOperation(final ShardRequest request, final IndexShard indexShard) {
final ShardId shardId = indexShard.shardId(); final ShardId shardId = indexShard.shardId();
if (indexShard.getActiveOperationsCount() != 0) { if (indexShard.getActiveOperationsCount() != IndexShard.OPERATIONS_BLOCKED) {
throw new IllegalStateException("On-going operations in progress while checking index shard " + shardId + " before closing"); throw new IllegalStateException("Index shard " + shardId + " is not blocking all operations during closing");
} }
final ClusterBlocks clusterBlocks = clusterService.state().blocks(); final ClusterBlocks clusterBlocks = clusterService.state().blocks();

View File

@ -514,6 +514,7 @@ public abstract class TransportReplicationAction<
@Override @Override
public void onResponse(Releasable releasable) { public void onResponse(Releasable releasable) {
try { try {
assert replica.getActiveOperationsCount() != 0 : "must perform shard operation under a permit";
final ReplicaResult replicaResult = shardOperationOnReplica(replicaRequest.getRequest(), replica); final ReplicaResult replicaResult = shardOperationOnReplica(replicaRequest.getRequest(), replica);
releasable.close(); // release shard operation lock before responding to caller releasable.close(); // release shard operation lock before responding to caller
final TransportReplicationAction.ReplicaResponse response = final TransportReplicationAction.ReplicaResponse response =
@ -913,6 +914,7 @@ public abstract class TransportReplicationAction<
return result; return result;
}); });
} }
assert indexShard.getActiveOperationsCount() != 0 : "must perform shard operation under a permit";
shardOperationOnPrimary(request, indexShard, listener); shardOperationOnPrimary(request, indexShard, listener);
} }

View File

@ -636,7 +636,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
indexShardOperationPermits.blockOperations(30, TimeUnit.MINUTES, () -> { indexShardOperationPermits.blockOperations(30, TimeUnit.MINUTES, () -> {
forceRefreshes.close(); forceRefreshes.close();
// no shard operation permits are being held here, move state from started to relocated // no shard operation permits are being held here, move state from started to relocated
assert indexShardOperationPermits.getActiveOperationsCount() == 0 : assert indexShardOperationPermits.getActiveOperationsCount() == OPERATIONS_BLOCKED :
"in-flight operations in progress while moving shard state to relocated"; "in-flight operations in progress while moving shard state to relocated";
/* /*
* We should not invoke the runnable under the mutex as the expected implementation is to handoff the primary context via a * We should not invoke the runnable under the mutex as the expected implementation is to handoff the primary context via a
@ -1553,7 +1553,8 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
assert assertReplicationTarget(); assert assertReplicationTarget();
} else { } else {
assert origin == Engine.Operation.Origin.LOCAL_RESET; assert origin == Engine.Operation.Origin.LOCAL_RESET;
assert getActiveOperationsCount() == 0 : "Ongoing writes [" + getActiveOperations() + "]"; assert getActiveOperationsCount() == OPERATIONS_BLOCKED
: "locally resetting without blocking operations, active operations are [" + getActiveOperations() + "]";
} }
if (writeAllowedStates.contains(state) == false) { if (writeAllowedStates.contains(state) == false) {
throw new IllegalIndexShardStateException(shardId, state, "operation only allowed when shard state is one of " + throw new IllegalIndexShardStateException(shardId, state, "operation only allowed when shard state is one of " +
@ -2747,8 +2748,15 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
return (opPrimaryTerm > pendingPrimaryTerm) || (allPermits && opPrimaryTerm > getOperationPrimaryTerm()); return (opPrimaryTerm > pendingPrimaryTerm) || (allPermits && opPrimaryTerm > getOperationPrimaryTerm());
} }
public static final int OPERATIONS_BLOCKED = -1;
/**
* Obtain the active operation count, or {@link IndexShard#OPERATIONS_BLOCKED} if all permits are held (even if there are
* outstanding operations in flight).
*
* @return the active operation count, or {@link IndexShard#OPERATIONS_BLOCKED} when all permits are held.
*/
public int getActiveOperationsCount() { public int getActiveOperationsCount() {
// refCount is incremented on successful acquire and decremented on close
return indexShardOperationPermits.getActiveOperationsCount(); return indexShardOperationPermits.getActiveOperationsCount();
} }
@ -3076,7 +3084,8 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
* Rollback the current engine to the safe commit, then replay local translog up to the global checkpoint. * Rollback the current engine to the safe commit, then replay local translog up to the global checkpoint.
*/ */
void resetEngineToGlobalCheckpoint() throws IOException { void resetEngineToGlobalCheckpoint() throws IOException {
assert getActiveOperationsCount() == 0 : "Ongoing writes [" + getActiveOperations() + "]"; assert getActiveOperationsCount() == OPERATIONS_BLOCKED
: "resetting engine without blocking operations; active operations are [" + getActiveOperations() + ']';
sync(); // persist the global checkpoint to disk sync(); // persist the global checkpoint to disk
final SeqNoStats seqNoStats = seqNoStats(); final SeqNoStats seqNoStats = seqNoStats();
final TranslogStats translogStats = translogStats(); final TranslogStats translogStats = translogStats();

View File

@ -293,19 +293,14 @@ final class IndexShardOperationPermits implements Closeable {
} }
/** /**
* Obtain the active operation count, or zero if all permits are held (even if there are outstanding operations in flight). * Obtain the active operation count, or {@link IndexShard#OPERATIONS_BLOCKED} if all permits are held.
* *
* @return the active operation count, or zero when all permits are held * @return the active operation count, or {@link IndexShard#OPERATIONS_BLOCKED} when all permits are held.
*/ */
int getActiveOperationsCount() { int getActiveOperationsCount() {
int availablePermits = semaphore.availablePermits(); int availablePermits = semaphore.availablePermits();
if (availablePermits == 0) { if (availablePermits == 0) {
/* return IndexShard.OPERATIONS_BLOCKED; // This occurs when blockOperations() has acquired all the permits.
* This occurs when either doBlockOperations is holding all the permits or there are outstanding operations in flight and the
* remainder of the permits are held by doBlockOperations. We do not distinguish between these two cases and simply say that
* the active operations count is zero.
*/
return 0;
} else { } else {
return TOTAL_PERMITS - availablePermits; return TOTAL_PERMITS - availablePermits;
} }

View File

@ -538,7 +538,7 @@ public class SyncedFlushService implements IndexEventListener {
throw new IllegalStateException("[" + request.shardId() +"] expected a primary shard"); throw new IllegalStateException("[" + request.shardId() +"] expected a primary shard");
} }
int opCount = indexShard.getActiveOperationsCount(); int opCount = indexShard.getActiveOperationsCount();
return new InFlightOpsResponse(opCount); return new InFlightOpsResponse(opCount == IndexShard.OPERATIONS_BLOCKED ? 0 : opCount);
} }
public static final class PreShardSyncedFlushRequest extends TransportRequest { public static final class PreShardSyncedFlushRequest extends TransportRequest {
@ -781,6 +781,7 @@ public class SyncedFlushService implements IndexEventListener {
} }
InFlightOpsResponse(int opCount) { InFlightOpsResponse(int opCount) {
assert opCount >= 0 : opCount;
this.opCount = opCount; this.opCount = opCount;
} }

View File

@ -100,7 +100,7 @@ public class TransportVerifyShardBeforeCloseActionTests extends ESTestCase {
super.setUp(); super.setUp();
indexShard = mock(IndexShard.class); indexShard = mock(IndexShard.class);
when(indexShard.getActiveOperationsCount()).thenReturn(0); when(indexShard.getActiveOperationsCount()).thenReturn(IndexShard.OPERATIONS_BLOCKED);
final ShardId shardId = new ShardId("index", "_na_", randomIntBetween(0, 3)); final ShardId shardId = new ShardId("index", "_na_", randomIntBetween(0, 3));
when(indexShard.shardId()).thenReturn(shardId); when(indexShard.shardId()).thenReturn(shardId);
@ -165,12 +165,12 @@ public class TransportVerifyShardBeforeCloseActionTests extends ESTestCase {
assertThat(flushRequest.getValue().force(), is(true)); assertThat(flushRequest.getValue().force(), is(true));
} }
public void testOperationFailsWithOnGoingOps() { public void testOperationFailsWhenNotBlocked() {
when(indexShard.getActiveOperationsCount()).thenReturn(randomIntBetween(1, 10)); when(indexShard.getActiveOperationsCount()).thenReturn(randomIntBetween(0, 10));
IllegalStateException exception = expectThrows(IllegalStateException.class, this::executeOnPrimaryOrReplica); IllegalStateException exception = expectThrows(IllegalStateException.class, this::executeOnPrimaryOrReplica);
assertThat(exception.getMessage(), assertThat(exception.getMessage(),
equalTo("On-going operations in progress while checking index shard " + indexShard.shardId() + " before closing")); equalTo("Index shard " + indexShard.shardId() + " is not blocking all operations during closing"));
verify(indexShard, times(0)).flush(any(FlushRequest.class)); verify(indexShard, times(0)).flush(any(FlushRequest.class));
} }

View File

@ -58,6 +58,7 @@ import java.nio.charset.Charset;
import java.util.Collections; import java.util.Collections;
import java.util.HashSet; import java.util.HashSet;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import static java.util.Collections.emptyList; import static java.util.Collections.emptyList;
import static org.elasticsearch.action.support.replication.ClusterStateCreationUtils.state; import static org.elasticsearch.action.support.replication.ClusterStateCreationUtils.state;
@ -118,13 +119,17 @@ public class TransportResyncReplicationActionTests extends ESTestCase {
final String allocationId = primaryShardRouting.allocationId().getId(); final String allocationId = primaryShardRouting.allocationId().getId();
final long primaryTerm = indexMetaData.primaryTerm(shardId.id()); final long primaryTerm = indexMetaData.primaryTerm(shardId.id());
final AtomicInteger acquiredPermits = new AtomicInteger();
final IndexShard indexShard = mock(IndexShard.class); final IndexShard indexShard = mock(IndexShard.class);
when(indexShard.shardId()).thenReturn(shardId); when(indexShard.shardId()).thenReturn(shardId);
when(indexShard.routingEntry()).thenReturn(primaryShardRouting); when(indexShard.routingEntry()).thenReturn(primaryShardRouting);
when(indexShard.getPendingPrimaryTerm()).thenReturn(primaryTerm); when(indexShard.getPendingPrimaryTerm()).thenReturn(primaryTerm);
when(indexShard.getOperationPrimaryTerm()).thenReturn(primaryTerm);
when(indexShard.getActiveOperationsCount()).then(i -> acquiredPermits.get());
doAnswer(invocation -> { doAnswer(invocation -> {
ActionListener<Releasable> callback = (ActionListener<Releasable>) invocation.getArguments()[0]; ActionListener<Releasable> callback = (ActionListener<Releasable>) invocation.getArguments()[0];
callback.onResponse(() -> logger.trace("released")); acquiredPermits.incrementAndGet();
callback.onResponse(acquiredPermits::decrementAndGet);
return null; return null;
}).when(indexShard).acquirePrimaryOperationPermit(any(ActionListener.class), anyString(), anyObject()); }).when(indexShard).acquirePrimaryOperationPermit(any(ActionListener.class), anyString(), anyObject());
when(indexShard.getReplicationGroup()).thenReturn( when(indexShard.getReplicationGroup()).thenReturn(

View File

@ -87,6 +87,7 @@ import org.elasticsearch.transport.nio.MockNioTransport;
import org.hamcrest.Matcher; import org.hamcrest.Matcher;
import org.junit.After; import org.junit.After;
import org.junit.AfterClass; import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
import org.junit.BeforeClass; import org.junit.BeforeClass;
@ -678,16 +679,17 @@ public class TransportReplicationActionTests extends ESTestCase {
}; };
TestAction.PrimaryShardReference primary = action.new PrimaryShardReference(shard, releasable); TestAction.PrimaryShardReference primary = action.new PrimaryShardReference(shard, releasable);
final Request request = new Request(NO_SHARD_ID); final Request request = new Request(NO_SHARD_ID);
primary.perform(request, ActionTestUtils.assertNoFailureListener(r -> { shard.runUnderPrimaryPermit(() ->
final ElasticsearchException exception = new ElasticsearchException("testing"); primary.perform(request, ActionTestUtils.assertNoFailureListener(r -> {
primary.failShard("test", exception); final ElasticsearchException exception = new ElasticsearchException("testing");
primary.failShard("test", exception);
verify(shard).failShard("test", exception); verify(shard).failShard("test", exception);
primary.close(); primary.close();
assertTrue(closed.get()); assertTrue(closed.get());
})); })), Assert::assertNotNull, null, null);
} }
public void testReplicaProxy() throws InterruptedException, ExecutionException { public void testReplicaProxy() throws InterruptedException, ExecutionException {
@ -775,10 +777,12 @@ public class TransportReplicationActionTests extends ESTestCase {
inSyncIds, inSyncIds,
shardRoutingTable.getAllAllocationIds())); shardRoutingTable.getAllAllocationIds()));
doAnswer(invocation -> { doAnswer(invocation -> {
count.incrementAndGet();
//noinspection unchecked //noinspection unchecked
((ActionListener<Releasable>)invocation.getArguments()[0]).onResponse(() -> {}); ((ActionListener<Releasable>)invocation.getArguments()[0]).onResponse(count::decrementAndGet);
return null; return null;
}).when(shard).acquirePrimaryOperationPermit(any(), anyString(), anyObject()); }).when(shard).acquirePrimaryOperationPermit(any(), anyString(), anyObject());
when(shard.getActiveOperationsCount()).thenAnswer(i -> count.get());
final IndexService indexService = mock(IndexService.class); final IndexService indexService = mock(IndexService.class);
when(indexService.getShard(shard.shardId().id())).thenReturn(shard); when(indexService.getShard(shard.shardId().id())).thenReturn(shard);
@ -1286,6 +1290,8 @@ public class TransportReplicationActionTests extends ESTestCase {
return null; return null;
}).when(indexShard) }).when(indexShard)
.acquireReplicaOperationPermit(anyLong(), anyLong(), anyLong(), any(ActionListener.class), anyString(), anyObject()); .acquireReplicaOperationPermit(anyLong(), anyLong(), anyLong(), any(ActionListener.class), anyString(), anyObject());
when(indexShard.getActiveOperationsCount()).thenAnswer(i -> count.get());
when(indexShard.routingEntry()).thenAnswer(invocationOnMock -> { when(indexShard.routingEntry()).thenAnswer(invocationOnMock -> {
final ClusterState state = clusterService.state(); final ClusterState state = clusterService.state();
final RoutingNode node = state.getRoutingNodes().node(state.nodes().getLocalNodeId()); final RoutingNode node = state.getRoutingNodes().node(state.nodes().getLocalNodeId());

View File

@ -316,7 +316,8 @@ public class TransportReplicationAllPermitsAcquisitionTests extends IndexShardTe
allPermitsAction.new AsyncPrimaryAction(primaryRequest, allPermitFuture, null) { allPermitsAction.new AsyncPrimaryAction(primaryRequest, allPermitFuture, null) {
@Override @Override
void runWithPrimaryShardReference(final TransportReplicationAction.PrimaryShardReference reference) { void runWithPrimaryShardReference(final TransportReplicationAction.PrimaryShardReference reference) {
assertEquals("All permits must be acquired", 0, reference.indexShard.getActiveOperationsCount()); assertEquals("All permits must be acquired",
IndexShard.OPERATIONS_BLOCKED, reference.indexShard.getActiveOperationsCount());
assertSame(primary, reference.indexShard); assertSame(primary, reference.indexShard);
final ClusterState clusterState = clusterService.state(); final ClusterState clusterState = clusterService.state();
@ -549,13 +550,13 @@ public class TransportReplicationAllPermitsAcquisitionTests extends IndexShardTe
@Override @Override
protected void shardOperationOnPrimary(Request shardRequest, IndexShard shard, protected void shardOperationOnPrimary(Request shardRequest, IndexShard shard,
ActionListener<PrimaryResult<Request, Response>> listener) { ActionListener<PrimaryResult<Request, Response>> listener) {
assertEquals("All permits must be acquired", 0, shard.getActiveOperationsCount()); assertEquals("All permits must be acquired", IndexShard.OPERATIONS_BLOCKED, shard.getActiveOperationsCount());
super.shardOperationOnPrimary(shardRequest, shard, listener); super.shardOperationOnPrimary(shardRequest, shard, listener);
} }
@Override @Override
protected ReplicaResult shardOperationOnReplica(Request shardRequest, IndexShard shard) throws Exception { protected ReplicaResult shardOperationOnReplica(Request shardRequest, IndexShard shard) throws Exception {
assertEquals("All permits must be acquired", 0, shard.getActiveOperationsCount()); assertEquals("All permits must be acquired", IndexShard.OPERATIONS_BLOCKED, shard.getActiveOperationsCount());
return super.shardOperationOnReplica(shardRequest, shard); return super.shardOperationOnReplica(shardRequest, shard);
} }
} }

View File

@ -84,6 +84,7 @@ import org.elasticsearch.test.InternalSettingsPlugin;
import org.elasticsearch.test.junit.annotations.TestLogging; import org.elasticsearch.test.junit.annotations.TestLogging;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.threadpool.ThreadPoolStats; import org.elasticsearch.threadpool.ThreadPoolStats;
import org.junit.Assert;
import java.io.IOException; import java.io.IOException;
import java.io.UncheckedIOException; import java.io.UncheckedIOException;
@ -878,7 +879,18 @@ public class IndexShardIT extends ESSingleNodeTestCase {
shard.refresh("test"); shard.refresh("test");
assertThat(client().search(countRequest).actionGet().getHits().getTotalHits().value, equalTo(numDocs)); assertThat(client().search(countRequest).actionGet().getHits().getTotalHits().value, equalTo(numDocs));
assertThat(shard.getLocalCheckpoint(), equalTo(shard.seqNoStats().getMaxSeqNo())); assertThat(shard.getLocalCheckpoint(), equalTo(shard.seqNoStats().getMaxSeqNo()));
shard.resetEngineToGlobalCheckpoint();
final CountDownLatch engineResetLatch = new CountDownLatch(1);
shard.acquireAllPrimaryOperationsPermits(ActionListener.wrap(r -> {
try {
shard.resetEngineToGlobalCheckpoint();
} finally {
r.close();
engineResetLatch.countDown();
}
}, Assert::assertNotNull), TimeValue.timeValueMinutes(1L));
engineResetLatch.await();
final long moreDocs = between(10, 20); final long moreDocs = between(10, 20);
for (int i = 0; i < moreDocs; i++) { for (int i = 0; i < moreDocs; i++) {
client().prepareIndex("test", "_doc", Long.toString(i + numDocs)).setSource("{}", XContentType.JSON).get(); client().prepareIndex("test", "_doc", Long.toString(i + numDocs)).setSource("{}", XContentType.JSON).get();

View File

@ -523,8 +523,8 @@ public class IndexShardOperationPermitsTests extends ESTestCase {
future2.get().close(); future2.get().close();
assertThat(permits.getActiveOperationsCount(), equalTo(0)); assertThat(permits.getActiveOperationsCount(), equalTo(0));
try (Releasable releasable = blockAndWait()) { try (Releasable ignored = blockAndWait()) {
assertThat(permits.getActiveOperationsCount(), equalTo(0)); assertThat(permits.getActiveOperationsCount(), equalTo(IndexShard.OPERATIONS_BLOCKED));
} }
PlainActionFuture<Releasable> future3 = new PlainActionFuture<>(); PlainActionFuture<Releasable> future3 = new PlainActionFuture<>();

View File

@ -126,6 +126,7 @@ import org.elasticsearch.test.DummyShardLock;
import org.elasticsearch.test.FieldMaskingReader; import org.elasticsearch.test.FieldMaskingReader;
import org.elasticsearch.test.VersionUtils; import org.elasticsearch.test.VersionUtils;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import org.junit.Assert;
import java.io.IOException; import java.io.IOException;
import java.nio.charset.Charset; import java.nio.charset.Charset;
@ -707,7 +708,7 @@ public class IndexShardTests extends IndexShardTestCase {
if (singlePermit) { if (singlePermit) {
assertThat(indexShard.getActiveOperationsCount(), greaterThan(0)); assertThat(indexShard.getActiveOperationsCount(), greaterThan(0));
} else { } else {
assertThat(indexShard.getActiveOperationsCount(), equalTo(0)); assertThat(indexShard.getActiveOperationsCount(), equalTo(IndexShard.OPERATIONS_BLOCKED));
} }
releasable.close(); releasable.close();
super.onResponse(releasable); super.onResponse(releasable);
@ -757,7 +758,7 @@ public class IndexShardTests extends IndexShardTestCase {
indexShard.acquireAllPrimaryOperationsPermits(futureAllPermits, TimeValue.timeValueSeconds(30L)); indexShard.acquireAllPrimaryOperationsPermits(futureAllPermits, TimeValue.timeValueSeconds(30L));
allPermitsAcquired.await(); allPermitsAcquired.await();
assertTrue(blocked.get()); assertTrue(blocked.get());
assertEquals(0, indexShard.getActiveOperationsCount()); assertEquals(IndexShard.OPERATIONS_BLOCKED, indexShard.getActiveOperationsCount());
assertTrue("Expected no results, operations are blocked", results.asList().isEmpty()); assertTrue("Expected no results, operations are blocked", results.asList().isEmpty());
futures.forEach(future -> assertFalse(future.isDone())); futures.forEach(future -> assertFalse(future.isDone()));
@ -3666,7 +3667,17 @@ public class IndexShardTests extends IndexShardTestCase {
}); });
thread.start(); thread.start();
latch.await(); latch.await();
shard.resetEngineToGlobalCheckpoint();
final CountDownLatch engineResetLatch = new CountDownLatch(1);
shard.acquireAllReplicaOperationsPermits(shard.getOperationPrimaryTerm(), globalCheckpoint, 0L, ActionListener.wrap(r -> {
try {
shard.resetEngineToGlobalCheckpoint();
} finally {
r.close();
engineResetLatch.countDown();
}
}, Assert::assertNotNull), TimeValue.timeValueMinutes(1L));
engineResetLatch.await();
assertThat(getShardDocUIDs(shard), equalTo(docBelowGlobalCheckpoint)); assertThat(getShardDocUIDs(shard), equalTo(docBelowGlobalCheckpoint));
assertThat(shard.seqNoStats().getMaxSeqNo(), equalTo(globalCheckpoint)); assertThat(shard.seqNoStats().getMaxSeqNo(), equalTo(globalCheckpoint));
assertThat(shard.translogStats().estimatedNumberOfOperations(), equalTo(translogStats.estimatedNumberOfOperations())); assertThat(shard.translogStats().estimatedNumberOfOperations(), equalTo(translogStats.estimatedNumberOfOperations()));