Replication Actions: Allow to control replication type - `async` or `sync`, closes #196.

This commit is contained in:
kimchy 2010-05-28 03:56:04 +03:00
parent 84a5c1eac8
commit a7cce9cd38
4 changed files with 41 additions and 2 deletions

View File

@ -20,6 +20,7 @@
package org.elasticsearch.action.admin.cluster.ping.replication;
import org.elasticsearch.action.support.replication.IndexReplicationOperationRequest;
import org.elasticsearch.action.support.replication.ReplicationType;
import org.elasticsearch.util.TimeValue;
import org.elasticsearch.util.io.stream.StreamInput;
import org.elasticsearch.util.io.stream.StreamOutput;
@ -27,7 +28,7 @@ import org.elasticsearch.util.io.stream.StreamOutput;
import java.io.IOException;
/**
* @author kimchy (Shay Banon)
* @author kimchy (shay.banon)
*/
public class IndexReplicationPingRequest extends IndexReplicationOperationRequest {
@ -38,6 +39,7 @@ public class IndexReplicationPingRequest extends IndexReplicationOperationReques
IndexReplicationPingRequest(ReplicationPingRequest request, String index) {
this.index = index;
this.timeout = request.timeout();
this.replicationType = request.replicationType();
}
IndexReplicationPingRequest() {
@ -48,6 +50,15 @@ public class IndexReplicationPingRequest extends IndexReplicationOperationReques
return this;
}
/**
* The replication type to use with this operation.
*/
public IndexReplicationPingRequest replicationType(ReplicationType replicationType) {
this.replicationType = replicationType;
return this;
}
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
}

View File

@ -20,6 +20,7 @@
package org.elasticsearch.action.admin.cluster.ping.replication;
import org.elasticsearch.action.support.replication.IndicesReplicationOperationRequest;
import org.elasticsearch.action.support.replication.ReplicationType;
import org.elasticsearch.util.TimeValue;
/**
@ -40,6 +41,11 @@ public class ReplicationPingRequest extends IndicesReplicationOperationRequest {
return this;
}
public ReplicationPingRequest replicationType(ReplicationType replicationType) {
this.replicationType = replicationType;
return this;
}
public ReplicationPingRequest timeout(TimeValue timeout) {
this.timeout = timeout;
return this;

View File

@ -19,6 +19,7 @@
package org.elasticsearch.action.admin.cluster.ping.replication;
import org.elasticsearch.action.support.replication.ReplicationType;
import org.elasticsearch.action.support.replication.ShardReplicationOperationRequest;
import org.elasticsearch.util.io.stream.StreamInput;
import org.elasticsearch.util.io.stream.StreamOutput;
@ -26,7 +27,7 @@ import org.elasticsearch.util.io.stream.StreamOutput;
import java.io.IOException;
/**
* @author kimchy (Shay Banon)
* @author kimchy (shay.banon)
*/
public class ShardReplicationPingRequest extends ShardReplicationOperationRequest {
@ -35,6 +36,7 @@ public class ShardReplicationPingRequest extends ShardReplicationOperationReques
public ShardReplicationPingRequest(IndexReplicationPingRequest request, int shardId) {
this(request.index(), shardId);
timeout = request.timeout();
replicationType(request.replicationType());
}
public ShardReplicationPingRequest(String index, int shardId) {
@ -49,6 +51,21 @@ public class ShardReplicationPingRequest extends ShardReplicationOperationReques
return this.shardId;
}
@Override public ShardReplicationPingRequest listenerThreaded(boolean threadedListener) {
super.listenerThreaded(threadedListener);
return this;
}
@Override public ShardReplicationPingRequest operationThreaded(boolean threadedOperation) {
super.operationThreaded(threadedOperation);
return this;
}
@Override public ShardReplicationPingRequest replicationType(ReplicationType replicationType) {
super.replicationType(replicationType);
return this;
}
@Override public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
shardId = in.readVInt();

View File

@ -24,6 +24,7 @@ import org.elasticsearch.action.admin.cluster.ping.replication.IndexReplicationP
import org.elasticsearch.action.admin.cluster.ping.replication.ReplicationPingRequest;
import org.elasticsearch.action.admin.cluster.ping.replication.ReplicationPingResponse;
import org.elasticsearch.action.admin.cluster.ping.replication.ShardReplicationPingRequest;
import org.elasticsearch.action.support.replication.ReplicationType;
import org.elasticsearch.client.Client;
import org.elasticsearch.rest.*;
import org.elasticsearch.rest.action.support.RestActions;
@ -51,6 +52,10 @@ public class RestReplicationPingAction extends BaseRestHandler {
ReplicationPingRequest replicationPingRequest = new ReplicationPingRequest(RestActions.splitIndices(request.param("index")));
replicationPingRequest.timeout(request.paramAsTime("timeout", ShardReplicationPingRequest.DEFAULT_TIMEOUT));
replicationPingRequest.listenerThreaded(false);
String replicationType = request.param("replication");
if (replicationType != null) {
replicationPingRequest.replicationType(ReplicationType.fromString(replicationType));
}
client.admin().cluster().ping(replicationPingRequest, new ActionListener<ReplicationPingResponse>() {
@Override public void onResponse(ReplicationPingResponse result) {
try {