mirror of
https://github.com/honeymoose/OpenSearch.git
synced 2025-03-28 10:58:30 +00:00
Expose all permits acquisition in IndexShard and TransportReplicationAction (#35540)
This pull request exposes two new methods in the IndexShard and TransportReplicationAction classes in order to allow transport replication actions to acquire all index shard operation permits for their execution. It first adds the acquireAllPrimaryOperationsPermits() and the acquireAllReplicaOperationsPermits() methods to the IndexShard class, which allow acquiring all operation permits on a shard while exposing a Releasable. It also refactors the TransportReplicationAction class to expose two protected methods (acquirePrimaryOperationPermit() and acquireReplicaOperationPermit()) that can be overridden when a transport replication action requires the acquisition of all permits on the primary and/or replica shard during execution. Finally, it adds a TransportReplicationAllPermitsAcquisitionTests class which illustrates how a transport replication action can grab all permits before adding a cluster block in the cluster state, making subsequent operations that require a single permit fail. Related to elastic #33888
This commit is contained in:
parent
3c059ee057
commit
2e37f17a7d
server/src
main/java/org/elasticsearch
test/java/org/elasticsearch
action/support/replication
index/shard
88
server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java
88
server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java
@ -313,7 +313,7 @@ public abstract class TransportReplicationAction<
|
||||
}
|
||||
}
|
||||
|
||||
class AsyncPrimaryAction extends AbstractRunnable implements ActionListener<PrimaryShardReference> {
|
||||
class AsyncPrimaryAction extends AbstractRunnable {
|
||||
|
||||
private final Request request;
|
||||
// targetAllocationID of the shard this request is meant for
|
||||
@ -334,11 +334,33 @@ public abstract class TransportReplicationAction<
|
||||
|
||||
@Override
|
||||
protected void doRun() throws Exception {
|
||||
acquirePrimaryShardReference(request.shardId(), targetAllocationID, primaryTerm, this, request);
|
||||
final ShardId shardId = request.shardId();
|
||||
final IndexShard indexShard = getIndexShard(shardId);
|
||||
final ShardRouting shardRouting = indexShard.routingEntry();
|
||||
// we may end up here if the cluster state used to route the primary is so stale that the underlying
|
||||
// index shard was replaced with a replica. For example - in a two node cluster, if the primary fails
|
||||
// the replica will take over and a replica will be assigned to the first node.
|
||||
if (shardRouting.primary() == false) {
|
||||
throw new ReplicationOperation.RetryOnPrimaryException(shardId, "actual shard is not a primary " + shardRouting);
|
||||
}
|
||||
final String actualAllocationId = shardRouting.allocationId().getId();
|
||||
if (actualAllocationId.equals(targetAllocationID) == false) {
|
||||
throw new ShardNotFoundException(shardId, "expected allocation id [{}] but found [{}]", targetAllocationID,
|
||||
actualAllocationId);
|
||||
}
|
||||
final long actualTerm = indexShard.getPendingPrimaryTerm();
|
||||
if (actualTerm != primaryTerm) {
|
||||
throw new ShardNotFoundException(shardId, "expected allocation id [{}] with term [{}] but found [{}]", targetAllocationID,
|
||||
primaryTerm, actualTerm);
|
||||
}
|
||||
|
||||
acquirePrimaryOperationPermit(indexShard, request, ActionListener.wrap(
|
||||
releasable -> runWithPrimaryShardReference(new PrimaryShardReference(indexShard, releasable)),
|
||||
this::onFailure
|
||||
));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onResponse(PrimaryShardReference primaryShardReference) {
|
||||
void runWithPrimaryShardReference(final PrimaryShardReference primaryShardReference) {
|
||||
try {
|
||||
final ClusterState clusterState = clusterService.state();
|
||||
final IndexMetaData indexMetaData = clusterState.metaData().getIndexSafe(primaryShardReference.routingEntry().index());
|
||||
@ -660,10 +682,10 @@ public abstract class TransportReplicationAction<
|
||||
setPhase(task, "replica");
|
||||
final String actualAllocationId = this.replica.routingEntry().allocationId().getId();
|
||||
if (actualAllocationId.equals(targetAllocationID) == false) {
|
||||
throw new ShardNotFoundException(this.replica.shardId(), "expected aID [{}] but found [{}]", targetAllocationID,
|
||||
throw new ShardNotFoundException(this.replica.shardId(), "expected allocation id [{}] but found [{}]", targetAllocationID,
|
||||
actualAllocationId);
|
||||
}
|
||||
replica.acquireReplicaOperationPermit(primaryTerm, globalCheckpoint, maxSeqNoOfUpdatesOrDeletes, this, executor, request);
|
||||
acquireReplicaOperationPermit(replica, request, this, primaryTerm, globalCheckpoint, maxSeqNoOfUpdatesOrDeletes);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -697,7 +719,7 @@ public abstract class TransportReplicationAction<
|
||||
}
|
||||
}
|
||||
|
||||
protected IndexShard getIndexShard(ShardId shardId) {
|
||||
protected IndexShard getIndexShard(final ShardId shardId) {
|
||||
IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex());
|
||||
return indexService.getShard(shardId.id());
|
||||
}
|
||||
@ -938,42 +960,26 @@ public abstract class TransportReplicationAction<
|
||||
}
|
||||
|
||||
/**
|
||||
* Tries to acquire reference to {@link IndexShard} to perform a primary operation. Released after performing primary operation locally
|
||||
* and replication of the operation to all replica shards is completed / failed (see {@link ReplicationOperation}).
|
||||
* Executes the logic for acquiring one or more operation permits on a primary shard. The default is to acquire a single permit but this
|
||||
* method can be overridden to acquire more.
|
||||
*/
|
||||
private void acquirePrimaryShardReference(ShardId shardId, String allocationId, long primaryTerm,
|
||||
ActionListener<PrimaryShardReference> onReferenceAcquired, Object debugInfo) {
|
||||
IndexShard indexShard = getIndexShard(shardId);
|
||||
// we may end up here if the cluster state used to route the primary is so stale that the underlying
|
||||
// index shard was replaced with a replica. For example - in a two node cluster, if the primary fails
|
||||
// the replica will take over and a replica will be assigned to the first node.
|
||||
if (indexShard.routingEntry().primary() == false) {
|
||||
throw new ReplicationOperation.RetryOnPrimaryException(indexShard.shardId(),
|
||||
"actual shard is not a primary " + indexShard.routingEntry());
|
||||
}
|
||||
final String actualAllocationId = indexShard.routingEntry().allocationId().getId();
|
||||
if (actualAllocationId.equals(allocationId) == false) {
|
||||
throw new ShardNotFoundException(shardId, "expected aID [{}] but found [{}]", allocationId, actualAllocationId);
|
||||
}
|
||||
final long actualTerm = indexShard.getPendingPrimaryTerm();
|
||||
if (actualTerm != primaryTerm) {
|
||||
throw new ShardNotFoundException(shardId, "expected aID [{}] with term [{}] but found [{}]", allocationId,
|
||||
primaryTerm, actualTerm);
|
||||
}
|
||||
protected void acquirePrimaryOperationPermit(final IndexShard primary,
|
||||
final Request request,
|
||||
final ActionListener<Releasable> onAcquired) {
|
||||
primary.acquirePrimaryOperationPermit(onAcquired, executor, request);
|
||||
}
|
||||
|
||||
ActionListener<Releasable> onAcquired = new ActionListener<Releasable>() {
|
||||
@Override
|
||||
public void onResponse(Releasable releasable) {
|
||||
onReferenceAcquired.onResponse(new PrimaryShardReference(indexShard, releasable));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Exception e) {
|
||||
onReferenceAcquired.onFailure(e);
|
||||
}
|
||||
};
|
||||
|
||||
indexShard.acquirePrimaryOperationPermit(onAcquired, executor, debugInfo);
|
||||
/**
|
||||
* Executes the logic for acquiring one or more operation permits on a replica shard. The default is to acquire a single permit but this
|
||||
* method can be overridden to acquire more.
|
||||
*/
|
||||
protected void acquireReplicaOperationPermit(final IndexShard replica,
|
||||
final ReplicaRequest request,
|
||||
final ActionListener<Releasable> onAcquired,
|
||||
final long primaryTerm,
|
||||
final long globalCheckpoint,
|
||||
final long maxSeqNoOfUpdatesOrDeletes) {
|
||||
replica.acquireReplicaOperationPermit(primaryTerm, globalCheckpoint, maxSeqNoOfUpdatesOrDeletes, onAcquired, executor, request);
|
||||
}
|
||||
|
||||
class ShardReference implements Releasable {
|
||||
|
@ -2302,7 +2302,18 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
|
||||
indexShardOperationPermits.acquire(onPermitAcquired, executorOnDelay, false, debugInfo);
|
||||
}
|
||||
|
||||
private <E extends Exception> void bumpPrimaryTerm(long newPrimaryTerm, final CheckedRunnable<E> onBlocked) {
|
||||
/**
|
||||
* Acquire all primary operation permits. Once all permits are acquired, the provided ActionListener is called.
|
||||
* It is the responsibility of the caller to close the {@link Releasable}.
|
||||
*/
|
||||
public void acquireAllPrimaryOperationsPermits(final ActionListener<Releasable> onPermitAcquired, final TimeValue timeout) {
|
||||
verifyNotClosed();
|
||||
assert shardRouting.primary() : "acquireAllPrimaryOperationsPermits should only be called on primary shard: " + shardRouting;
|
||||
|
||||
indexShardOperationPermits.asyncBlockOperations(onPermitAcquired, timeout.duration(), timeout.timeUnit());
|
||||
}
|
||||
|
||||
private <E extends Exception> void bumpPrimaryTerm(final long newPrimaryTerm, final CheckedRunnable<E> onBlocked) {
|
||||
assert Thread.holdsLock(mutex);
|
||||
assert newPrimaryTerm > pendingPrimaryTerm;
|
||||
assert operationPrimaryTerm <= pendingPrimaryTerm;
|
||||
@ -2357,11 +2368,42 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
|
||||
public void acquireReplicaOperationPermit(final long opPrimaryTerm, final long globalCheckpoint, final long maxSeqNoOfUpdatesOrDeletes,
|
||||
final ActionListener<Releasable> onPermitAcquired, final String executorOnDelay,
|
||||
final Object debugInfo) {
|
||||
innerAcquireReplicaOperationPermit(opPrimaryTerm, globalCheckpoint, maxSeqNoOfUpdatesOrDeletes, onPermitAcquired,
|
||||
(listener) -> indexShardOperationPermits.acquire(listener, executorOnDelay, true, debugInfo));
|
||||
}
|
||||
|
||||
/**
|
||||
* Acquire all replica operation permits whenever the shard is ready for indexing (see
|
||||
* {@link #acquireAllPrimaryOperationsPermits(ActionListener, TimeValue)}). If the given primary term is lower than the one in
|
||||
* {@link #shardRouting}, the {@link ActionListener#onFailure(Exception)} method of the provided listener is invoked with an
|
||||
* {@link IllegalStateException}.
|
||||
*
|
||||
* @param opPrimaryTerm the operation primary term
|
||||
* @param globalCheckpoint the global checkpoint associated with the request
|
||||
* @param maxSeqNoOfUpdatesOrDeletes the max seq_no of updates (index operations overwrite Lucene) or deletes captured on the primary
|
||||
* after this replication request was executed on it (see {@link #getMaxSeqNoOfUpdatesOrDeletes()})
|
||||
* @param onPermitAcquired the listener for permit acquisition
|
||||
* @param timeout the maximum time to wait for the in-flight operations block
|
||||
*/
|
||||
public void acquireAllReplicaOperationsPermits(final long opPrimaryTerm,
|
||||
final long globalCheckpoint,
|
||||
final long maxSeqNoOfUpdatesOrDeletes,
|
||||
final ActionListener<Releasable> onPermitAcquired,
|
||||
final TimeValue timeout) {
|
||||
innerAcquireReplicaOperationPermit(opPrimaryTerm, globalCheckpoint, maxSeqNoOfUpdatesOrDeletes, onPermitAcquired,
|
||||
(listener) -> indexShardOperationPermits.asyncBlockOperations(listener, timeout.duration(), timeout.timeUnit()));
|
||||
}
|
||||
|
||||
private void innerAcquireReplicaOperationPermit(final long opPrimaryTerm,
|
||||
final long globalCheckpoint,
|
||||
final long maxSeqNoOfUpdatesOrDeletes,
|
||||
final ActionListener<Releasable> onPermitAcquired,
|
||||
final Consumer<ActionListener<Releasable>> consumer) {
|
||||
verifyNotClosed();
|
||||
if (opPrimaryTerm > pendingPrimaryTerm) {
|
||||
synchronized (mutex) {
|
||||
if (opPrimaryTerm > pendingPrimaryTerm) {
|
||||
IndexShardState shardState = state();
|
||||
final IndexShardState shardState = state();
|
||||
// only roll translog and update primary term if shard has made it past recovery
|
||||
// Having a new primary term here means that the old primary failed and that there is a new primary, which again
|
||||
// means that the master will fail this shard as all initializing shards are failed when a primary is selected
|
||||
@ -2373,58 +2415,54 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
|
||||
|
||||
if (opPrimaryTerm > pendingPrimaryTerm) {
|
||||
bumpPrimaryTerm(opPrimaryTerm, () -> {
|
||||
updateGlobalCheckpointOnReplica(globalCheckpoint, "primary term transition");
|
||||
final long currentGlobalCheckpoint = getGlobalCheckpoint();
|
||||
final long maxSeqNo = seqNoStats().getMaxSeqNo();
|
||||
logger.info("detected new primary with primary term [{}], global checkpoint [{}], max_seq_no [{}]",
|
||||
opPrimaryTerm, currentGlobalCheckpoint, maxSeqNo);
|
||||
if (currentGlobalCheckpoint < maxSeqNo) {
|
||||
resetEngineToGlobalCheckpoint();
|
||||
} else {
|
||||
getEngine().rollTranslogGeneration();
|
||||
}
|
||||
updateGlobalCheckpointOnReplica(globalCheckpoint, "primary term transition");
|
||||
final long currentGlobalCheckpoint = getGlobalCheckpoint();
|
||||
final long maxSeqNo = seqNoStats().getMaxSeqNo();
|
||||
logger.info("detected new primary with primary term [{}], global checkpoint [{}], max_seq_no [{}]",
|
||||
opPrimaryTerm, currentGlobalCheckpoint, maxSeqNo);
|
||||
if (currentGlobalCheckpoint < maxSeqNo) {
|
||||
resetEngineToGlobalCheckpoint();
|
||||
} else {
|
||||
getEngine().rollTranslogGeneration();
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
assert opPrimaryTerm <= pendingPrimaryTerm
|
||||
: "operation primary term [" + opPrimaryTerm + "] should be at most [" + pendingPrimaryTerm + "]";
|
||||
indexShardOperationPermits.acquire(
|
||||
new ActionListener<Releasable>() {
|
||||
@Override
|
||||
public void onResponse(final Releasable releasable) {
|
||||
if (opPrimaryTerm < operationPrimaryTerm) {
|
||||
releasable.close();
|
||||
final String message = String.format(
|
||||
Locale.ROOT,
|
||||
"%s operation primary term [%d] is too old (current [%d])",
|
||||
shardId,
|
||||
opPrimaryTerm,
|
||||
operationPrimaryTerm);
|
||||
onPermitAcquired.onFailure(new IllegalStateException(message));
|
||||
} else {
|
||||
assert assertReplicationTarget();
|
||||
try {
|
||||
updateGlobalCheckpointOnReplica(globalCheckpoint, "operation");
|
||||
advanceMaxSeqNoOfUpdatesOrDeletes(maxSeqNoOfUpdatesOrDeletes);
|
||||
} catch (Exception e) {
|
||||
releasable.close();
|
||||
onPermitAcquired.onFailure(e);
|
||||
return;
|
||||
}
|
||||
onPermitAcquired.onResponse(releasable);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(final Exception e) {
|
||||
: "operation primary term [" + opPrimaryTerm + "] should be at most [" + pendingPrimaryTerm + "]";
|
||||
consumer.accept(new ActionListener<Releasable>() {
|
||||
@Override
|
||||
public void onResponse(final Releasable releasable) {
|
||||
if (opPrimaryTerm < operationPrimaryTerm) {
|
||||
releasable.close();
|
||||
final String message = String.format(
|
||||
Locale.ROOT,
|
||||
"%s operation primary term [%d] is too old (current [%d])",
|
||||
shardId,
|
||||
opPrimaryTerm,
|
||||
operationPrimaryTerm);
|
||||
onPermitAcquired.onFailure(new IllegalStateException(message));
|
||||
} else {
|
||||
assert assertReplicationTarget();
|
||||
try {
|
||||
updateGlobalCheckpointOnReplica(globalCheckpoint, "operation");
|
||||
advanceMaxSeqNoOfUpdatesOrDeletes(maxSeqNoOfUpdatesOrDeletes);
|
||||
} catch (Exception e) {
|
||||
releasable.close();
|
||||
onPermitAcquired.onFailure(e);
|
||||
return;
|
||||
}
|
||||
},
|
||||
executorOnDelay,
|
||||
true, debugInfo);
|
||||
onPermitAcquired.onResponse(releasable);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(final Exception e) {
|
||||
onPermitAcquired.onFailure(e);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
public int getActiveOperationsCount() {
|
||||
|
@ -949,11 +949,11 @@ public class TransportReplicationActionTests extends ESTestCase {
|
||||
logger.debug("got exception:" , throwable);
|
||||
assertTrue(throwable.getClass() + " is not a retry exception", action.retryPrimaryException(throwable));
|
||||
if (wrongAllocationId) {
|
||||
assertThat(throwable.getMessage(), containsString("expected aID [_not_a_valid_aid_] but found [" +
|
||||
assertThat(throwable.getMessage(), containsString("expected allocation id [_not_a_valid_aid_] but found [" +
|
||||
primary.allocationId().getId() + "]"));
|
||||
} else {
|
||||
assertThat(throwable.getMessage(), containsString("expected aID [" + primary.allocationId().getId() + "] with term [" +
|
||||
requestTerm + "] but found [" + primaryTerm + "]"));
|
||||
assertThat(throwable.getMessage(), containsString("expected allocation id [" + primary.allocationId().getId()
|
||||
+ "] with term [" + requestTerm + "] but found [" + primaryTerm + "]"));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
561
server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationAllPermitsAcquisitionTests.java
Normal file
561
server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationAllPermitsAcquisitionTests.java
Normal file
@ -0,0 +1,561 @@
|
||||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package org.elasticsearch.action.support.replication;
|
||||
|
||||
import org.apache.lucene.util.SetOnce;
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.support.ActionFilters;
|
||||
import org.elasticsearch.action.support.PlainActionFuture;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.action.shard.ShardStateAction;
|
||||
import org.elasticsearch.cluster.block.ClusterBlock;
|
||||
import org.elasticsearch.cluster.block.ClusterBlockException;
|
||||
import org.elasticsearch.cluster.block.ClusterBlockLevel;
|
||||
import org.elasticsearch.cluster.block.ClusterBlocks;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
|
||||
import org.elasticsearch.cluster.metadata.MetaData;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNodes;
|
||||
import org.elasticsearch.cluster.routing.IndexRoutingTable;
|
||||
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
|
||||
import org.elasticsearch.cluster.routing.RecoverySource;
|
||||
import org.elasticsearch.cluster.routing.RoutingTable;
|
||||
import org.elasticsearch.cluster.routing.ShardRouting;
|
||||
import org.elasticsearch.cluster.routing.ShardRoutingState;
|
||||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
import org.elasticsearch.common.lease.Releasable;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.index.engine.InternalEngineFactory;
|
||||
import org.elasticsearch.index.shard.IndexShard;
|
||||
import org.elasticsearch.index.shard.IndexShardTestCase;
|
||||
import org.elasticsearch.index.shard.ShardId;
|
||||
import org.elasticsearch.rest.RestStatus;
|
||||
import org.elasticsearch.test.transport.MockTransportService;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.TransportChannel;
|
||||
import org.elasticsearch.transport.TransportResponse;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
import java.util.Set;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.BrokenBarrierException;
|
||||
import java.util.concurrent.CyclicBarrier;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
|
||||
import static java.util.Collections.emptyMap;
|
||||
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_CREATION_DATE;
|
||||
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_INDEX_UUID;
|
||||
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS;
|
||||
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_SHARDS;
|
||||
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_VERSION_CREATED;
|
||||
import static org.elasticsearch.cluster.routing.TestShardRouting.newShardRouting;
|
||||
import static org.elasticsearch.test.ClusterServiceUtils.createClusterService;
|
||||
import static org.elasticsearch.test.ClusterServiceUtils.setState;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.hamcrest.Matchers.greaterThan;
|
||||
import static org.hamcrest.Matchers.hasItem;
|
||||
import static org.hamcrest.Matchers.instanceOf;
|
||||
import static org.hamcrest.Matchers.is;
|
||||
import static org.hamcrest.Matchers.notNullValue;
|
||||
import static org.hamcrest.Matchers.nullValue;
|
||||
|
||||
|
||||
/**
|
||||
* This test tests the concurrent execution of several transport replication actions. All of these actions (except one) acquire a single
|
||||
* permit during their execution on shards and are expected to fail if a global level or index level block is present in the cluster state.
|
||||
* These actions are all started at the same time, but some are delayed until one last action.
|
||||
*
|
||||
* This last action is special because it acquires all the permits on shards, adds the block to the cluster state and then "releases" the
|
||||
* previously delayed single permit actions. This way, there is a clear transition between the single permit actions executed before the
|
||||
* all permit action that sets the block and those executed afterwards that are doomed to fail because of the block.
|
||||
*/
|
||||
public class TransportReplicationAllPermitsAcquisitionTests extends IndexShardTestCase {
|
||||
|
||||
private ClusterService clusterService;
|
||||
private TransportService transportService;
|
||||
private ShardStateAction shardStateAction;
|
||||
private ShardId shardId;
|
||||
private IndexShard primary;
|
||||
private IndexShard replica;
|
||||
private boolean globalBlock;
|
||||
private ClusterBlock block;
|
||||
|
||||
@Override
|
||||
@Before
|
||||
public void setUp() throws Exception {
|
||||
super.setUp();
|
||||
globalBlock = randomBoolean();
|
||||
RestStatus restStatus = randomFrom(RestStatus.values());
|
||||
block = new ClusterBlock(randomIntBetween(1, 10), randomAlphaOfLength(5), false, true, false, restStatus, ClusterBlockLevel.ALL);
|
||||
clusterService = createClusterService(threadPool);
|
||||
|
||||
final ClusterState.Builder state = ClusterState.builder(clusterService.state());
|
||||
Set<DiscoveryNode.Role> roles = new HashSet<>(Arrays.asList(DiscoveryNode.Role.values()));
|
||||
DiscoveryNode node1 = new DiscoveryNode("_name1", "_node1", buildNewFakeTransportAddress(), emptyMap(), roles, Version.CURRENT);
|
||||
DiscoveryNode node2 = new DiscoveryNode("_name2", "_node2", buildNewFakeTransportAddress(), emptyMap(), roles, Version.CURRENT);
|
||||
state.nodes(DiscoveryNodes.builder()
|
||||
.add(node1)
|
||||
.add(node2)
|
||||
.localNodeId(node1.getId())
|
||||
.masterNodeId(node1.getId()));
|
||||
|
||||
shardId = new ShardId("index", UUID.randomUUID().toString(), 0);
|
||||
ShardRouting shardRouting =
|
||||
newShardRouting(shardId, node1.getId(), true, ShardRoutingState.INITIALIZING, RecoverySource.EmptyStoreRecoverySource.INSTANCE);
|
||||
|
||||
Settings indexSettings = Settings.builder()
|
||||
.put(SETTING_VERSION_CREATED, Version.CURRENT)
|
||||
.put(SETTING_INDEX_UUID, shardId.getIndex().getUUID())
|
||||
.put(SETTING_NUMBER_OF_SHARDS, 1)
|
||||
.put(SETTING_NUMBER_OF_REPLICAS, 1)
|
||||
.put(SETTING_CREATION_DATE, System.currentTimeMillis())
|
||||
.build();
|
||||
|
||||
primary = newStartedShard(p -> newShard(shardRouting, indexSettings, new InternalEngineFactory()), true);
|
||||
for (int i = 0; i < 10; i++) {
|
||||
final String id = Integer.toString(i);
|
||||
indexDoc(primary, "_doc", id, "{\"value\":" + id + "}");
|
||||
}
|
||||
|
||||
IndexMetaData indexMetaData = IndexMetaData.builder(shardId.getIndexName())
|
||||
.settings(indexSettings)
|
||||
.primaryTerm(shardId.id(), primary.getOperationPrimaryTerm())
|
||||
.putMapping("_doc","{ \"properties\": { \"value\": { \"type\": \"short\"}}}")
|
||||
.build();
|
||||
state.metaData(MetaData.builder().put(indexMetaData, false).generateClusterUuidIfNeeded());
|
||||
|
||||
replica = newShard(primary.shardId(), false, node2.getId(), indexMetaData, null);
|
||||
recoverReplica(replica, primary, true);
|
||||
|
||||
IndexRoutingTable.Builder routing = IndexRoutingTable.builder(indexMetaData.getIndex());
|
||||
routing.addIndexShard(new IndexShardRoutingTable.Builder(shardId)
|
||||
.addShard(primary.routingEntry())
|
||||
.build());
|
||||
state.routingTable(RoutingTable.builder().add(routing.build()).build());
|
||||
|
||||
setState(clusterService, state.build());
|
||||
|
||||
final Settings transportSettings = Settings.builder().put("node.name", node1.getId()).build();
|
||||
transportService = MockTransportService.createNewService(transportSettings, Version.CURRENT, threadPool, null);
|
||||
transportService.start();
|
||||
transportService.acceptIncomingRequests();
|
||||
shardStateAction = new ShardStateAction(clusterService, transportService, null, null, threadPool);
|
||||
}
|
||||
|
||||
@Override
|
||||
@After
|
||||
public void tearDown() throws Exception {
|
||||
closeShards(primary, replica);
|
||||
transportService.stop();
|
||||
clusterService.close();
|
||||
super.tearDown();
|
||||
}
|
||||
|
||||
public void testTransportReplicationActionWithAllPermits() throws Exception {
|
||||
final int numOperations = scaledRandomIntBetween(4, 32);
|
||||
final int delayedOperations = randomIntBetween(1, numOperations);
|
||||
logger.trace("starting [{}] operations, among which the first [{}] started ops should be blocked by [{}]",
|
||||
numOperations, delayedOperations, block);
|
||||
|
||||
final CyclicBarrier delayedOperationsBarrier = new CyclicBarrier(delayedOperations + 1);
|
||||
final List<Thread> threads = new ArrayList<>(delayedOperationsBarrier.getParties());
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
final PlainActionFuture<Response>[] futures = new PlainActionFuture[numOperations];
|
||||
final TestAction[] actions = new TestAction[numOperations];
|
||||
|
||||
for (int i = 0; i < numOperations; i++) {
|
||||
final int threadId = i;
|
||||
final boolean delayed = (threadId < delayedOperations);
|
||||
|
||||
final PlainActionFuture<Response> listener = new PlainActionFuture<>();
|
||||
futures[threadId] = listener;
|
||||
|
||||
final TestAction singlePermitAction = new SinglePermitWithBlocksAction(Settings.EMPTY, "internalSinglePermit[" + threadId + "]",
|
||||
transportService, clusterService, shardStateAction, threadPool, shardId, primary, replica, globalBlock);
|
||||
actions[threadId] = singlePermitAction;
|
||||
|
||||
Thread thread = new Thread(() -> {
|
||||
TransportReplicationAction.AsyncPrimaryAction asyncPrimaryAction =
|
||||
singlePermitAction.new AsyncPrimaryAction(request(), allocationId(), primaryTerm(), transportChannel(listener), null) {
|
||||
@Override
|
||||
protected void doRun() throws Exception {
|
||||
if (delayed) {
|
||||
logger.trace("op [{}] has started and will resume execution once allPermitsAction is terminated", threadId);
|
||||
delayedOperationsBarrier.await();
|
||||
}
|
||||
super.doRun();
|
||||
}
|
||||
|
||||
@Override
|
||||
void runWithPrimaryShardReference(final TransportReplicationAction.PrimaryShardReference reference) {
|
||||
assertThat(reference.indexShard.getActiveOperationsCount(), greaterThan(0));
|
||||
assertSame(primary, reference.indexShard);
|
||||
assertBlockIsPresentForDelayedOp();
|
||||
super.runWithPrimaryShardReference(reference);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Exception e) {
|
||||
assertBlockIsPresentForDelayedOp();
|
||||
super.onFailure(e);
|
||||
}
|
||||
|
||||
private void assertBlockIsPresentForDelayedOp() {
|
||||
if (delayed) {
|
||||
final ClusterState clusterState = clusterService.state();
|
||||
if (globalBlock) {
|
||||
assertTrue("Global block must exist", clusterState.blocks().hasGlobalBlock(block));
|
||||
} else {
|
||||
String indexName = primary.shardId().getIndexName();
|
||||
assertTrue("Index block must exist", clusterState.blocks().hasIndexBlock(indexName, block));
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
asyncPrimaryAction.run();
|
||||
});
|
||||
threads.add(thread);
|
||||
thread.start();
|
||||
}
|
||||
|
||||
logger.trace("now starting the operation that acquires all permits and sets the block in the cluster state");
|
||||
|
||||
// An action which acquires all operation permits during execution and sets a block
|
||||
final TestAction allPermitsAction = new AllPermitsThenBlockAction(Settings.EMPTY, "internalAllPermits", transportService,
|
||||
clusterService, shardStateAction, threadPool, shardId, primary, replica);
|
||||
|
||||
final PlainActionFuture<Response> allPermitFuture = new PlainActionFuture<>();
|
||||
Thread thread = new Thread(() -> {
|
||||
TransportReplicationAction.AsyncPrimaryAction asyncPrimaryAction =
|
||||
allPermitsAction.new AsyncPrimaryAction(request(), allocationId(), primaryTerm(), transportChannel(allPermitFuture), null) {
|
||||
@Override
|
||||
void runWithPrimaryShardReference(final TransportReplicationAction.PrimaryShardReference reference) {
|
||||
assertEquals("All permits must be acquired", 0, reference.indexShard.getActiveOperationsCount());
|
||||
assertSame(primary, reference.indexShard);
|
||||
|
||||
final ClusterState clusterState = clusterService.state();
|
||||
final ClusterBlocks.Builder blocks = ClusterBlocks.builder();
|
||||
if (globalBlock) {
|
||||
assertFalse("Global block must not exist yet", clusterState.blocks().hasGlobalBlock(block));
|
||||
blocks.addGlobalBlock(block);
|
||||
} else {
|
||||
String indexName = reference.indexShard.shardId().getIndexName();
|
||||
assertFalse("Index block must not exist yet", clusterState.blocks().hasIndexBlock(indexName, block));
|
||||
blocks.addIndexBlock(indexName, block);
|
||||
}
|
||||
|
||||
logger.trace("adding test block to cluster state {}", block);
|
||||
setState(clusterService, ClusterState.builder(clusterState).blocks(blocks));
|
||||
|
||||
try {
|
||||
logger.trace("releasing delayed operations");
|
||||
delayedOperationsBarrier.await();
|
||||
} catch (InterruptedException | BrokenBarrierException e) {
|
||||
onFailure(e);
|
||||
}
|
||||
super.runWithPrimaryShardReference(reference);
|
||||
}
|
||||
};
|
||||
asyncPrimaryAction.run();
|
||||
});
|
||||
threads.add(thread);
|
||||
thread.start();
|
||||
|
||||
logger.trace("waiting for all operations to terminate");
|
||||
for (Thread t : threads) {
|
||||
t.join();
|
||||
}
|
||||
|
||||
final Response allPermitsResponse = allPermitFuture.get();
|
||||
assertSuccessfulOperation(allPermitsAction, allPermitsResponse);
|
||||
|
||||
for (int i = 0; i < numOperations; i++) {
|
||||
final PlainActionFuture<Response> future = futures[i];
|
||||
final TestAction action = actions[i];
|
||||
|
||||
if (i < delayedOperations) {
|
||||
ExecutionException exception = expectThrows(ExecutionException.class, "delayed operation should have failed", future::get);
|
||||
assertFailedOperation(action, exception);
|
||||
} else {
|
||||
// non delayed operation might fail depending on the order they were executed
|
||||
try {
|
||||
assertSuccessfulOperation(action, futures[i].get());
|
||||
} catch (final ExecutionException e) {
|
||||
assertFailedOperation(action, e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void assertSuccessfulOperation(final TestAction action, final Response response) {
|
||||
final String name = action.getActionName();
|
||||
assertThat(name + " operation should have been executed on primary", action.executedOnPrimary.get(), is(true));
|
||||
assertThat(name + " operation should have been executed on replica", action.executedOnReplica.get(), is(true));
|
||||
assertThat(name + " operation must have a non null result", response, notNullValue());
|
||||
assertThat(name + " operation should have been successful on 2 shards", response.getShardInfo().getSuccessful(), equalTo(2));
|
||||
}
|
||||
|
||||
private void assertFailedOperation(final TestAction action,final ExecutionException exception) {
|
||||
final String name = action.getActionName();
|
||||
assertThat(name + " operation should not have been executed on primary", action.executedOnPrimary.get(), nullValue());
|
||||
assertThat(name + " operation should not have been executed on replica", action.executedOnReplica.get(), nullValue());
|
||||
assertThat(exception.getCause(), instanceOf(ClusterBlockException.class));
|
||||
ClusterBlockException clusterBlockException = (ClusterBlockException) exception.getCause();
|
||||
assertThat(clusterBlockException.blocks(), hasItem(equalTo(block)));
|
||||
}
|
||||
|
||||
private long primaryTerm() {
|
||||
return primary.getOperationPrimaryTerm();
|
||||
}
|
||||
|
||||
private String allocationId() {
|
||||
return primary.routingEntry().allocationId().getId();
|
||||
}
|
||||
|
||||
private Request request() {
|
||||
return new Request().setShardId(primary.shardId());
|
||||
}
|
||||
|
||||
/**
|
||||
* A type of {@link TransportReplicationAction} that allows to use the primary and replica shards passed to the constructor for the
|
||||
* execution of the replication action. Also records if the operation is executed on the primary and the replica.
|
||||
*/
|
||||
private abstract class TestAction extends TransportReplicationAction<Request, Request, Response> {
|
||||
|
||||
protected final ShardId shardId;
|
||||
protected final IndexShard primary;
|
||||
protected final IndexShard replica;
|
||||
protected final SetOnce<Boolean> executedOnPrimary = new SetOnce<>();
|
||||
protected final SetOnce<Boolean> executedOnReplica = new SetOnce<>();
|
||||
|
||||
TestAction(Settings settings, String actionName, TransportService transportService, ClusterService clusterService,
|
||||
ShardStateAction shardStateAction, ThreadPool threadPool, ShardId shardId, IndexShard primary, IndexShard replica) {
|
||||
super(settings, actionName, transportService, clusterService, null, threadPool, shardStateAction,
|
||||
new ActionFilters(new HashSet<>()), new IndexNameExpressionResolver(), Request::new, Request::new, ThreadPool.Names.SAME);
|
||||
this.shardId = Objects.requireNonNull(shardId);
|
||||
this.primary = Objects.requireNonNull(primary);
|
||||
assertEquals(shardId, primary.shardId());
|
||||
this.replica = Objects.requireNonNull(replica);
|
||||
assertEquals(shardId, replica.shardId());
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Response newResponseInstance() {
|
||||
return new Response();
|
||||
}
|
||||
|
||||
public String getActionName() {
|
||||
return this.actionName;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected PrimaryResult<Request, Response> shardOperationOnPrimary(Request shardRequest, IndexShard shard) throws Exception {
|
||||
executedOnPrimary.set(true);
|
||||
// The TransportReplicationAction.getIndexShard() method is overridden for testing purpose but we double check here
|
||||
// that the permit has been acquired on the primary shard
|
||||
assertSame(primary, shard);
|
||||
return new PrimaryResult<>(shardRequest, new Response());
|
||||
}
|
||||
|
||||
@Override
|
||||
protected ReplicaResult shardOperationOnReplica(Request shardRequest, IndexShard shard) throws Exception {
|
||||
executedOnReplica.set(true);
|
||||
// The TransportReplicationAction.getIndexShard() method is overridden for testing purpose but we double check here
|
||||
// that the permit has been acquired on the replica shard
|
||||
assertSame(replica, shard);
|
||||
return new ReplicaResult();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected IndexShard getIndexShard(final ShardId shardId) {
|
||||
if (this.shardId.equals(shardId) == false) {
|
||||
throw new AssertionError("shard id differs from " + shardId);
|
||||
}
|
||||
return (executedOnPrimary.get() == null) ? primary : replica;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void sendReplicaRequest(final ConcreteReplicaRequest<Request> replicaRequest,
|
||||
final DiscoveryNode node,
|
||||
final ActionListener<ReplicationOperation.ReplicaResponse> listener) {
|
||||
assertEquals("Replica is always assigned to node 2 in this test", clusterService.state().nodes().get("_node2"), node);
|
||||
ReplicaOperationTransportHandler replicaOperationTransportHandler = new ReplicaOperationTransportHandler();
|
||||
try {
|
||||
replicaOperationTransportHandler.messageReceived(replicaRequest, new TransportChannel() {
|
||||
@Override
|
||||
public String getProfileName() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getChannelType() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void sendResponse(TransportResponse response) throws IOException {
|
||||
listener.onResponse((ReplicationOperation.ReplicaResponse) response);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void sendResponse(Exception exception) throws IOException {
|
||||
listener.onFailure(exception);
|
||||
}
|
||||
}, null);
|
||||
} catch (Exception e) {
|
||||
listener.onFailure(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* A type of {@link TransportReplicationAction} that acquires a single permit during execution and that blocks
|
||||
* on {@link ClusterBlockLevel#WRITE}. The block can be a global level or an index level block depending of the
|
||||
* value of the {@code globalBlock} parameter in the constructor. When the operation is executed on shards it
|
||||
* verifies that at least 1 permit is acquired and that there is no blocks in the cluster state.
|
||||
*/
|
||||
private class SinglePermitWithBlocksAction extends TestAction {
|
||||
|
||||
private final boolean globalBlock;
|
||||
|
||||
SinglePermitWithBlocksAction(Settings settings, String actionName, TransportService transportService, ClusterService clusterService,
|
||||
ShardStateAction shardStateAction, ThreadPool threadPool,
|
||||
ShardId shardId, IndexShard primary, IndexShard replica, boolean globalBlock) {
|
||||
super(settings, actionName, transportService, clusterService, shardStateAction, threadPool, shardId, primary, replica);
|
||||
this.globalBlock = globalBlock;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected ClusterBlockLevel globalBlockLevel() {
|
||||
return globalBlock ? ClusterBlockLevel.WRITE : super.globalBlockLevel();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected ClusterBlockLevel indexBlockLevel() {
|
||||
return globalBlock == false ? ClusterBlockLevel.WRITE : super.indexBlockLevel();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected PrimaryResult<Request, Response> shardOperationOnPrimary(Request shardRequest, IndexShard shard) throws Exception {
|
||||
assertNoBlocks("block must not exist when executing the operation on primary shard: it should have been blocked before");
|
||||
assertThat(shard.getActiveOperationsCount(), greaterThan(0));
|
||||
return super.shardOperationOnPrimary(shardRequest, shard);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected ReplicaResult shardOperationOnReplica(Request shardRequest, IndexShard shard) throws Exception {
|
||||
assertNoBlocks("block must not exist when executing the operation on replica shard: it should have been blocked before");
|
||||
assertThat(shard.getActiveOperationsCount(), greaterThan(0));
|
||||
return super.shardOperationOnReplica(shardRequest, shard);
|
||||
}
|
||||
|
||||
private void assertNoBlocks(final String error) {
|
||||
final ClusterState clusterState = clusterService.state();
|
||||
assertFalse("Global level " + error, clusterState.blocks().hasGlobalBlock(block));
|
||||
assertFalse("Index level " + error, clusterState.blocks().hasIndexBlock(shardId.getIndexName(), block));
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* A type of {@link TransportReplicationAction} that acquires all permits during execution.
|
||||
*/
|
||||
private class AllPermitsThenBlockAction extends TestAction {
|
||||
|
||||
private final TimeValue timeout = TimeValue.timeValueSeconds(30L);
|
||||
|
||||
AllPermitsThenBlockAction(Settings settings, String actionName, TransportService transportService, ClusterService clusterService,
|
||||
ShardStateAction shardStateAction, ThreadPool threadPool,
|
||||
ShardId shardId, IndexShard primary, IndexShard replica) {
|
||||
super(settings, actionName, transportService, clusterService, shardStateAction, threadPool, shardId, primary, replica);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void acquirePrimaryOperationPermit(IndexShard shard, Request request, ActionListener<Releasable> onAcquired) {
|
||||
shard.acquireAllPrimaryOperationsPermits(onAcquired, timeout);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void acquireReplicaOperationPermit(IndexShard shard, Request request, ActionListener<Releasable> onAcquired,
|
||||
long primaryTerm, long globalCheckpoint, long maxSeqNo) {
|
||||
shard.acquireAllReplicaOperationsPermits(primaryTerm, globalCheckpoint, maxSeqNo, onAcquired, timeout);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected PrimaryResult<Request, Response> shardOperationOnPrimary(Request shardRequest, IndexShard shard) throws Exception {
|
||||
assertEquals("All permits must be acquired", 0, shard.getActiveOperationsCount());
|
||||
return super.shardOperationOnPrimary(shardRequest, shard);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected ReplicaResult shardOperationOnReplica(Request shardRequest, IndexShard shard) throws Exception {
|
||||
assertEquals("All permits must be acquired", 0, shard.getActiveOperationsCount());
|
||||
return super.shardOperationOnReplica(shardRequest, shard);
|
||||
}
|
||||
}
|
||||
|
||||
static class Request extends ReplicationRequest<Request> {
|
||||
@Override
|
||||
public String toString() {
|
||||
return getTestClass().getName() + ".Request";
|
||||
}
|
||||
}
|
||||
|
||||
static class Response extends ReplicationResponse {
|
||||
}
|
||||
|
||||
/**
|
||||
* Transport channel that is needed for replica operation testing.
|
||||
*/
|
||||
public TransportChannel transportChannel(final PlainActionFuture<Response> listener) {
|
||||
return new TransportChannel() {
|
||||
|
||||
@Override
|
||||
public String getProfileName() {
|
||||
return "";
|
||||
}
|
||||
|
||||
@Override
|
||||
public void sendResponse(TransportResponse response) throws IOException {
|
||||
listener.onResponse(((Response) response));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void sendResponse(Exception exception) throws IOException {
|
||||
listener.onFailure(exception);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getChannelType() {
|
||||
return "replica_test";
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
@ -34,6 +34,7 @@ import org.apache.lucene.store.IOContext;
|
||||
import org.apache.lucene.util.BytesRef;
|
||||
import org.apache.lucene.util.Constants;
|
||||
import org.elasticsearch.Assertions;
|
||||
import org.elasticsearch.ElasticsearchException;
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.admin.indices.flush.FlushRequest;
|
||||
@ -60,6 +61,7 @@ import org.elasticsearch.common.Strings;
|
||||
import org.elasticsearch.common.UUIDs;
|
||||
import org.elasticsearch.common.breaker.CircuitBreaker;
|
||||
import org.elasticsearch.common.bytes.BytesArray;
|
||||
import org.elasticsearch.common.collect.Tuple;
|
||||
import org.elasticsearch.common.component.AbstractLifecycleComponent;
|
||||
import org.elasticsearch.common.io.stream.BytesStreamOutput;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
@ -69,6 +71,7 @@ import org.elasticsearch.common.settings.IndexScopedSettings;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
|
||||
import org.elasticsearch.common.util.concurrent.AtomicArray;
|
||||
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
|
||||
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
@ -124,7 +127,6 @@ import org.elasticsearch.test.DummyShardLock;
|
||||
import org.elasticsearch.test.FieldMaskingReader;
|
||||
import org.elasticsearch.test.VersionUtils;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.ElasticsearchException;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.nio.charset.Charset;
|
||||
@ -304,30 +306,27 @@ public class IndexShardTests extends IndexShardTestCase {
|
||||
|
||||
}
|
||||
|
||||
public void testClosesPreventsNewOperations() throws InterruptedException, ExecutionException, IOException {
|
||||
public void testClosesPreventsNewOperations() throws Exception {
|
||||
IndexShard indexShard = newStartedShard();
|
||||
closeShards(indexShard);
|
||||
assertThat(indexShard.getActiveOperationsCount(), equalTo(0));
|
||||
try {
|
||||
indexShard.acquirePrimaryOperationPermit(null, ThreadPool.Names.WRITE, "");
|
||||
fail("we should not be able to increment anymore");
|
||||
} catch (IndexShardClosedException e) {
|
||||
// expected
|
||||
}
|
||||
try {
|
||||
indexShard.acquireReplicaOperationPermit(indexShard.getPendingPrimaryTerm(), SequenceNumbers.UNASSIGNED_SEQ_NO,
|
||||
randomNonNegativeLong(), null, ThreadPool.Names.WRITE, "");
|
||||
fail("we should not be able to increment anymore");
|
||||
} catch (IndexShardClosedException e) {
|
||||
// expected
|
||||
}
|
||||
expectThrows(IndexShardClosedException.class,
|
||||
() -> indexShard.acquirePrimaryOperationPermit(null, ThreadPool.Names.WRITE, ""));
|
||||
expectThrows(IndexShardClosedException.class,
|
||||
() -> indexShard.acquireAllPrimaryOperationsPermits(null, TimeValue.timeValueSeconds(30L)));
|
||||
expectThrows(IndexShardClosedException.class,
|
||||
() -> indexShard.acquireReplicaOperationPermit(indexShard.getPendingPrimaryTerm(), SequenceNumbers.UNASSIGNED_SEQ_NO,
|
||||
randomNonNegativeLong(), null, ThreadPool.Names.WRITE, ""));
|
||||
expectThrows(IndexShardClosedException.class,
|
||||
() -> indexShard.acquireAllReplicaOperationsPermits(indexShard.getPendingPrimaryTerm(), SequenceNumbers.UNASSIGNED_SEQ_NO,
|
||||
randomNonNegativeLong(), null, TimeValue.timeValueSeconds(30L)));
|
||||
}
|
||||
|
||||
public void testRejectOperationPermitWithHigherTermWhenNotStarted() throws IOException {
|
||||
IndexShard indexShard = newShard(false);
|
||||
expectThrows(IndexShardNotStartedException.class, () ->
|
||||
indexShard.acquireReplicaOperationPermit(indexShard.getPendingPrimaryTerm() + randomIntBetween(1, 100),
|
||||
SequenceNumbers.UNASSIGNED_SEQ_NO, randomNonNegativeLong(), null, ThreadPool.Names.WRITE, ""));
|
||||
randomReplicaOperationPermitAcquisition(indexShard, indexShard.getPendingPrimaryTerm() + randomIntBetween(1, 100),
|
||||
SequenceNumbers.UNASSIGNED_SEQ_NO, randomNonNegativeLong(), null, ""));
|
||||
closeShards(indexShard);
|
||||
}
|
||||
|
||||
@ -620,6 +619,106 @@ public class IndexShardTests extends IndexShardTestCase {
|
||||
closeShards(indexShard);
|
||||
}
|
||||
|
||||
public void testAcquirePrimaryAllOperationsPermits() throws Exception {
|
||||
final IndexShard indexShard = newStartedShard(true);
|
||||
assertEquals(0, indexShard.getActiveOperationsCount());
|
||||
|
||||
final CountDownLatch allPermitsAcquired = new CountDownLatch(1);
|
||||
|
||||
final Thread[] threads = new Thread[randomIntBetween(2, 5)];
|
||||
final List<PlainActionFuture<Releasable>> futures = new ArrayList<>(threads.length);
|
||||
final AtomicArray<Tuple<Boolean, Exception>> results = new AtomicArray<>(threads.length);
|
||||
final CountDownLatch allOperationsDone = new CountDownLatch(threads.length);
|
||||
|
||||
for (int i = 0; i < threads.length; i++) {
|
||||
final int threadId = i;
|
||||
final boolean singlePermit = randomBoolean();
|
||||
|
||||
final PlainActionFuture<Releasable> future = new PlainActionFuture<Releasable>() {
|
||||
@Override
|
||||
public void onResponse(final Releasable releasable) {
|
||||
if (singlePermit) {
|
||||
assertThat(indexShard.getActiveOperationsCount(), greaterThan(0));
|
||||
} else {
|
||||
assertThat(indexShard.getActiveOperationsCount(), equalTo(0));
|
||||
}
|
||||
releasable.close();
|
||||
super.onResponse(releasable);
|
||||
results.setOnce(threadId, Tuple.tuple(Boolean.TRUE, null));
|
||||
allOperationsDone.countDown();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(final Exception e) {
|
||||
results.setOnce(threadId, Tuple.tuple(Boolean.FALSE, e));
|
||||
allOperationsDone.countDown();
|
||||
}
|
||||
};
|
||||
futures.add(threadId, future);
|
||||
|
||||
threads[threadId] = new Thread(() -> {
|
||||
try {
|
||||
allPermitsAcquired.await();
|
||||
} catch (final InterruptedException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
if (singlePermit) {
|
||||
indexShard.acquirePrimaryOperationPermit(future, ThreadPool.Names.WRITE, "");
|
||||
} else {
|
||||
indexShard.acquireAllPrimaryOperationsPermits(future, TimeValue.timeValueHours(1L));
|
||||
}
|
||||
assertEquals(0, indexShard.getActiveOperationsCount());
|
||||
});
|
||||
threads[threadId].start();
|
||||
}
|
||||
|
||||
final AtomicBoolean blocked = new AtomicBoolean();
|
||||
final CountDownLatch allPermitsTerminated = new CountDownLatch(1);
|
||||
|
||||
final PlainActionFuture<Releasable> futureAllPermits = new PlainActionFuture<Releasable>() {
|
||||
@Override
|
||||
public void onResponse(final Releasable releasable) {
|
||||
try {
|
||||
blocked.set(true);
|
||||
allPermitsAcquired.countDown();
|
||||
super.onResponse(releasable);
|
||||
allPermitsTerminated.await();
|
||||
} catch (final InterruptedException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
};
|
||||
indexShard.acquireAllPrimaryOperationsPermits(futureAllPermits, TimeValue.timeValueSeconds(30L));
|
||||
allPermitsAcquired.await();
|
||||
assertTrue(blocked.get());
|
||||
assertEquals(0, indexShard.getActiveOperationsCount());
|
||||
assertTrue("Expected no results, operations are blocked", results.asList().isEmpty());
|
||||
futures.forEach(future -> assertFalse(future.isDone()));
|
||||
|
||||
allPermitsTerminated.countDown();
|
||||
|
||||
final Releasable allPermits = futureAllPermits.get();
|
||||
assertTrue(futureAllPermits.isDone());
|
||||
|
||||
assertTrue("Expected no results, operations are blocked", results.asList().isEmpty());
|
||||
futures.forEach(future -> assertFalse(future.isDone()));
|
||||
|
||||
Releasables.close(allPermits);
|
||||
allOperationsDone.await();
|
||||
for (Thread thread : threads) {
|
||||
thread.join();
|
||||
}
|
||||
|
||||
futures.forEach(future -> assertTrue(future.isDone()));
|
||||
assertEquals(threads.length, results.asList().size());
|
||||
results.asList().forEach(result -> {
|
||||
assertTrue(result.v1());
|
||||
assertNull(result.v2());
|
||||
});
|
||||
|
||||
closeShards(indexShard);
|
||||
}
|
||||
|
||||
private Releasable acquirePrimaryOperationPermitBlockingly(IndexShard indexShard) throws ExecutionException, InterruptedException {
|
||||
PlainActionFuture<Releasable> fut = new PlainActionFuture<>();
|
||||
indexShard.acquirePrimaryOperationPermit(fut, ThreadPool.Names.WRITE, "");
|
||||
@ -676,10 +775,14 @@ public class IndexShardTests extends IndexShardTestCase {
|
||||
|
||||
assertEquals(0, indexShard.getActiveOperationsCount());
|
||||
if (shardRouting.primary() == false && Assertions.ENABLED) {
|
||||
final AssertionError e =
|
||||
AssertionError e =
|
||||
expectThrows(AssertionError.class,
|
||||
() -> indexShard.acquirePrimaryOperationPermit(null, ThreadPool.Names.WRITE, ""));
|
||||
assertThat(e, hasToString(containsString("acquirePrimaryOperationPermit should only be called on primary shard")));
|
||||
|
||||
e = expectThrows(AssertionError.class,
|
||||
() -> indexShard.acquireAllPrimaryOperationsPermits(null, TimeValue.timeValueSeconds(30L)));
|
||||
assertThat(e, hasToString(containsString("acquireAllPrimaryOperationsPermits should only be called on primary shard")));
|
||||
}
|
||||
|
||||
final long primaryTerm = indexShard.getPendingPrimaryTerm();
|
||||
@ -697,34 +800,6 @@ public class IndexShardTests extends IndexShardTestCase {
|
||||
operation2 = null;
|
||||
}
|
||||
|
||||
{
|
||||
final AtomicBoolean onResponse = new AtomicBoolean();
|
||||
final AtomicBoolean onFailure = new AtomicBoolean();
|
||||
final AtomicReference<Exception> onFailureException = new AtomicReference<>();
|
||||
ActionListener<Releasable> onLockAcquired = new ActionListener<Releasable>() {
|
||||
@Override
|
||||
public void onResponse(Releasable releasable) {
|
||||
onResponse.set(true);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Exception e) {
|
||||
onFailure.set(true);
|
||||
onFailureException.set(e);
|
||||
}
|
||||
};
|
||||
|
||||
indexShard.acquireReplicaOperationPermit(primaryTerm - 1, SequenceNumbers.UNASSIGNED_SEQ_NO,
|
||||
randomNonNegativeLong(), onLockAcquired, ThreadPool.Names.WRITE, "");
|
||||
|
||||
assertFalse(onResponse.get());
|
||||
assertTrue(onFailure.get());
|
||||
assertThat(onFailureException.get(), instanceOf(IllegalStateException.class));
|
||||
assertThat(
|
||||
onFailureException.get(),
|
||||
hasToString(containsString("operation primary term [" + (primaryTerm - 1) + "] is too old")));
|
||||
}
|
||||
|
||||
{
|
||||
final AtomicBoolean onResponse = new AtomicBoolean();
|
||||
final AtomicReference<Exception> onFailure = new AtomicReference<>();
|
||||
@ -785,12 +860,12 @@ public class IndexShardTests extends IndexShardTestCase {
|
||||
}
|
||||
};
|
||||
try {
|
||||
indexShard.acquireReplicaOperationPermit(
|
||||
randomReplicaOperationPermitAcquisition(indexShard,
|
||||
newPrimaryTerm,
|
||||
newGlobalCheckPoint,
|
||||
randomNonNegativeLong(),
|
||||
listener,
|
||||
ThreadPool.Names.SAME, "");
|
||||
"");
|
||||
} catch (Exception e) {
|
||||
listener.onFailure(e);
|
||||
}
|
||||
@ -837,6 +912,37 @@ public class IndexShardTests extends IndexShardTestCase {
|
||||
assertEquals(0, indexShard.getActiveOperationsCount());
|
||||
}
|
||||
|
||||
{
|
||||
final CountDownLatch latch = new CountDownLatch(1);
|
||||
final AtomicBoolean onResponse = new AtomicBoolean();
|
||||
final AtomicBoolean onFailure = new AtomicBoolean();
|
||||
final AtomicReference<Exception> onFailureException = new AtomicReference<>();
|
||||
ActionListener<Releasable> onLockAcquired = new ActionListener<Releasable>() {
|
||||
@Override
|
||||
public void onResponse(Releasable releasable) {
|
||||
onResponse.set(true);
|
||||
latch.countDown();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Exception e) {
|
||||
onFailure.set(true);
|
||||
onFailureException.set(e);
|
||||
latch.countDown();
|
||||
}
|
||||
};
|
||||
|
||||
final long oldPrimaryTerm = indexShard.pendingPrimaryTerm - 1;
|
||||
randomReplicaOperationPermitAcquisition(indexShard, oldPrimaryTerm, indexShard.getGlobalCheckpoint(),
|
||||
randomNonNegativeLong(), onLockAcquired, "");
|
||||
latch.await();
|
||||
assertFalse(onResponse.get());
|
||||
assertTrue(onFailure.get());
|
||||
assertThat(onFailureException.get(), instanceOf(IllegalStateException.class));
|
||||
assertThat(
|
||||
onFailureException.get(), hasToString(containsString("operation primary term [" + oldPrimaryTerm + "] is too old")));
|
||||
}
|
||||
|
||||
closeShards(indexShard);
|
||||
}
|
||||
|
||||
@ -848,8 +954,8 @@ public class IndexShardTests extends IndexShardTestCase {
|
||||
|
||||
long newMaxSeqNoOfUpdates = randomLongBetween(SequenceNumbers.NO_OPS_PERFORMED, Long.MAX_VALUE);
|
||||
PlainActionFuture<Releasable> fut = new PlainActionFuture<>();
|
||||
replica.acquireReplicaOperationPermit(replica.operationPrimaryTerm, replica.getGlobalCheckpoint(),
|
||||
newMaxSeqNoOfUpdates, fut, ThreadPool.Names.WRITE, "");
|
||||
randomReplicaOperationPermitAcquisition(replica, replica.operationPrimaryTerm, replica.getGlobalCheckpoint(),
|
||||
newMaxSeqNoOfUpdates, fut, "");
|
||||
try (Releasable ignored = fut.actionGet()) {
|
||||
assertThat(replica.getMaxSeqNoOfUpdatesOrDeletes(), equalTo(Math.max(currentMaxSeqNoOfUpdates, newMaxSeqNoOfUpdates)));
|
||||
}
|
||||
@ -932,7 +1038,7 @@ public class IndexShardTests extends IndexShardTestCase {
|
||||
final Set<String> docsBeforeRollback = getShardDocUIDs(indexShard);
|
||||
final CountDownLatch latch = new CountDownLatch(1);
|
||||
final boolean shouldRollback = Math.max(globalCheckpointOnReplica, globalCheckpoint) < maxSeqNo;
|
||||
indexShard.acquireReplicaOperationPermit(
|
||||
randomReplicaOperationPermitAcquisition(indexShard,
|
||||
indexShard.getPendingPrimaryTerm() + 1,
|
||||
globalCheckpoint,
|
||||
maxSeqNoOfUpdatesOrDeletes,
|
||||
@ -947,8 +1053,7 @@ public class IndexShardTests extends IndexShardTestCase {
|
||||
public void onFailure(Exception e) {
|
||||
|
||||
}
|
||||
},
|
||||
ThreadPool.Names.SAME, "");
|
||||
}, "");
|
||||
|
||||
latch.await();
|
||||
if (shouldRollback) {
|
||||
@ -999,7 +1104,7 @@ public class IndexShardTests extends IndexShardTestCase {
|
||||
&& indexShard.seqNoStats().getMaxSeqNo() != SequenceNumbers.NO_OPS_PERFORMED;
|
||||
final Engine beforeRollbackEngine = indexShard.getEngine();
|
||||
final long newMaxSeqNoOfUpdates = randomLongBetween(indexShard.getMaxSeqNoOfUpdatesOrDeletes(), Long.MAX_VALUE);
|
||||
indexShard.acquireReplicaOperationPermit(
|
||||
randomReplicaOperationPermitAcquisition(indexShard,
|
||||
indexShard.pendingPrimaryTerm + 1,
|
||||
globalCheckpoint,
|
||||
newMaxSeqNoOfUpdates,
|
||||
@ -1014,8 +1119,7 @@ public class IndexShardTests extends IndexShardTestCase {
|
||||
public void onFailure(final Exception e) {
|
||||
|
||||
}
|
||||
},
|
||||
ThreadPool.Names.SAME, "");
|
||||
}, "");
|
||||
|
||||
latch.await();
|
||||
if (globalCheckpointOnReplica == SequenceNumbers.UNASSIGNED_SEQ_NO && globalCheckpoint == SequenceNumbers.UNASSIGNED_SEQ_NO) {
|
||||
@ -3497,4 +3601,23 @@ public class IndexShardTests extends IndexShardTestCase {
|
||||
public Settings threadPoolSettings() {
|
||||
return Settings.builder().put(super.threadPoolSettings()).put("thread_pool.estimated_time_interval", "5ms").build();
|
||||
}
|
||||
|
||||
/**
|
||||
* Randomizes the usage of {@link IndexShard#acquireReplicaOperationPermit(long, long, long, ActionListener, String, Object)} and
|
||||
* {@link IndexShard#acquireAllReplicaOperationsPermits(long, long, long, ActionListener, TimeValue)} in order to acquire a permit.
|
||||
*/
|
||||
private void randomReplicaOperationPermitAcquisition(final IndexShard indexShard,
|
||||
final long opPrimaryTerm,
|
||||
final long globalCheckpoint,
|
||||
final long maxSeqNoOfUpdatesOrDeletes,
|
||||
final ActionListener<Releasable> listener,
|
||||
final String info) {
|
||||
if (randomBoolean()) {
|
||||
final String executor = ThreadPool.Names.WRITE;
|
||||
indexShard.acquireReplicaOperationPermit(opPrimaryTerm, globalCheckpoint, maxSeqNoOfUpdatesOrDeletes, listener, executor, info);
|
||||
} else {
|
||||
final TimeValue timeout = TimeValue.timeValueSeconds(30L);
|
||||
indexShard.acquireAllReplicaOperationsPermits(opPrimaryTerm, globalCheckpoint, maxSeqNoOfUpdatesOrDeletes, listener, timeout);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user