From a7cce9cd38845e5e634b9d9a694adc4dd261d664 Mon Sep 17 00:00:00 2001 From: kimchy Date: Fri, 28 May 2010 03:56:04 +0300 Subject: [PATCH] Replication Actions: Allow to control replication type - `async` or `sync`, closes #196. --- .../IndexReplicationPingRequest.java | 13 ++++++++++++- .../replication/ReplicationPingRequest.java | 6 ++++++ .../ShardReplicationPingRequest.java | 19 ++++++++++++++++++- .../RestReplicationPingAction.java | 5 +++++ 4 files changed, 41 insertions(+), 2 deletions(-) diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/cluster/ping/replication/IndexReplicationPingRequest.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/cluster/ping/replication/IndexReplicationPingRequest.java index fa96147df83..cf4856b2b81 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/cluster/ping/replication/IndexReplicationPingRequest.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/cluster/ping/replication/IndexReplicationPingRequest.java @@ -20,6 +20,7 @@ package org.elasticsearch.action.admin.cluster.ping.replication; import org.elasticsearch.action.support.replication.IndexReplicationOperationRequest; +import org.elasticsearch.action.support.replication.ReplicationType; import org.elasticsearch.util.TimeValue; import org.elasticsearch.util.io.stream.StreamInput; import org.elasticsearch.util.io.stream.StreamOutput; @@ -27,7 +28,7 @@ import org.elasticsearch.util.io.stream.StreamOutput; import java.io.IOException; /** - * @author kimchy (Shay Banon) + * @author kimchy (shay.banon) */ public class IndexReplicationPingRequest extends IndexReplicationOperationRequest { @@ -38,6 +39,7 @@ public class IndexReplicationPingRequest extends IndexReplicationOperationReques IndexReplicationPingRequest(ReplicationPingRequest request, String index) { this.index = index; this.timeout = request.timeout(); + this.replicationType = request.replicationType(); } IndexReplicationPingRequest() { @@ -48,6 +50,15 @@ public class IndexReplicationPingRequest extends IndexReplicationOperationReques return this; } + /** + * The replication type to use with this operation. + */ + public IndexReplicationPingRequest replicationType(ReplicationType replicationType) { + this.replicationType = replicationType; + return this; + } + + public void readFrom(StreamInput in) throws IOException { super.readFrom(in); } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/cluster/ping/replication/ReplicationPingRequest.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/cluster/ping/replication/ReplicationPingRequest.java index 935ef2c5bad..d903a325e6a 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/cluster/ping/replication/ReplicationPingRequest.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/cluster/ping/replication/ReplicationPingRequest.java @@ -20,6 +20,7 @@ package org.elasticsearch.action.admin.cluster.ping.replication; import org.elasticsearch.action.support.replication.IndicesReplicationOperationRequest; +import org.elasticsearch.action.support.replication.ReplicationType; import org.elasticsearch.util.TimeValue; /** @@ -40,6 +41,11 @@ public class ReplicationPingRequest extends IndicesReplicationOperationRequest { return this; } + public ReplicationPingRequest replicationType(ReplicationType replicationType) { + this.replicationType = replicationType; + return this; + } + public ReplicationPingRequest timeout(TimeValue timeout) { this.timeout = timeout; return this; diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/cluster/ping/replication/ShardReplicationPingRequest.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/cluster/ping/replication/ShardReplicationPingRequest.java index e317ef09925..c73f58cdc4b 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/cluster/ping/replication/ShardReplicationPingRequest.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/cluster/ping/replication/ShardReplicationPingRequest.java @@ -19,6 +19,7 @@ package org.elasticsearch.action.admin.cluster.ping.replication; +import org.elasticsearch.action.support.replication.ReplicationType; import org.elasticsearch.action.support.replication.ShardReplicationOperationRequest; import org.elasticsearch.util.io.stream.StreamInput; import org.elasticsearch.util.io.stream.StreamOutput; @@ -26,7 +27,7 @@ import org.elasticsearch.util.io.stream.StreamOutput; import java.io.IOException; /** - * @author kimchy (Shay Banon) + * @author kimchy (shay.banon) */ public class ShardReplicationPingRequest extends ShardReplicationOperationRequest { @@ -35,6 +36,7 @@ public class ShardReplicationPingRequest extends ShardReplicationOperationReques public ShardReplicationPingRequest(IndexReplicationPingRequest request, int shardId) { this(request.index(), shardId); timeout = request.timeout(); + replicationType(request.replicationType()); } public ShardReplicationPingRequest(String index, int shardId) { @@ -49,6 +51,21 @@ public class ShardReplicationPingRequest extends ShardReplicationOperationReques return this.shardId; } + @Override public ShardReplicationPingRequest listenerThreaded(boolean threadedListener) { + super.listenerThreaded(threadedListener); + return this; + } + + @Override public ShardReplicationPingRequest operationThreaded(boolean threadedOperation) { + super.operationThreaded(threadedOperation); + return this; + } + + @Override public ShardReplicationPingRequest replicationType(ReplicationType replicationType) { + super.replicationType(replicationType); + return this; + } + @Override public void readFrom(StreamInput in) throws IOException { super.readFrom(in); shardId = in.readVInt(); diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/rest/action/admin/cluster/ping/replication/RestReplicationPingAction.java b/modules/elasticsearch/src/main/java/org/elasticsearch/rest/action/admin/cluster/ping/replication/RestReplicationPingAction.java index 1089d40e744..4012ba42891 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/rest/action/admin/cluster/ping/replication/RestReplicationPingAction.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/rest/action/admin/cluster/ping/replication/RestReplicationPingAction.java @@ -24,6 +24,7 @@ import org.elasticsearch.action.admin.cluster.ping.replication.IndexReplicationP import org.elasticsearch.action.admin.cluster.ping.replication.ReplicationPingRequest; import org.elasticsearch.action.admin.cluster.ping.replication.ReplicationPingResponse; import org.elasticsearch.action.admin.cluster.ping.replication.ShardReplicationPingRequest; +import org.elasticsearch.action.support.replication.ReplicationType; import org.elasticsearch.client.Client; import org.elasticsearch.rest.*; import org.elasticsearch.rest.action.support.RestActions; @@ -51,6 +52,10 @@ public class RestReplicationPingAction extends BaseRestHandler { ReplicationPingRequest replicationPingRequest = new ReplicationPingRequest(RestActions.splitIndices(request.param("index"))); replicationPingRequest.timeout(request.paramAsTime("timeout", ShardReplicationPingRequest.DEFAULT_TIMEOUT)); replicationPingRequest.listenerThreaded(false); + String replicationType = request.param("replication"); + if (replicationType != null) { + replicationPingRequest.replicationType(ReplicationType.fromString(replicationType)); + } client.admin().cluster().ping(replicationPingRequest, new ActionListener() { @Override public void onResponse(ReplicationPingResponse result) { try {