Propagate forceExecution when acquiring permit (#60634)

Currently the transport replication action does not propagate the force
execution parameter when acquiring the indexing permit. The logic to
acquire the index permit supports force execution, so this parameter
should be propagate. Fixes #60359.
This commit is contained in:
Tim Brooks 2020-08-05 13:36:19 -06:00
parent d88098c1d5
commit 2f76c48ea7
No known key found for this signature in database
GPG Key ID: C2AA3BB91A889E77
5 changed files with 21 additions and 12 deletions

View File

@ -120,6 +120,7 @@ public abstract class TransportReplicationAction<
protected final IndicesService indicesService; protected final IndicesService indicesService;
protected final TransportRequestOptions transportOptions; protected final TransportRequestOptions transportOptions;
protected final String executor; protected final String executor;
protected final boolean forceExecutionOnPrimary;
// package private for testing // package private for testing
protected final String transportReplicaAction; protected final String transportReplicaAction;
@ -158,6 +159,7 @@ public abstract class TransportReplicationAction<
this.initialRetryBackoffBound = REPLICATION_INITIAL_RETRY_BACKOFF_BOUND.get(settings); this.initialRetryBackoffBound = REPLICATION_INITIAL_RETRY_BACKOFF_BOUND.get(settings);
this.retryTimeout = REPLICATION_RETRY_TIMEOUT.get(settings); this.retryTimeout = REPLICATION_RETRY_TIMEOUT.get(settings);
this.forceExecutionOnPrimary = forceExecutionOnPrimary;
transportService.registerRequestHandler(actionName, ThreadPool.Names.SAME, requestReader, this::handleOperationRequest); transportService.registerRequestHandler(actionName, ThreadPool.Names.SAME, requestReader, this::handleOperationRequest);
@ -906,7 +908,7 @@ public abstract class TransportReplicationAction<
protected void acquirePrimaryOperationPermit(final IndexShard primary, protected void acquirePrimaryOperationPermit(final IndexShard primary,
final Request request, final Request request,
final ActionListener<Releasable> onAcquired) { final ActionListener<Releasable> onAcquired) {
primary.acquirePrimaryOperationPermit(onAcquired, executor, request); primary.acquirePrimaryOperationPermit(onAcquired, executor, request, forceExecutionOnPrimary);
} }
/** /**

View File

@ -60,7 +60,6 @@ public abstract class TransportWriteAction<
Response extends ReplicationResponse & WriteResponse Response extends ReplicationResponse & WriteResponse
> extends TransportReplicationAction<Request, ReplicaRequest, Response> { > extends TransportReplicationAction<Request, ReplicaRequest, Response> {
private final boolean forceExecution;
private final IndexingPressure indexingPressure; private final IndexingPressure indexingPressure;
private final String executor; private final String executor;
@ -74,13 +73,12 @@ public abstract class TransportWriteAction<
super(settings, actionName, transportService, clusterService, indicesService, threadPool, shardStateAction, actionFilters, super(settings, actionName, transportService, clusterService, indicesService, threadPool, shardStateAction, actionFilters,
request, replicaRequest, ThreadPool.Names.SAME, true, forceExecutionOnPrimary); request, replicaRequest, ThreadPool.Names.SAME, true, forceExecutionOnPrimary);
this.executor = executor; this.executor = executor;
this.forceExecution = forceExecutionOnPrimary;
this.indexingPressure = indexingPressure; this.indexingPressure = indexingPressure;
} }
@Override @Override
protected Releasable checkOperationLimits(Request request) { protected Releasable checkOperationLimits(Request request) {
return indexingPressure.markPrimaryOperationStarted(primaryOperationSize(request), forceExecution); return indexingPressure.markPrimaryOperationStarted(primaryOperationSize(request), forceExecutionOnPrimary);
} }
@Override @Override
@ -97,7 +95,7 @@ public abstract class TransportWriteAction<
// If this primary request was received directly from the network, we must mark a new primary // If this primary request was received directly from the network, we must mark a new primary
// operation. This happens if the write action skips the reroute step (ex: rsync) or during // operation. This happens if the write action skips the reroute step (ex: rsync) or during
// primary delegation, after the primary relocation hand-off. // primary delegation, after the primary relocation hand-off.
return indexingPressure.markPrimaryOperationStarted(primaryOperationSize(request), forceExecution); return indexingPressure.markPrimaryOperationStarted(primaryOperationSize(request), forceExecutionOnPrimary);
} }
} }
@ -107,7 +105,7 @@ public abstract class TransportWriteAction<
@Override @Override
protected Releasable checkReplicaLimits(ReplicaRequest request) { protected Releasable checkReplicaLimits(ReplicaRequest request) {
return indexingPressure.markReplicaOperationStarted(replicaOperationSize(request), forceExecution); return indexingPressure.markReplicaOperationStarted(replicaOperationSize(request), forceExecutionOnPrimary);
} }
protected long replicaOperationSize(ReplicaRequest request) { protected long replicaOperationSize(ReplicaRequest request) {
@ -163,7 +161,7 @@ public abstract class TransportWriteAction<
@Override @Override
public boolean isForceExecution() { public boolean isForceExecution() {
return forceExecution; return forceExecutionOnPrimary;
} }
}); });
} }

View File

@ -2791,10 +2791,16 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
* isn't used * isn't used
*/ */
public void acquirePrimaryOperationPermit(ActionListener<Releasable> onPermitAcquired, String executorOnDelay, Object debugInfo) { public void acquirePrimaryOperationPermit(ActionListener<Releasable> onPermitAcquired, String executorOnDelay, Object debugInfo) {
acquirePrimaryOperationPermit(onPermitAcquired, executorOnDelay, debugInfo, false);
}
public void acquirePrimaryOperationPermit(ActionListener<Releasable> onPermitAcquired, String executorOnDelay, Object debugInfo,
boolean forceExecution) {
verifyNotClosed(); verifyNotClosed();
assert shardRouting.primary() : "acquirePrimaryOperationPermit should only be called on primary shard: " + shardRouting; assert shardRouting.primary() : "acquirePrimaryOperationPermit should only be called on primary shard: " + shardRouting;
indexShardOperationPermits.acquire(wrapPrimaryOperationPermitListener(onPermitAcquired), executorOnDelay, false, debugInfo); indexShardOperationPermits.acquire(wrapPrimaryOperationPermitListener(onPermitAcquired), executorOnDelay, forceExecution,
debugInfo);
} }
/** /**

View File

@ -131,7 +131,7 @@ public class TransportResyncReplicationActionTests extends ESTestCase {
acquiredPermits.incrementAndGet(); acquiredPermits.incrementAndGet();
callback.onResponse(acquiredPermits::decrementAndGet); callback.onResponse(acquiredPermits::decrementAndGet);
return null; return null;
}).when(indexShard).acquirePrimaryOperationPermit(any(ActionListener.class), anyString(), anyObject()); }).when(indexShard).acquirePrimaryOperationPermit(any(ActionListener.class), anyString(), anyObject(), eq(true));
when(indexShard.getReplicationGroup()).thenReturn( when(indexShard.getReplicationGroup()).thenReturn(
new ReplicationGroup(shardRoutingTable, new ReplicationGroup(shardRoutingTable,
clusterService.state().metadata().index(index).inSyncAllocationIds(shardId.id()), clusterService.state().metadata().index(index).inSyncAllocationIds(shardId.id()),

View File

@ -126,6 +126,7 @@ import static org.mockito.Matchers.anyInt;
import static org.mockito.Matchers.anyLong; import static org.mockito.Matchers.anyLong;
import static org.mockito.Matchers.anyObject; import static org.mockito.Matchers.anyObject;
import static org.mockito.Matchers.anyString; import static org.mockito.Matchers.anyString;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
@ -152,6 +153,7 @@ public class TransportReplicationActionTests extends ESTestCase {
private static ThreadPool threadPool; private static ThreadPool threadPool;
private boolean forceExecute;
private ClusterService clusterService; private ClusterService clusterService;
private TransportService transportService; private TransportService transportService;
private CapturingTransport transport; private CapturingTransport transport;
@ -172,6 +174,7 @@ public class TransportReplicationActionTests extends ESTestCase {
@Before @Before
public void setUp() throws Exception { public void setUp() throws Exception {
super.setUp(); super.setUp();
forceExecute = randomBoolean();
transport = new CapturingTransport(); transport = new CapturingTransport();
clusterService = createClusterService(threadPool); clusterService = createClusterService(threadPool);
transportService = transport.createTransportService(clusterService.getSettings(), threadPool, transportService = transport.createTransportService(clusterService.getSettings(), threadPool,
@ -839,7 +842,7 @@ public class TransportReplicationActionTests extends ESTestCase {
//noinspection unchecked //noinspection unchecked
((ActionListener<Releasable>)invocation.getArguments()[0]).onResponse(count::decrementAndGet); ((ActionListener<Releasable>)invocation.getArguments()[0]).onResponse(count::decrementAndGet);
return null; return null;
}).when(shard).acquirePrimaryOperationPermit(any(), anyString(), anyObject()); }).when(shard).acquirePrimaryOperationPermit(any(), anyString(), anyObject(), eq(forceExecute));
when(shard.getActiveOperationsCount()).thenAnswer(i -> count.get()); when(shard.getActiveOperationsCount()).thenAnswer(i -> count.get());
final IndexService indexService = mock(IndexService.class); final IndexService indexService = mock(IndexService.class);
@ -1272,7 +1275,7 @@ public class TransportReplicationActionTests extends ESTestCase {
super(settings, actionName, transportService, clusterService, indicesService, threadPool, super(settings, actionName, transportService, clusterService, indicesService, threadPool,
shardStateAction, shardStateAction,
new ActionFilters(new HashSet<>()), new ActionFilters(new HashSet<>()),
Request::new, Request::new, ThreadPool.Names.SAME); Request::new, Request::new, ThreadPool.Names.SAME, false, forceExecute);
} }
@Override @Override
@ -1343,7 +1346,7 @@ public class TransportReplicationActionTests extends ESTestCase {
callback.onFailure(new ShardNotInPrimaryModeException(shardId, IndexShardState.STARTED)); callback.onFailure(new ShardNotInPrimaryModeException(shardId, IndexShardState.STARTED));
} }
return null; return null;
}).when(indexShard).acquirePrimaryOperationPermit(any(ActionListener.class), anyString(), anyObject()); }).when(indexShard).acquirePrimaryOperationPermit(any(ActionListener.class), anyString(), anyObject(), eq(forceExecute));
doAnswer(invocation -> { doAnswer(invocation -> {
long term = (Long)invocation.getArguments()[0]; long term = (Long)invocation.getArguments()[0];
ActionListener<Releasable> callback = (ActionListener<Releasable>) invocation.getArguments()[3]; ActionListener<Releasable> callback = (ActionListener<Releasable>) invocation.getArguments()[3];