TransportReplicationAction subclasses shouldn't have to resolve shards (#20730)
TRA currently resolves incoming requests to IndexShards in order to acquire operations locks on them. There is no need for all subclasses to have to go through the same IndicesService/IndexService song and dance. Also, doing it once means we don't need to worry about edge cases where the shard is removed while a TRA is in flight.
This commit is contained in:
parent
134b1f9b4d
commit
a2e82adc6f
|
@ -54,18 +54,16 @@ public class TransportShardFlushAction extends TransportReplicationAction<ShardF
|
|||
}
|
||||
|
||||
@Override
|
||||
protected PrimaryResult shardOperationOnPrimary(ShardFlushRequest shardRequest) {
|
||||
IndexShard indexShard = indicesService.indexServiceSafe(shardRequest.shardId().getIndex()).getShard(shardRequest.shardId().id());
|
||||
indexShard.flush(shardRequest.getRequest());
|
||||
logger.trace("{} flush request executed on primary", indexShard.shardId());
|
||||
protected PrimaryResult shardOperationOnPrimary(ShardFlushRequest shardRequest, IndexShard primary) {
|
||||
primary.flush(shardRequest.getRequest());
|
||||
logger.trace("{} flush request executed on primary", primary.shardId());
|
||||
return new PrimaryResult(shardRequest, new ReplicationResponse());
|
||||
}
|
||||
|
||||
@Override
|
||||
protected ReplicaResult shardOperationOnReplica(ShardFlushRequest request) {
|
||||
IndexShard indexShard = indicesService.indexServiceSafe(request.shardId().getIndex()).getShard(request.shardId().id());
|
||||
indexShard.flush(request.getRequest());
|
||||
logger.trace("{} flush request executed on replica", indexShard.shardId());
|
||||
protected ReplicaResult shardOperationOnReplica(ShardFlushRequest request, IndexShard replica) {
|
||||
replica.flush(request.getRequest());
|
||||
logger.trace("{} flush request executed on replica", replica.shardId());
|
||||
return new ReplicaResult();
|
||||
}
|
||||
|
||||
|
|
|
@ -30,7 +30,6 @@ import org.elasticsearch.cluster.service.ClusterService;
|
|||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.index.shard.IndexShard;
|
||||
import org.elasticsearch.index.shard.ShardId;
|
||||
import org.elasticsearch.indices.IndicesService;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
|
@ -55,19 +54,16 @@ public class TransportShardRefreshAction
|
|||
}
|
||||
|
||||
@Override
|
||||
protected PrimaryResult shardOperationOnPrimary(BasicReplicationRequest shardRequest) {
|
||||
IndexShard indexShard = indicesService.indexServiceSafe(shardRequest.shardId().getIndex()).getShard(shardRequest.shardId().id());
|
||||
indexShard.refresh("api");
|
||||
logger.trace("{} refresh request executed on primary", indexShard.shardId());
|
||||
protected PrimaryResult shardOperationOnPrimary(BasicReplicationRequest shardRequest, IndexShard primary) {
|
||||
primary.refresh("api");
|
||||
logger.trace("{} refresh request executed on primary", primary.shardId());
|
||||
return new PrimaryResult(shardRequest, new ReplicationResponse());
|
||||
}
|
||||
|
||||
@Override
|
||||
protected ReplicaResult shardOperationOnReplica(BasicReplicationRequest request) {
|
||||
final ShardId shardId = request.shardId();
|
||||
IndexShard indexShard = indicesService.indexServiceSafe(shardId.getIndex()).getShard(shardId.id());
|
||||
indexShard.refresh("api");
|
||||
logger.trace("{} refresh request executed on replica", indexShard.shardId());
|
||||
protected ReplicaResult shardOperationOnReplica(BasicReplicationRequest request, IndexShard replica) {
|
||||
replica.refresh("api");
|
||||
logger.trace("{} refresh request executed on replica", replica.shardId());
|
||||
return new ReplicaResult();
|
||||
}
|
||||
|
||||
|
|
|
@ -49,7 +49,6 @@ import org.elasticsearch.common.inject.Inject;
|
|||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.xcontent.XContentHelper;
|
||||
import org.elasticsearch.common.xcontent.XContentType;
|
||||
import org.elasticsearch.index.IndexService;
|
||||
import org.elasticsearch.index.VersionType;
|
||||
import org.elasticsearch.index.engine.Engine;
|
||||
import org.elasticsearch.index.engine.VersionConflictEngineException;
|
||||
|
@ -110,17 +109,15 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
|
|||
}
|
||||
|
||||
@Override
|
||||
protected WriteResult<BulkShardResponse> onPrimaryShard(BulkShardRequest request, IndexShard indexShard) throws Exception {
|
||||
ShardId shardId = request.shardId();
|
||||
final IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex());
|
||||
final IndexMetaData metaData = indexService.getIndexSettings().getIndexMetaData();
|
||||
protected WriteResult<BulkShardResponse> onPrimaryShard(BulkShardRequest request, IndexShard primary) throws Exception {
|
||||
final IndexMetaData metaData = primary.indexSettings().getIndexMetaData();
|
||||
|
||||
long[] preVersions = new long[request.items().length];
|
||||
VersionType[] preVersionTypes = new VersionType[request.items().length];
|
||||
Translog.Location location = null;
|
||||
for (int requestIndex = 0; requestIndex < request.items().length; requestIndex++) {
|
||||
BulkItemRequest item = request.items()[requestIndex];
|
||||
location = handleItem(metaData, request, indexShard, preVersions, preVersionTypes, location, requestIndex, item);
|
||||
location = handleItem(metaData, request, primary, preVersions, preVersionTypes, location, requestIndex, item);
|
||||
}
|
||||
|
||||
BulkItemResponse[] responses = new BulkItemResponse[request.items().length];
|
||||
|
|
|
@ -93,16 +93,16 @@ public abstract class TransportReplicationAction<
|
|||
Response extends ReplicationResponse
|
||||
> extends TransportAction<Request, Response> {
|
||||
|
||||
protected final TransportService transportService;
|
||||
private final TransportService transportService;
|
||||
protected final ClusterService clusterService;
|
||||
protected final IndicesService indicesService;
|
||||
private final IndicesService indicesService;
|
||||
private final ShardStateAction shardStateAction;
|
||||
private final TransportRequestOptions transportOptions;
|
||||
private final String executor;
|
||||
|
||||
// package private for testing
|
||||
final String transportReplicaAction;
|
||||
final String transportPrimaryAction;
|
||||
private final String transportReplicaAction;
|
||||
private final String transportPrimaryAction;
|
||||
private final ReplicasProxy replicasProxy;
|
||||
|
||||
protected TransportReplicationAction(Settings settings, String actionName, TransportService transportService,
|
||||
|
@ -167,14 +167,18 @@ public abstract class TransportReplicationAction<
|
|||
* Primary operation on node with primary copy.
|
||||
*
|
||||
* @param shardRequest the request to the primary shard
|
||||
* @param primary the primary shard to perform the operation on
|
||||
*/
|
||||
protected abstract PrimaryResult shardOperationOnPrimary(Request shardRequest) throws Exception;
|
||||
protected abstract PrimaryResult shardOperationOnPrimary(Request shardRequest, IndexShard primary) throws Exception;
|
||||
|
||||
/**
|
||||
* Synchronous replica operation on nodes with replica copies. This is done under the lock form
|
||||
* {@link #acquireReplicaOperationLock(ShardId, long, String, ActionListener)}.
|
||||
* {@link IndexShard#acquireReplicaOperationLock(long, ActionListener, String)}
|
||||
*
|
||||
* @param shardRequest the request to the replica shard
|
||||
* @param replica the replica shard to perform the operation on
|
||||
*/
|
||||
protected abstract ReplicaResult shardOperationOnReplica(ReplicaRequest shardRequest);
|
||||
protected abstract ReplicaResult shardOperationOnReplica(ReplicaRequest shardRequest, IndexShard replica);
|
||||
|
||||
/**
|
||||
* Cluster level block to check before request execution
|
||||
|
@ -436,6 +440,7 @@ public abstract class TransportReplicationAction<
|
|||
// allocation id of the replica this request is meant for
|
||||
private final String targetAllocationID;
|
||||
private final TransportChannel channel;
|
||||
private final IndexShard replica;
|
||||
/**
|
||||
* The task on the node with the replica shard.
|
||||
*/
|
||||
|
@ -449,12 +454,15 @@ public abstract class TransportReplicationAction<
|
|||
this.channel = channel;
|
||||
this.task = task;
|
||||
this.targetAllocationID = targetAllocationID;
|
||||
final ShardId shardId = request.shardId();
|
||||
assert shardId != null : "request shardId must be set";
|
||||
this.replica = getIndexShard(shardId);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onResponse(Releasable releasable) {
|
||||
try {
|
||||
ReplicaResult replicaResult = shardOperationOnReplica(request);
|
||||
ReplicaResult replicaResult = shardOperationOnReplica(request, replica);
|
||||
releasable.close(); // release shard operation lock before responding to caller
|
||||
replicaResult.respond(new ResponseListener());
|
||||
} catch (Exception e) {
|
||||
|
@ -521,8 +529,12 @@ public abstract class TransportReplicationAction<
|
|||
@Override
|
||||
protected void doRun() throws Exception {
|
||||
setPhase(task, "replica");
|
||||
assert request.shardId() != null : "request shardId must be set";
|
||||
acquireReplicaOperationLock(request.shardId(), request.primaryTerm(), targetAllocationID, this);
|
||||
final String actualAllocationId = this.replica.routingEntry().allocationId().getId();
|
||||
if (actualAllocationId.equals(targetAllocationID) == false) {
|
||||
throw new ShardNotFoundException(this.replica.shardId(), "expected aID [{}] but found [{}]", targetAllocationID,
|
||||
actualAllocationId);
|
||||
}
|
||||
replica.acquireReplicaOperationLock(request.primaryTerm, this, executor);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -550,6 +562,11 @@ public abstract class TransportReplicationAction<
|
|||
}
|
||||
}
|
||||
|
||||
private IndexShard getIndexShard(ShardId shardId) {
|
||||
IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex());
|
||||
return indexService.getShard(shardId.id());
|
||||
}
|
||||
|
||||
/**
|
||||
* Responsible for routing and retrying failed operations on the primary.
|
||||
* The actual primary operation is done in {@link ReplicationOperation} on the
|
||||
|
@ -816,10 +833,9 @@ public abstract class TransportReplicationAction<
|
|||
* tries to acquire reference to {@link IndexShard} to perform a primary operation. Released after performing primary operation locally
|
||||
* and replication of the operation to all replica shards is completed / failed (see {@link ReplicationOperation}).
|
||||
*/
|
||||
protected void acquirePrimaryShardReference(ShardId shardId, String allocationId,
|
||||
ActionListener<PrimaryShardReference> onReferenceAcquired) {
|
||||
IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex());
|
||||
IndexShard indexShard = indexService.getShard(shardId.id());
|
||||
private void acquirePrimaryShardReference(ShardId shardId, String allocationId,
|
||||
ActionListener<PrimaryShardReference> onReferenceAcquired) {
|
||||
IndexShard indexShard = getIndexShard(shardId);
|
||||
// we may end up here if the cluster state used to route the primary is so stale that the underlying
|
||||
// index shard was replaced with a replica. For example - in a two node cluster, if the primary fails
|
||||
// the replica will take over and a replica will be assigned to the first node.
|
||||
|
@ -847,20 +863,6 @@ public abstract class TransportReplicationAction<
|
|||
indexShard.acquirePrimaryOperationLock(onAcquired, executor);
|
||||
}
|
||||
|
||||
/**
|
||||
* tries to acquire an operation on replicas. The lock is closed as soon as replication is completed on the node.
|
||||
*/
|
||||
protected void acquireReplicaOperationLock(ShardId shardId, long primaryTerm, final String allocationId,
|
||||
ActionListener<Releasable> onLockAcquired) {
|
||||
IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex());
|
||||
IndexShard indexShard = indexService.getShard(shardId.id());
|
||||
final String actualAllocationId = indexShard.routingEntry().allocationId().getId();
|
||||
if (actualAllocationId.equals(allocationId) == false) {
|
||||
throw new ShardNotFoundException(shardId, "expected aID [{}] but found [{}]", allocationId, actualAllocationId);
|
||||
}
|
||||
indexShard.acquireReplicaOperationLock(primaryTerm, onLockAcquired, executor);
|
||||
}
|
||||
|
||||
/**
|
||||
* Indicated whether this operation should be replicated to shadow replicas or not. If this method returns true the replication phase
|
||||
* will be skipped. For example writes such as index and delete don't need to be replicated on shadow replicas but refresh and flush do.
|
||||
|
@ -899,7 +901,7 @@ public abstract class TransportReplicationAction<
|
|||
|
||||
@Override
|
||||
public PrimaryResult perform(Request request) throws Exception {
|
||||
PrimaryResult result = shardOperationOnPrimary(request);
|
||||
PrimaryResult result = shardOperationOnPrimary(request, indexShard);
|
||||
result.replicaRequest().primaryTerm(indexShard.getPrimaryTerm());
|
||||
return result;
|
||||
}
|
||||
|
|
|
@ -29,9 +29,7 @@ import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
|
|||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
import org.elasticsearch.common.Nullable;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.index.IndexService;
|
||||
import org.elasticsearch.index.shard.IndexShard;
|
||||
import org.elasticsearch.index.shard.ShardId;
|
||||
import org.elasticsearch.index.translog.Translog;
|
||||
import org.elasticsearch.index.translog.Translog.Location;
|
||||
import org.elasticsearch.indices.IndicesService;
|
||||
|
@ -73,26 +71,15 @@ public abstract class TransportWriteAction<
|
|||
protected abstract Translog.Location onReplicaShard(Request request, IndexShard indexShard);
|
||||
|
||||
@Override
|
||||
protected final WritePrimaryResult shardOperationOnPrimary(Request request) throws Exception {
|
||||
IndexShard indexShard = indexShard(request);
|
||||
WriteResult<Response> result = onPrimaryShard(request, indexShard);
|
||||
return new WritePrimaryResult(request, result.getResponse(), result.getLocation(), indexShard);
|
||||
protected final WritePrimaryResult shardOperationOnPrimary(Request request, IndexShard primary) throws Exception {
|
||||
WriteResult<Response> result = onPrimaryShard(request, primary);
|
||||
return new WritePrimaryResult(request, result.getResponse(), result.getLocation(), primary);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected final WriteReplicaResult shardOperationOnReplica(Request request) {
|
||||
IndexShard indexShard = indexShard(request);
|
||||
Translog.Location location = onReplicaShard(request, indexShard);
|
||||
return new WriteReplicaResult(indexShard, request, location);
|
||||
}
|
||||
|
||||
/**
|
||||
* Fetch the IndexShard for the request. Protected so it can be mocked in tests.
|
||||
*/
|
||||
protected IndexShard indexShard(Request request) {
|
||||
final ShardId shardId = request.shardId();
|
||||
IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex());
|
||||
return indexService.getShard(shardId.id());
|
||||
protected final WriteReplicaResult shardOperationOnReplica(Request request, IndexShard replica) {
|
||||
Translog.Location location = onReplicaShard(request, replica);
|
||||
return new WriteReplicaResult(replica, request, location);
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -712,7 +712,7 @@ public class TransportReplicationActionTests extends ESTestCase {
|
|||
Action action = new Action(Settings.EMPTY, "testActionWithExceptions", transportService, clusterService, shardStateAction,
|
||||
threadPool) {
|
||||
@Override
|
||||
protected ReplicaResult shardOperationOnReplica(Request request) {
|
||||
protected ReplicaResult shardOperationOnReplica(Request request, IndexShard replica) {
|
||||
assertIndexShardCounter(1);
|
||||
assertPhase(task, "replica");
|
||||
if (throwException) {
|
||||
|
@ -832,7 +832,7 @@ public class TransportReplicationActionTests extends ESTestCase {
|
|||
Action action = new Action(Settings.EMPTY, "testActionWithExceptions", transportService, clusterService, shardStateAction,
|
||||
threadPool) {
|
||||
@Override
|
||||
protected ReplicaResult shardOperationOnReplica(Request request) {
|
||||
protected ReplicaResult shardOperationOnReplica(Request request, IndexShard replica) {
|
||||
assertPhase(task, "replica");
|
||||
if (throwException.get()) {
|
||||
throw new RetryOnReplicaException(shardId, "simulation");
|
||||
|
@ -958,14 +958,14 @@ public class TransportReplicationActionTests extends ESTestCase {
|
|||
}
|
||||
|
||||
@Override
|
||||
protected PrimaryResult shardOperationOnPrimary(Request shardRequest) throws Exception {
|
||||
protected PrimaryResult shardOperationOnPrimary(Request shardRequest, IndexShard primary) throws Exception {
|
||||
boolean executedBefore = shardRequest.processedOnPrimary.getAndSet(true);
|
||||
assert executedBefore == false : "request has already been executed on the primary";
|
||||
return new PrimaryResult(shardRequest, new Response());
|
||||
}
|
||||
|
||||
@Override
|
||||
protected ReplicaResult shardOperationOnReplica(Request request) {
|
||||
protected ReplicaResult shardOperationOnReplica(Request request, IndexShard replica) {
|
||||
request.processedOnReplicas.incrementAndGet();
|
||||
return new ReplicaResult();
|
||||
}
|
||||
|
|
|
@ -62,12 +62,12 @@ public class TransportWriteActionTests extends ESTestCase {
|
|||
noRefreshCall(TestAction::shardOperationOnReplica, TestAction.WriteReplicaResult::respond);
|
||||
}
|
||||
|
||||
private <Result, Response> void noRefreshCall(ThrowingBiFunction<TestAction, TestRequest, Result> action,
|
||||
private <Result, Response> void noRefreshCall(ThrowingTriFunction<TestAction, TestRequest, IndexShard, Result> action,
|
||||
BiConsumer<Result, CapturingActionListener<Response>> responder)
|
||||
throws Exception {
|
||||
TestRequest request = new TestRequest();
|
||||
request.setRefreshPolicy(RefreshPolicy.NONE); // The default, but we'll set it anyway just to be explicit
|
||||
Result result = action.apply(new TestAction(), request);
|
||||
Result result = action.apply(new TestAction(), request, indexShard);
|
||||
CapturingActionListener<Response> listener = new CapturingActionListener<>();
|
||||
responder.accept(result, listener);
|
||||
assertNotNull(listener.response);
|
||||
|
@ -83,12 +83,12 @@ public class TransportWriteActionTests extends ESTestCase {
|
|||
immediateRefresh(TestAction::shardOperationOnReplica, TestAction.WriteReplicaResult::respond, r -> {});
|
||||
}
|
||||
|
||||
private <Result, Response> void immediateRefresh(ThrowingBiFunction<TestAction, TestRequest, Result> action,
|
||||
private <Result, Response> void immediateRefresh(ThrowingTriFunction<TestAction, TestRequest, IndexShard, Result> action,
|
||||
BiConsumer<Result, CapturingActionListener<Response>> responder,
|
||||
Consumer<Response> responseChecker) throws Exception {
|
||||
TestRequest request = new TestRequest();
|
||||
request.setRefreshPolicy(RefreshPolicy.IMMEDIATE);
|
||||
Result result = action.apply(new TestAction(), request);
|
||||
Result result = action.apply(new TestAction(), request, indexShard);
|
||||
CapturingActionListener<Response> listener = new CapturingActionListener<>();
|
||||
responder.accept(result, listener);
|
||||
assertNotNull(listener.response);
|
||||
|
@ -106,12 +106,12 @@ public class TransportWriteActionTests extends ESTestCase {
|
|||
waitForRefresh(TestAction::shardOperationOnReplica, TestAction.WriteReplicaResult::respond, (r, forcedRefresh) -> {});
|
||||
}
|
||||
|
||||
private <Result, Response> void waitForRefresh(ThrowingBiFunction<TestAction, TestRequest, Result> action,
|
||||
private <Result, Response> void waitForRefresh(ThrowingTriFunction<TestAction, TestRequest, IndexShard, Result> action,
|
||||
BiConsumer<Result, CapturingActionListener<Response>> responder,
|
||||
BiConsumer<Response, Boolean> resultChecker) throws Exception {
|
||||
TestRequest request = new TestRequest();
|
||||
request.setRefreshPolicy(RefreshPolicy.WAIT_UNTIL);
|
||||
Result result = action.apply(new TestAction(), request);
|
||||
Result result = action.apply(new TestAction(), request, indexShard);
|
||||
CapturingActionListener<Response> listener = new CapturingActionListener<>();
|
||||
responder.accept(result, listener);
|
||||
assertNull(listener.response); // Haven't reallresponded yet
|
||||
|
@ -135,11 +135,6 @@ public class TransportWriteActionTests extends ESTestCase {
|
|||
new IndexNameExpressionResolver(Settings.EMPTY), TestRequest::new, ThreadPool.Names.SAME);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected IndexShard indexShard(TestRequest request) {
|
||||
return indexShard;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected WriteResult<TestResponse> onPrimaryShard(TestRequest request, IndexShard indexShard) throws Exception {
|
||||
return new WriteResult<>(new TestResponse(), location);
|
||||
|
@ -185,7 +180,7 @@ public class TransportWriteActionTests extends ESTestCase {
|
|||
}
|
||||
}
|
||||
|
||||
private interface ThrowingBiFunction<A, B, R> {
|
||||
R apply(A a, B b) throws Exception;
|
||||
private interface ThrowingTriFunction<A, B, C, R> {
|
||||
R apply(A a, B b, C c) throws Exception;
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue