replication base classes to allow for different implementation of the primary request and the replica request

This commit is contained in:
Shay Banon 2011-10-16 17:25:31 +02:00
parent 72ad722480
commit 1047cebabe
14 changed files with 129 additions and 76 deletions

View File

@ -33,9 +33,8 @@ import org.elasticsearch.transport.TransportService;
import java.util.concurrent.atomic.AtomicReferenceArray;
/**
* @author kimchy (shay.banon)
*/
public class TransportIndexReplicationPingAction extends TransportIndexReplicationOperationAction<IndexReplicationPingRequest, IndexReplicationPingResponse, ShardReplicationPingRequest, ShardReplicationPingResponse> {
public class TransportIndexReplicationPingAction extends TransportIndexReplicationOperationAction<IndexReplicationPingRequest, IndexReplicationPingResponse, ShardReplicationPingRequest, ShardReplicationPingRequest, ShardReplicationPingResponse> {
@Inject public TransportIndexReplicationPingAction(Settings settings, ClusterService clusterService,
TransportService transportService, ThreadPool threadPool,

View File

@ -31,9 +31,8 @@ import java.util.Set;
import java.util.concurrent.atomic.AtomicReferenceArray;
/**
* @author kimchy (Shay Banon)
*/
public class TransportReplicationPingAction extends TransportIndicesReplicationOperationAction<ReplicationPingRequest, ReplicationPingResponse, IndexReplicationPingRequest, IndexReplicationPingResponse, ShardReplicationPingRequest, ShardReplicationPingResponse> {
public class TransportReplicationPingAction extends TransportIndicesReplicationOperationAction<ReplicationPingRequest, ReplicationPingResponse, IndexReplicationPingRequest, IndexReplicationPingResponse, ShardReplicationPingRequest, ShardReplicationPingRequest, ShardReplicationPingResponse> {
@Inject public TransportReplicationPingAction(Settings settings, TransportService transportService, ClusterService clusterService, ThreadPool threadPool, TransportIndexReplicationPingAction indexAction) {
super(settings, transportService, clusterService, threadPool, indexAction);

View File

@ -33,7 +33,7 @@ import org.elasticsearch.transport.TransportService;
/**
* @author kimchy (shay.banon)
*/
public class TransportShardReplicationPingAction extends TransportShardReplicationOperationAction<ShardReplicationPingRequest, ShardReplicationPingResponse> {
public class TransportShardReplicationPingAction extends TransportShardReplicationOperationAction<ShardReplicationPingRequest, ShardReplicationPingRequest, ShardReplicationPingResponse> {
@Inject public TransportShardReplicationPingAction(Settings settings, TransportService transportService,
ClusterService clusterService, IndicesService indicesService, ThreadPool threadPool,
@ -53,6 +53,10 @@ public class TransportShardReplicationPingAction extends TransportShardReplicati
return new ShardReplicationPingRequest();
}
@Override protected ShardReplicationPingRequest newReplicaRequestInstance() {
return new ShardReplicationPingRequest();
}
@Override protected ShardReplicationPingResponse newResponseInstance() {
return new ShardReplicationPingResponse();
}
@ -61,11 +65,11 @@ public class TransportShardReplicationPingAction extends TransportShardReplicati
return "ping/replication/shard";
}
@Override protected PrimaryResponse<ShardReplicationPingResponse> shardOperationOnPrimary(ClusterState clusterState, ShardOperationRequest shardRequest) {
return new PrimaryResponse<ShardReplicationPingResponse>(new ShardReplicationPingResponse(), null);
@Override protected PrimaryResponse<ShardReplicationPingResponse, ShardReplicationPingRequest> shardOperationOnPrimary(ClusterState clusterState, PrimaryOperationRequest shardRequest) {
return new PrimaryResponse<ShardReplicationPingResponse, ShardReplicationPingRequest>(shardRequest.request, new ShardReplicationPingResponse(), null);
}
@Override protected void shardOperationOnReplica(ShardOperationRequest shardRequest) {
@Override protected void shardOperationOnReplica(ReplicaOperationRequest shardRequest) {
}
@Override protected ShardIterator shards(ClusterState clusterState, ShardReplicationPingRequest request) {

View File

@ -58,7 +58,7 @@ import java.io.IOException;
*
* @author kimchy (shay.banon)
*/
public class TransportShardBulkAction extends TransportShardReplicationOperationAction<BulkShardRequest, BulkShardResponse> {
public class TransportShardBulkAction extends TransportShardReplicationOperationAction<BulkShardRequest, BulkShardRequest, BulkShardResponse> {
private final MappingUpdatedAction mappingUpdatedAction;
@ -86,6 +86,10 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation
return new BulkShardRequest();
}
@Override protected BulkShardRequest newReplicaRequestInstance() {
return new BulkShardRequest();
}
@Override protected BulkShardResponse newResponseInstance() {
return new BulkShardResponse();
}
@ -102,9 +106,9 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation
return clusterState.routingTable().index(request.index()).shard(request.shardId()).shardsIt();
}
@Override protected PrimaryResponse<BulkShardResponse> shardOperationOnPrimary(ClusterState clusterState, ShardOperationRequest shardRequest) {
@Override protected PrimaryResponse<BulkShardResponse, BulkShardRequest> shardOperationOnPrimary(ClusterState clusterState, PrimaryOperationRequest shardRequest) {
final BulkShardRequest request = shardRequest.request;
IndexShard indexShard = indexShard(shardRequest);
IndexShard indexShard = indicesService.indexServiceSafe(shardRequest.request.index()).shardSafe(shardRequest.shardId);
Engine.IndexingOperation[] ops = null;
@ -198,10 +202,10 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation
}
}
BulkShardResponse response = new BulkShardResponse(new ShardId(request.index(), request.shardId()), responses);
return new PrimaryResponse<BulkShardResponse>(response, ops);
return new PrimaryResponse<BulkShardResponse, BulkShardRequest>(shardRequest.request, response, ops);
}
@Override protected void postPrimaryOperation(BulkShardRequest request, PrimaryResponse<BulkShardResponse> response) {
@Override protected void postPrimaryOperation(BulkShardRequest request, PrimaryResponse<BulkShardResponse, BulkShardRequest> response) {
IndexService indexService = indicesService.indexServiceSafe(request.index());
Engine.IndexingOperation[] ops = (Engine.IndexingOperation[]) response.payload();
if (ops == null) {
@ -233,8 +237,8 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation
}
}
@Override protected void shardOperationOnReplica(ShardOperationRequest shardRequest) {
IndexShard indexShard = indexShard(shardRequest);
@Override protected void shardOperationOnReplica(ReplicaOperationRequest shardRequest) {
IndexShard indexShard = indicesService.indexServiceSafe(shardRequest.request.index()).shardSafe(shardRequest.shardId);
final BulkShardRequest request = shardRequest.request;
for (int i = 0; i < request.items().length; i++) {
BulkItemRequest item = request.items()[i];

View File

@ -50,7 +50,7 @@ import org.elasticsearch.transport.TransportService;
*
* @author kimchy (shay.banon)
*/
public class TransportDeleteAction extends TransportShardReplicationOperationAction<DeleteRequest, DeleteResponse> {
public class TransportDeleteAction extends TransportShardReplicationOperationAction<DeleteRequest, DeleteRequest, DeleteResponse> {
private final boolean autoCreateIndex;
@ -136,6 +136,10 @@ public class TransportDeleteAction extends TransportShardReplicationOperationAct
return new DeleteRequest();
}
@Override protected DeleteRequest newReplicaRequestInstance() {
return new DeleteRequest();
}
@Override protected DeleteResponse newResponseInstance() {
return new DeleteResponse();
}
@ -148,9 +152,9 @@ public class TransportDeleteAction extends TransportShardReplicationOperationAct
state.blocks().indexBlockedRaiseException(ClusterBlockLevel.WRITE, request.index());
}
@Override protected PrimaryResponse<DeleteResponse> shardOperationOnPrimary(ClusterState clusterState, ShardOperationRequest shardRequest) {
@Override protected PrimaryResponse<DeleteResponse, DeleteRequest> shardOperationOnPrimary(ClusterState clusterState, PrimaryOperationRequest shardRequest) {
DeleteRequest request = shardRequest.request;
IndexShard indexShard = indexShard(shardRequest);
IndexShard indexShard = indicesService.indexServiceSafe(shardRequest.request.index()).shardSafe(shardRequest.shardId);
Engine.Delete delete = indexShard.prepareDelete(request.type(), request.id(), request.version())
.versionType(request.versionType())
.origin(Engine.Operation.Origin.PRIMARY);
@ -167,12 +171,12 @@ public class TransportDeleteAction extends TransportShardReplicationOperationAct
}
DeleteResponse response = new DeleteResponse(request.index(), request.type(), request.id(), delete.version(), delete.notFound());
return new PrimaryResponse<DeleteResponse>(response, null);
return new PrimaryResponse<DeleteResponse, DeleteRequest>(shardRequest.request, response, null);
}
@Override protected void shardOperationOnReplica(ShardOperationRequest shardRequest) {
@Override protected void shardOperationOnReplica(ReplicaOperationRequest shardRequest) {
DeleteRequest request = shardRequest.request;
IndexShard indexShard = indexShard(shardRequest);
IndexShard indexShard = indicesService.indexServiceSafe(shardRequest.request.index()).shardSafe(shardRequest.shardId);
Engine.Delete delete = indexShard.prepareDelete(request.type(), request.id(), request.version())
.origin(Engine.Operation.Origin.REPLICA);

View File

@ -35,7 +35,7 @@ import java.util.concurrent.atomic.AtomicReferenceArray;
/**
* @author kimchy (shay.banon)
*/
public class TransportIndexDeleteAction extends TransportIndexReplicationOperationAction<IndexDeleteRequest, IndexDeleteResponse, ShardDeleteRequest, ShardDeleteResponse> {
public class TransportIndexDeleteAction extends TransportIndexReplicationOperationAction<IndexDeleteRequest, IndexDeleteResponse, ShardDeleteRequest, ShardDeleteRequest, ShardDeleteResponse> {
@Inject public TransportIndexDeleteAction(Settings settings, ClusterService clusterService, TransportService transportService,
ThreadPool threadPool, TransportShardDeleteAction deleteAction) {

View File

@ -38,7 +38,7 @@ import org.elasticsearch.transport.TransportService;
/**
* @author kimchy (Shay Banon)
*/
public class TransportShardDeleteAction extends TransportShardReplicationOperationAction<ShardDeleteRequest, ShardDeleteResponse> {
public class TransportShardDeleteAction extends TransportShardReplicationOperationAction<ShardDeleteRequest, ShardDeleteRequest, ShardDeleteResponse> {
@Inject public TransportShardDeleteAction(Settings settings, TransportService transportService,
ClusterService clusterService, IndicesService indicesService, ThreadPool threadPool,
@ -54,6 +54,10 @@ public class TransportShardDeleteAction extends TransportShardReplicationOperati
return new ShardDeleteRequest();
}
@Override protected ShardDeleteRequest newReplicaRequestInstance() {
return new ShardDeleteRequest();
}
@Override protected ShardDeleteResponse newResponseInstance() {
return new ShardDeleteResponse();
}
@ -70,9 +74,9 @@ public class TransportShardDeleteAction extends TransportShardReplicationOperati
state.blocks().indexBlockedRaiseException(ClusterBlockLevel.WRITE, request.index());
}
@Override protected PrimaryResponse<ShardDeleteResponse> shardOperationOnPrimary(ClusterState clusterState, ShardOperationRequest shardRequest) {
@Override protected PrimaryResponse<ShardDeleteResponse, ShardDeleteRequest> shardOperationOnPrimary(ClusterState clusterState, PrimaryOperationRequest shardRequest) {
ShardDeleteRequest request = shardRequest.request;
IndexShard indexShard = indexShard(shardRequest);
IndexShard indexShard = indicesService.indexServiceSafe(shardRequest.request.index()).shardSafe(shardRequest.shardId);
Engine.Delete delete = indexShard.prepareDelete(request.type(), request.id(), request.version())
.origin(Engine.Operation.Origin.PRIMARY);
indexShard.delete(delete);
@ -89,12 +93,12 @@ public class TransportShardDeleteAction extends TransportShardReplicationOperati
ShardDeleteResponse response = new ShardDeleteResponse(delete.version(), delete.notFound());
return new PrimaryResponse<ShardDeleteResponse>(response, null);
return new PrimaryResponse<ShardDeleteResponse, ShardDeleteRequest>(shardRequest.request, response, null);
}
@Override protected void shardOperationOnReplica(ShardOperationRequest shardRequest) {
@Override protected void shardOperationOnReplica(ReplicaOperationRequest shardRequest) {
ShardDeleteRequest request = shardRequest.request;
IndexShard indexShard = indexShard(shardRequest);
IndexShard indexShard = indicesService.indexServiceSafe(shardRequest.request.index()).shardSafe(shardRequest.shardId);
Engine.Delete delete = indexShard.prepareDelete(request.type(), request.id(), request.version())
.origin(Engine.Operation.Origin.REPLICA);
indexShard.delete(delete);

View File

@ -33,9 +33,8 @@ import java.util.Set;
import java.util.concurrent.atomic.AtomicReferenceArray;
/**
* @author kimchy (shay.banon)
*/
public class TransportDeleteByQueryAction extends TransportIndicesReplicationOperationAction<DeleteByQueryRequest, DeleteByQueryResponse, IndexDeleteByQueryRequest, IndexDeleteByQueryResponse, ShardDeleteByQueryRequest, ShardDeleteByQueryResponse> {
public class TransportDeleteByQueryAction extends TransportIndicesReplicationOperationAction<DeleteByQueryRequest, DeleteByQueryResponse, IndexDeleteByQueryRequest, IndexDeleteByQueryResponse, ShardDeleteByQueryRequest, ShardDeleteByQueryRequest, ShardDeleteByQueryResponse> {
@Inject public TransportDeleteByQueryAction(Settings settings, ClusterService clusterService, TransportService transportService,
ThreadPool threadPool, TransportIndexDeleteByQueryAction indexDeleteByQueryAction) {

View File

@ -34,7 +34,7 @@ import java.util.concurrent.atomic.AtomicReferenceArray;
/**
* @author kimchy (shay.banon)
*/
public class TransportIndexDeleteByQueryAction extends TransportIndexReplicationOperationAction<IndexDeleteByQueryRequest, IndexDeleteByQueryResponse, ShardDeleteByQueryRequest, ShardDeleteByQueryResponse> {
public class TransportIndexDeleteByQueryAction extends TransportIndexReplicationOperationAction<IndexDeleteByQueryRequest, IndexDeleteByQueryResponse, ShardDeleteByQueryRequest, ShardDeleteByQueryRequest, ShardDeleteByQueryResponse> {
@Inject public TransportIndexDeleteByQueryAction(Settings settings, ClusterService clusterService, TransportService transportService,
ThreadPool threadPool, TransportShardDeleteByQueryAction shardDeleteByQueryAction) {

View File

@ -38,7 +38,7 @@ import org.elasticsearch.transport.TransportService;
/**
* @author kimchy (Shay Banon)
*/
public class TransportShardDeleteByQueryAction extends TransportShardReplicationOperationAction<ShardDeleteByQueryRequest, ShardDeleteByQueryResponse> {
public class TransportShardDeleteByQueryAction extends TransportShardReplicationOperationAction<ShardDeleteByQueryRequest, ShardDeleteByQueryRequest, ShardDeleteByQueryResponse> {
@Inject public TransportShardDeleteByQueryAction(Settings settings, TransportService transportService,
ClusterService clusterService, IndicesService indicesService, ThreadPool threadPool,
@ -58,6 +58,10 @@ public class TransportShardDeleteByQueryAction extends TransportShardReplication
return new ShardDeleteByQueryRequest();
}
@Override protected ShardDeleteByQueryRequest newReplicaRequestInstance() {
return new ShardDeleteByQueryRequest();
}
@Override protected ShardDeleteByQueryResponse newResponseInstance() {
return new ShardDeleteByQueryResponse();
}
@ -70,17 +74,18 @@ public class TransportShardDeleteByQueryAction extends TransportShardReplication
state.blocks().indexBlockedRaiseException(ClusterBlockLevel.WRITE, request.index());
}
@Override protected PrimaryResponse<ShardDeleteByQueryResponse> shardOperationOnPrimary(ClusterState clusterState, ShardOperationRequest shardRequest) {
@Override protected PrimaryResponse<ShardDeleteByQueryResponse, ShardDeleteByQueryRequest> shardOperationOnPrimary(ClusterState clusterState, PrimaryOperationRequest shardRequest) {
ShardDeleteByQueryRequest request = shardRequest.request;
IndexShard indexShard = indexShard(shardRequest);
IndexShard indexShard = indicesService.indexServiceSafe(shardRequest.request.index()).shardSafe(shardRequest.shardId);
Engine.DeleteByQuery deleteByQuery = indexShard.prepareDeleteByQuery(request.querySource(), request.filteringAliases(), request.types());
indexShard.deleteByQuery(deleteByQuery);
return new PrimaryResponse<ShardDeleteByQueryResponse>(new ShardDeleteByQueryResponse(), null);
return new PrimaryResponse<ShardDeleteByQueryResponse, ShardDeleteByQueryRequest>(shardRequest.request, new ShardDeleteByQueryResponse(), null);
}
@Override protected void shardOperationOnReplica(ShardOperationRequest shardRequest) {
@Override protected void shardOperationOnReplica(ReplicaOperationRequest shardRequest) {
ShardDeleteByQueryRequest request = shardRequest.request;
IndexShard indexShard = indexShard(shardRequest);
IndexShard indexShard = indicesService.indexServiceSafe(shardRequest.request.index()).shardSafe(shardRequest.shardId);
Engine.DeleteByQuery deleteByQuery = indexShard.prepareDeleteByQuery(request.querySource(), request.filteringAliases(), request.types());
indexShard.deleteByQuery(deleteByQuery);
}

View File

@ -66,7 +66,7 @@ import java.util.concurrent.TimeUnit;
*
* @author kimchy (shay.banon)
*/
public class TransportIndexAction extends TransportShardReplicationOperationAction<IndexRequest, IndexResponse> {
public class TransportIndexAction extends TransportShardReplicationOperationAction<IndexRequest, IndexRequest, IndexResponse> {
private final boolean autoCreateIndex;
@ -136,6 +136,10 @@ public class TransportIndexAction extends TransportShardReplicationOperationActi
return new IndexRequest();
}
@Override protected IndexRequest newReplicaRequestInstance() {
return new IndexRequest();
}
@Override protected IndexResponse newResponseInstance() {
return new IndexResponse();
}
@ -157,7 +161,7 @@ public class TransportIndexAction extends TransportShardReplicationOperationActi
.indexShards(clusterService.state(), request.index(), request.type(), request.id(), request.routing());
}
@Override protected PrimaryResponse<IndexResponse> shardOperationOnPrimary(ClusterState clusterState, ShardOperationRequest shardRequest) {
@Override protected PrimaryResponse<IndexResponse, IndexRequest> shardOperationOnPrimary(ClusterState clusterState, PrimaryOperationRequest shardRequest) {
final IndexRequest request = shardRequest.request;
// validate, if routing is required, that we got routing
@ -168,7 +172,7 @@ public class TransportIndexAction extends TransportShardReplicationOperationActi
}
}
IndexShard indexShard = indexShard(shardRequest);
IndexShard indexShard = indicesService.indexServiceSafe(shardRequest.request.index()).shardSafe(shardRequest.shardId);
SourceToParse sourceToParse = SourceToParse.source(request.underlyingSource(), request.underlyingSourceOffset(), request.underlyingSourceLength()).type(request.type()).id(request.id())
.routing(request.routing()).parent(request.parent()).timestamp(request.timestamp()).ttl(request.ttl());
long version;
@ -204,10 +208,10 @@ public class TransportIndexAction extends TransportShardReplicationOperationActi
request.version(version);
IndexResponse response = new IndexResponse(request.index(), request.type(), request.id(), version);
return new PrimaryResponse<IndexResponse>(response, op);
return new PrimaryResponse<IndexResponse, IndexRequest>(shardRequest.request, response, op);
}
@Override protected void postPrimaryOperation(IndexRequest request, PrimaryResponse<IndexResponse> response) {
@Override protected void postPrimaryOperation(IndexRequest request, PrimaryResponse<IndexResponse, IndexRequest> response) {
Engine.IndexingOperation op = (Engine.IndexingOperation) response.payload();
if (!Strings.hasLength(request.percolate())) {
return;
@ -221,8 +225,8 @@ public class TransportIndexAction extends TransportShardReplicationOperationActi
}
}
@Override protected void shardOperationOnReplica(ShardOperationRequest shardRequest) {
IndexShard indexShard = indexShard(shardRequest);
@Override protected void shardOperationOnReplica(ReplicaOperationRequest shardRequest) {
IndexShard indexShard = indicesService.indexServiceSafe(shardRequest.request.index()).shardSafe(shardRequest.shardId);
IndexRequest request = shardRequest.request;
SourceToParse sourceToParse = SourceToParse.source(request.underlyingSource(), request.underlyingSourceOffset(), request.underlyingSourceLength()).type(request.type()).id(request.id())
.routing(request.routing()).parent(request.parent()).timestamp(request.timestamp()).ttl(request.ttl());

View File

@ -21,6 +21,7 @@ package org.elasticsearch.action.support.replication;
import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.support.BaseAction;
import org.elasticsearch.cluster.ClusterService;
@ -40,15 +41,15 @@ import java.util.concurrent.atomic.AtomicReferenceArray;
/**
* @author kimchy (shay.banon)
*/
public abstract class TransportIndexReplicationOperationAction<Request extends IndexReplicationOperationRequest, Response extends ActionResponse, ShardRequest extends ShardReplicationOperationRequest, ShardResponse extends ActionResponse>
public abstract class TransportIndexReplicationOperationAction<Request extends IndexReplicationOperationRequest, Response extends ActionResponse, ShardRequest extends ShardReplicationOperationRequest, ShardReplicaRequest extends ActionRequest, ShardResponse extends ActionResponse>
extends BaseAction<Request, Response> {
protected final ClusterService clusterService;
protected final TransportShardReplicationOperationAction<ShardRequest, ShardResponse> shardAction;
protected final TransportShardReplicationOperationAction<ShardRequest, ShardReplicaRequest, ShardResponse> shardAction;
@Inject public TransportIndexReplicationOperationAction(Settings settings, TransportService transportService, ClusterService clusterService, ThreadPool threadPool,
TransportShardReplicationOperationAction<ShardRequest, ShardResponse> shardAction) {
TransportShardReplicationOperationAction<ShardRequest, ShardReplicaRequest, ShardResponse> shardAction) {
super(settings, threadPool);
this.clusterService = clusterService;
this.shardAction = shardAction;

View File

@ -20,6 +20,7 @@
package org.elasticsearch.action.support.replication;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.support.BaseAction;
import org.elasticsearch.cluster.ClusterService;
@ -37,20 +38,20 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReferenceArray;
/**
* @author kimchy (shay.banon)
*/
public abstract class TransportIndicesReplicationOperationAction<Request extends IndicesReplicationOperationRequest, Response extends ActionResponse, IndexRequest extends IndexReplicationOperationRequest, IndexResponse extends ActionResponse, ShardRequest extends ShardReplicationOperationRequest, ShardResponse extends ActionResponse>
public abstract class TransportIndicesReplicationOperationAction<Request extends IndicesReplicationOperationRequest, Response extends ActionResponse, IndexRequest extends IndexReplicationOperationRequest, IndexResponse extends ActionResponse,
ShardRequest extends ShardReplicationOperationRequest, ShardReplicaRequest extends ActionRequest, ShardResponse extends ActionResponse>
extends BaseAction<Request, Response> {
protected final ClusterService clusterService;
protected final TransportIndexReplicationOperationAction<IndexRequest, IndexResponse, ShardRequest, ShardResponse> indexAction;
protected final TransportIndexReplicationOperationAction<IndexRequest, IndexResponse, ShardRequest, ShardReplicaRequest, ShardResponse> indexAction;
final String transportAction;
@Inject public TransportIndicesReplicationOperationAction(Settings settings, TransportService transportService, ClusterService clusterService, ThreadPool threadPool,
TransportIndexReplicationOperationAction<IndexRequest, IndexResponse, ShardRequest, ShardResponse> indexAction) {
TransportIndexReplicationOperationAction<IndexRequest, IndexResponse, ShardRequest, ShardReplicaRequest, ShardResponse> indexAction) {
super(settings, threadPool);
this.clusterService = clusterService;
this.indexAction = indexAction;

View File

@ -22,6 +22,7 @@ package org.elasticsearch.action.support.replication;
import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.UnavailableShardsException;
import org.elasticsearch.action.WriteConsistencyLevel;
@ -46,7 +47,6 @@ import org.elasticsearch.index.engine.DocumentAlreadyExistsEngineException;
import org.elasticsearch.index.engine.VersionConflictEngineException;
import org.elasticsearch.index.shard.IllegalIndexShardStateException;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.service.IndexShard;
import org.elasticsearch.indices.IndexMissingException;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.node.NodeClosedException;
@ -67,9 +67,8 @@ import java.util.concurrent.atomic.AtomicInteger;
import static org.elasticsearch.ExceptionsHelper.*;
/**
* @author kimchy (shay.banon)
*/
public abstract class TransportShardReplicationOperationAction<Request extends ShardReplicationOperationRequest, Response extends ActionResponse> extends BaseAction<Request, Response> {
public abstract class TransportShardReplicationOperationAction<Request extends ShardReplicationOperationRequest, ReplicaRequest extends ActionRequest, Response extends ActionResponse> extends BaseAction<Request, Response> {
protected final TransportService transportService;
@ -115,20 +114,22 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
protected abstract Request newRequestInstance();
protected abstract ReplicaRequest newReplicaRequestInstance();
protected abstract Response newResponseInstance();
protected abstract String transportAction();
protected abstract String executor();
protected abstract PrimaryResponse<Response> shardOperationOnPrimary(ClusterState clusterState, ShardOperationRequest shardRequest);
protected abstract PrimaryResponse<Response, ReplicaRequest> shardOperationOnPrimary(ClusterState clusterState, PrimaryOperationRequest shardRequest);
protected abstract void shardOperationOnReplica(ShardOperationRequest shardRequest);
protected abstract void shardOperationOnReplica(ReplicaOperationRequest shardRequest);
/**
* Called once replica operations have been dispatched on the
*/
protected void postPrimaryOperation(Request request, PrimaryResponse<Response> response) {
protected void postPrimaryOperation(Request request, PrimaryResponse<Response, ReplicaRequest> response) {
}
@ -156,10 +157,6 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
return transportAction() + "/replica";
}
protected IndexShard indexShard(ShardOperationRequest shardRequest) {
return indicesService.indexServiceSafe(shardRequest.request.index()).shardSafe(shardRequest.shardId);
}
protected boolean retryPrimaryException(Throwable e) {
Throwable cause = ExceptionsHelper.unwrapCause(e);
return cause instanceof IndexShardMissingException ||
@ -231,32 +228,32 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
}
}
class ReplicaOperationTransportHandler extends BaseTransportRequestHandler<ShardOperationRequest> {
class ReplicaOperationTransportHandler extends BaseTransportRequestHandler<ReplicaOperationRequest> {
@Override public ShardOperationRequest newInstance() {
return new ShardOperationRequest();
@Override public ReplicaOperationRequest newInstance() {
return new ReplicaOperationRequest();
}
@Override public String executor() {
return executor;
}
@Override public void messageReceived(final ShardOperationRequest request, final TransportChannel channel) throws Exception {
@Override public void messageReceived(final ReplicaOperationRequest request, final TransportChannel channel) throws Exception {
shardOperationOnReplica(request);
channel.sendResponse(VoidStreamable.INSTANCE);
}
}
protected class ShardOperationRequest implements Streamable {
protected class PrimaryOperationRequest implements Streamable {
public int shardId;
public Request request;
public ShardOperationRequest() {
public PrimaryOperationRequest() {
}
public ShardOperationRequest(int shardId, Request request) {
public PrimaryOperationRequest(int shardId, Request request) {
this.shardId = shardId;
this.request = request;
}
@ -273,6 +270,32 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
}
}
protected class ReplicaOperationRequest implements Streamable {
public int shardId;
public ReplicaRequest request;
public ReplicaOperationRequest() {
}
public ReplicaOperationRequest(int shardId, ReplicaRequest request) {
this.shardId = shardId;
this.request = request;
}
@Override public void readFrom(StreamInput in) throws IOException {
shardId = in.readVInt();
request = newReplicaRequestInstance();
request.readFrom(in);
}
@Override public void writeTo(StreamOutput out) throws IOException {
out.writeVInt(shardId);
request.writeTo(out);
}
}
protected class AsyncShardOperationAction {
private final ActionListener<Response> listener;
@ -461,7 +484,7 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
void performOnPrimary(int primaryShardId, boolean fromDiscoveryListener, final ShardRouting shard, ClusterState clusterState) {
try {
PrimaryResponse<Response> response = shardOperationOnPrimary(clusterState, new ShardOperationRequest(primaryShardId, request));
PrimaryResponse<Response, ReplicaRequest> response = shardOperationOnPrimary(clusterState, new PrimaryOperationRequest(primaryShardId, request));
performReplicas(response);
} catch (Exception e) {
// shard has not been allocated yet, retry it here
@ -478,7 +501,7 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
}
}
void performReplicas(final PrimaryResponse<Response> response) {
void performReplicas(final PrimaryResponse<Response, ReplicaRequest> response) {
if (ignoreReplicas() || shardIt.size() == 1 /* no replicas */) {
postPrimaryOperation(request, response);
listener.onResponse(response.response());
@ -543,7 +566,7 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
}
}
void performOnReplica(final PrimaryResponse<Response> response, final AtomicInteger counter, final ShardRouting shard, String nodeId) {
void performOnReplica(final PrimaryResponse<Response, ReplicaRequest> response, final AtomicInteger counter, final ShardRouting shard, String nodeId) {
// if we don't have that node, it means that it might have failed and will be created again, in
// this case, we don't have to do the operation, and just let it failover
if (!nodes.nodeExists(nodeId)) {
@ -553,7 +576,7 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
return;
}
final ShardOperationRequest shardRequest = new ShardOperationRequest(shardIt.shardId().id(), request);
final ReplicaOperationRequest shardRequest = new ReplicaOperationRequest(shardIt.shardId().id(), response.replicaRequest());
if (!nodeId.equals(nodes.localNodeId())) {
DiscoveryNode node = nodes.get(nodeId);
transportService.sendRequest(node, transportReplicaAction, shardRequest, transportOptions(), new VoidTransportResponseHandler(ThreadPool.Names.SAME) {
@ -610,16 +633,22 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
}
}
public static class PrimaryResponse<T> {
private final T response;
public static class PrimaryResponse<Response, ReplicaRequest> {
private final ReplicaRequest replicaRequest;
private final Response response;
private final Object payload;
public PrimaryResponse(T response, Object payload) {
public PrimaryResponse(ReplicaRequest replicaRequest, Response response, Object payload) {
this.replicaRequest = replicaRequest;
this.response = response;
this.payload = payload;
}
public T response() {
public ReplicaRequest replicaRequest() {
return this.replicaRequest;
}
public Response response() {
return response;
}