diff --git a/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java b/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java index d71887eaa24..6ce706246b4 100644 --- a/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java +++ b/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java @@ -120,6 +120,7 @@ public abstract class TransportReplicationAction< protected final IndicesService indicesService; protected final TransportRequestOptions transportOptions; protected final String executor; + protected final boolean forceExecutionOnPrimary; // package private for testing protected final String transportReplicaAction; @@ -158,6 +159,7 @@ public abstract class TransportReplicationAction< this.initialRetryBackoffBound = REPLICATION_INITIAL_RETRY_BACKOFF_BOUND.get(settings); this.retryTimeout = REPLICATION_RETRY_TIMEOUT.get(settings); + this.forceExecutionOnPrimary = forceExecutionOnPrimary; transportService.registerRequestHandler(actionName, ThreadPool.Names.SAME, requestReader, this::handleOperationRequest); @@ -906,7 +908,7 @@ public abstract class TransportReplicationAction< protected void acquirePrimaryOperationPermit(final IndexShard primary, final Request request, final ActionListener onAcquired) { - primary.acquirePrimaryOperationPermit(onAcquired, executor, request); + primary.acquirePrimaryOperationPermit(onAcquired, executor, request, forceExecutionOnPrimary); } /** diff --git a/server/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java b/server/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java index 362f73f0b8a..71e76de3b74 100644 --- a/server/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java +++ b/server/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java @@ -60,7 +60,6 @@ public abstract class TransportWriteAction< Response extends ReplicationResponse & WriteResponse > extends TransportReplicationAction { - private final boolean forceExecution; private final IndexingPressure indexingPressure; private final String executor; @@ -74,13 +73,12 @@ public abstract class TransportWriteAction< super(settings, actionName, transportService, clusterService, indicesService, threadPool, shardStateAction, actionFilters, request, replicaRequest, ThreadPool.Names.SAME, true, forceExecutionOnPrimary); this.executor = executor; - this.forceExecution = forceExecutionOnPrimary; this.indexingPressure = indexingPressure; } @Override protected Releasable checkOperationLimits(Request request) { - return indexingPressure.markPrimaryOperationStarted(primaryOperationSize(request), forceExecution); + return indexingPressure.markPrimaryOperationStarted(primaryOperationSize(request), forceExecutionOnPrimary); } @Override @@ -97,7 +95,7 @@ public abstract class TransportWriteAction< // If this primary request was received directly from the network, we must mark a new primary // operation. This happens if the write action skips the reroute step (ex: rsync) or during // primary delegation, after the primary relocation hand-off. - return indexingPressure.markPrimaryOperationStarted(primaryOperationSize(request), forceExecution); + return indexingPressure.markPrimaryOperationStarted(primaryOperationSize(request), forceExecutionOnPrimary); } } @@ -107,7 +105,7 @@ public abstract class TransportWriteAction< @Override protected Releasable checkReplicaLimits(ReplicaRequest request) { - return indexingPressure.markReplicaOperationStarted(replicaOperationSize(request), forceExecution); + return indexingPressure.markReplicaOperationStarted(replicaOperationSize(request), forceExecutionOnPrimary); } protected long replicaOperationSize(ReplicaRequest request) { @@ -163,7 +161,7 @@ public abstract class TransportWriteAction< @Override public boolean isForceExecution() { - return forceExecution; + return forceExecutionOnPrimary; } }); } diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 5218cda7124..7cc88133929 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -2791,10 +2791,16 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl * isn't used */ public void acquirePrimaryOperationPermit(ActionListener onPermitAcquired, String executorOnDelay, Object debugInfo) { + acquirePrimaryOperationPermit(onPermitAcquired, executorOnDelay, debugInfo, false); + } + + public void acquirePrimaryOperationPermit(ActionListener onPermitAcquired, String executorOnDelay, Object debugInfo, + boolean forceExecution) { verifyNotClosed(); assert shardRouting.primary() : "acquirePrimaryOperationPermit should only be called on primary shard: " + shardRouting; - indexShardOperationPermits.acquire(wrapPrimaryOperationPermitListener(onPermitAcquired), executorOnDelay, false, debugInfo); + indexShardOperationPermits.acquire(wrapPrimaryOperationPermitListener(onPermitAcquired), executorOnDelay, forceExecution, + debugInfo); } /** diff --git a/server/src/test/java/org/elasticsearch/action/resync/TransportResyncReplicationActionTests.java b/server/src/test/java/org/elasticsearch/action/resync/TransportResyncReplicationActionTests.java index e3b16b43fe3..fb365ed9678 100644 --- a/server/src/test/java/org/elasticsearch/action/resync/TransportResyncReplicationActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/resync/TransportResyncReplicationActionTests.java @@ -131,7 +131,7 @@ public class TransportResyncReplicationActionTests extends ESTestCase { acquiredPermits.incrementAndGet(); callback.onResponse(acquiredPermits::decrementAndGet); return null; - }).when(indexShard).acquirePrimaryOperationPermit(any(ActionListener.class), anyString(), anyObject()); + }).when(indexShard).acquirePrimaryOperationPermit(any(ActionListener.class), anyString(), anyObject(), eq(true)); when(indexShard.getReplicationGroup()).thenReturn( new ReplicationGroup(shardRoutingTable, clusterService.state().metadata().index(index).inSyncAllocationIds(shardId.id()), diff --git a/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java b/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java index 25f27d2abfb..56bd370397a 100644 --- a/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java @@ -126,6 +126,7 @@ import static org.mockito.Matchers.anyInt; import static org.mockito.Matchers.anyLong; import static org.mockito.Matchers.anyObject; import static org.mockito.Matchers.anyString; +import static org.mockito.Matchers.eq; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; @@ -152,6 +153,7 @@ public class TransportReplicationActionTests extends ESTestCase { private static ThreadPool threadPool; + private boolean forceExecute; private ClusterService clusterService; private TransportService transportService; private CapturingTransport transport; @@ -172,6 +174,7 @@ public class TransportReplicationActionTests extends ESTestCase { @Before public void setUp() throws Exception { super.setUp(); + forceExecute = randomBoolean(); transport = new CapturingTransport(); clusterService = createClusterService(threadPool); transportService = transport.createTransportService(clusterService.getSettings(), threadPool, @@ -839,7 +842,7 @@ public class TransportReplicationActionTests extends ESTestCase { //noinspection unchecked ((ActionListener)invocation.getArguments()[0]).onResponse(count::decrementAndGet); return null; - }).when(shard).acquirePrimaryOperationPermit(any(), anyString(), anyObject()); + }).when(shard).acquirePrimaryOperationPermit(any(), anyString(), anyObject(), eq(forceExecute)); when(shard.getActiveOperationsCount()).thenAnswer(i -> count.get()); final IndexService indexService = mock(IndexService.class); @@ -1272,7 +1275,7 @@ public class TransportReplicationActionTests extends ESTestCase { super(settings, actionName, transportService, clusterService, indicesService, threadPool, shardStateAction, new ActionFilters(new HashSet<>()), - Request::new, Request::new, ThreadPool.Names.SAME); + Request::new, Request::new, ThreadPool.Names.SAME, false, forceExecute); } @Override @@ -1343,7 +1346,7 @@ public class TransportReplicationActionTests extends ESTestCase { callback.onFailure(new ShardNotInPrimaryModeException(shardId, IndexShardState.STARTED)); } return null; - }).when(indexShard).acquirePrimaryOperationPermit(any(ActionListener.class), anyString(), anyObject()); + }).when(indexShard).acquirePrimaryOperationPermit(any(ActionListener.class), anyString(), anyObject(), eq(forceExecute)); doAnswer(invocation -> { long term = (Long)invocation.getArguments()[0]; ActionListener callback = (ActionListener) invocation.getArguments()[3];