Replication Actions: Allow to control replication type - async or sync, closes #196.

This commit is contained in:
kimchy 2010-05-28 03:47:35 +03:00
parent 477024e3be
commit 84a5c1eac8
14 changed files with 194 additions and 6 deletions

View File

@ -20,6 +20,7 @@
package org.elasticsearch.action.delete;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.support.replication.ReplicationType;
import org.elasticsearch.action.support.replication.ShardReplicationOperationRequest;
import org.elasticsearch.util.Required;
import org.elasticsearch.util.TimeValue;
@ -107,6 +108,14 @@ public class DeleteRequest extends ShardReplicationOperationRequest {
return this;
}
/**
* Set the replication type for this operation.
*/
@Override public DeleteRequest replicationType(ReplicationType replicationType) {
super.replicationType(replicationType);
return this;
}
/**
* The type of the document to delete.
*/

View File

@ -23,6 +23,7 @@ import org.apache.lucene.util.UnicodeUtil;
import org.elasticsearch.ElasticSearchGenerationException;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.support.replication.IndicesReplicationOperationRequest;
import org.elasticsearch.action.support.replication.ReplicationType;
import org.elasticsearch.client.Requests;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.util.Required;
@ -219,6 +220,14 @@ public class DeleteByQueryRequest extends IndicesReplicationOperationRequest {
return this;
}
/**
* The replication type to use with this operation.
*/
public DeleteByQueryRequest replicationType(ReplicationType replicationType) {
this.replicationType = replicationType;
return this;
}
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);

View File

@ -43,17 +43,13 @@ public class IndexDeleteByQueryRequest extends IndexReplicationOperationRequest
private String queryParserName;
private String[] types = Strings.EMPTY_ARRAY;
public IndexDeleteByQueryRequest(String index, String... types) {
this.index = index;
this.types = types;
}
IndexDeleteByQueryRequest(DeleteByQueryRequest request, String index) {
this.index = index;
this.timeout = request.timeout();
this.querySource = request.querySource();
this.queryParserName = request.queryParserName();
this.types = request.types();
this.replicationType = request.replicationType();
}

View File

@ -54,6 +54,7 @@ public class ShardDeleteByQueryRequest extends ShardReplicationOperationRequest
ShardDeleteByQueryRequest(IndexDeleteByQueryRequest request, int shardId) {
this(request.index(), request.querySource(), request.queryParserName(), request.types(), shardId);
replicationType(request.replicationType());
timeout = request.timeout();
}

View File

@ -23,6 +23,7 @@ import org.apache.lucene.util.UnicodeUtil;
import org.elasticsearch.ElasticSearchGenerationException;
import org.elasticsearch.ElasticSearchIllegalArgumentException;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.support.replication.ReplicationType;
import org.elasticsearch.action.support.replication.ShardReplicationOperationRequest;
import org.elasticsearch.util.Required;
import org.elasticsearch.util.TimeValue;
@ -342,6 +343,14 @@ public class IndexRequest extends ShardReplicationOperationRequest {
}
}
/**
* Set the replication type for this operation.
*/
@Override public IndexRequest replicationType(ReplicationType replicationType) {
super.replicationType(replicationType);
return this;
}
/**
* Set to <tt>true</tt> to force this index to use {@link OpType#CREATE}.
*/

View File

@ -40,6 +40,8 @@ public class IndexReplicationOperationRequest implements ActionRequest {
private boolean threadedListener = false;
protected ReplicationType replicationType = ReplicationType.DEFAULT;
public TimeValue timeout() {
return timeout;
}
@ -52,6 +54,10 @@ public class IndexReplicationOperationRequest implements ActionRequest {
return this.threadedListener;
}
public ReplicationType replicationType() {
return this.replicationType;
}
@Override public IndexReplicationOperationRequest listenerThreaded(boolean threadedListener) {
this.threadedListener = threadedListener;
return this;
@ -66,11 +72,13 @@ public class IndexReplicationOperationRequest implements ActionRequest {
}
@Override public void readFrom(StreamInput in) throws IOException {
replicationType = ReplicationType.fromId(in.readByte());
timeout = TimeValue.readTimeValue(in);
index = in.readUTF();
}
@Override public void writeTo(StreamOutput out) throws IOException {
out.writeByte(replicationType.id());
timeout.writeTo(out);
out.writeUTF(index);
}

View File

@ -38,6 +38,8 @@ public class IndicesReplicationOperationRequest implements ActionRequest {
private boolean threadedListener = false;
protected ReplicationType replicationType = ReplicationType.DEFAULT;
public TimeValue timeout() {
return timeout;
}
@ -70,7 +72,12 @@ public class IndicesReplicationOperationRequest implements ActionRequest {
return this;
}
public ReplicationType replicationType() {
return this.replicationType;
}
@Override public void readFrom(StreamInput in) throws IOException {
replicationType = ReplicationType.fromId(in.readByte());
timeout = TimeValue.readTimeValue(in);
indices = new String[in.readVInt()];
for (int i = 0; i < indices.length; i++) {
@ -79,6 +86,7 @@ public class IndicesReplicationOperationRequest implements ActionRequest {
}
@Override public void writeTo(StreamOutput out) throws IOException {
out.writeByte(replicationType.id());
timeout.writeTo(out);
out.writeVInt(indices.length);
for (String index : indices) {

View File

@ -0,0 +1,82 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.support.replication;
import org.elasticsearch.ElasticSearchIllegalArgumentException;
/**
* The type of replication to perform.
*/
public enum ReplicationType {
/**
* Sync replication, wait till all replicas have performed the operation.
*/
SYNC((byte) 0),
/**
* Async replication. Will send the request to replicas, but will not wait for it
*/
ASYNC((byte) 1),
/**
* Use the default replication type configured for this node.
*/
DEFAULT((byte) 2);
private byte id;
ReplicationType(byte id) {
this.id = id;
}
/**
* The internal representation of the operation type.
*/
public byte id() {
return id;
}
/**
* Constructs the operation type from its internal representation.
*/
public static ReplicationType fromId(byte id) {
if (id == 0) {
return ASYNC;
} else if (id == 1) {
return SYNC;
} else if (id == 2) {
return DEFAULT;
} else {
throw new ElasticSearchIllegalArgumentException("No type match for [" + id + "]");
}
}
/**
* Parse the replication type from string.
*/
public static ReplicationType fromString(String type) {
if ("async".equals(type)) {
return ASYNC;
} else if ("sync".equals(type)) {
return SYNC;
} else if ("default".equals(type)) {
return DEFAULT;
}
throw new ElasticSearchIllegalArgumentException("No replication type match for [" + type + "], should be either `async`, or `sync`");
}
}

View File

@ -35,6 +35,7 @@ import static org.elasticsearch.action.Actions.*;
*/
public abstract class ShardReplicationOperationRequest implements ActionRequest {
public static final TimeValue DEFAULT_TIMEOUT = new TimeValue(1, TimeUnit.MINUTES);
protected TimeValue timeout = DEFAULT_TIMEOUT;
@ -43,6 +44,7 @@ public abstract class ShardReplicationOperationRequest implements ActionRequest
private boolean threadedListener = false;
private boolean threadedOperation = true;
private ReplicationType replicationType = ReplicationType.DEFAULT;
public TimeValue timeout() {
return timeout;
@ -89,6 +91,21 @@ public abstract class ShardReplicationOperationRequest implements ActionRequest
return this;
}
/**
* The replication type.
*/
public ReplicationType replicationType() {
return this.replicationType;
}
/**
* Sets the replication type.
*/
public ShardReplicationOperationRequest replicationType(ReplicationType replicationType) {
this.replicationType = replicationType;
return this;
}
@Override public ActionRequestValidationException validate() {
ActionRequestValidationException validationException = null;
if (index == null) {
@ -98,12 +115,14 @@ public abstract class ShardReplicationOperationRequest implements ActionRequest
}
@Override public void readFrom(StreamInput in) throws IOException {
replicationType = ReplicationType.fromId(in.readByte());
timeout = TimeValue.readTimeValue(in);
index = in.readUTF();
// no need to serialize threaded* parameters, since they only matter locally
}
@Override public void writeTo(StreamOutput out) throws IOException {
out.writeByte(replicationType.id());
timeout.writeTo(out);
out.writeUTF(index);
}

View File

@ -69,6 +69,8 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
protected final ShardStateAction shardStateAction;
protected final ReplicationType defaultReplicationType;
protected TransportShardReplicationOperationAction(Settings settings, TransportService transportService,
ClusterService clusterService, IndicesService indicesService,
ThreadPool threadPool, ShardStateAction shardStateAction) {
@ -81,6 +83,8 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
transportService.registerHandler(transportAction(), new OperationTransportHandler());
transportService.registerHandler(transportBackupAction(), new BackupOperationTransportHandler());
this.defaultReplicationType = ReplicationType.fromString(settings.get("action.replication_type", "sync"));
}
@Override protected void doExecute(Request request, ActionListener<Response> listener) {
@ -207,9 +211,17 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
private final AtomicBoolean primaryOperationStarted = new AtomicBoolean();
private final ReplicationType replicationType;
private AsyncShardOperationAction(Request request, ActionListener<Response> listener) {
this.request = request;
this.listener = listener;
if (request.replicationType() != ReplicationType.DEFAULT) {
replicationType = request.replicationType();
} else {
replicationType = defaultReplicationType;
}
}
public void start() {
@ -354,6 +366,22 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
// initialize the counter
int backupCounter = 0;
if (replicationType == ReplicationType.ASYNC) {
// async replication, notify the listener
if (alreadyThreaded || !request.listenerThreaded()) {
listener.onResponse(response);
} else {
threadPool.execute(new Runnable() {
@Override public void run() {
listener.onResponse(response);
}
});
}
// now, trick the counter so it won't decrease to 0
backupCounter = -100;
}
for (final ShardRouting shard : shards.reset()) {
if (shard.primary()) {
continue;

View File

@ -22,6 +22,7 @@ package org.elasticsearch.rest.action.delete;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.support.replication.ReplicationType;
import org.elasticsearch.client.Client;
import org.elasticsearch.rest.*;
import org.elasticsearch.rest.action.support.RestXContentBuilder;
@ -51,6 +52,12 @@ public class RestDeleteAction extends BaseRestHandler {
deleteRequest.listenerThreaded(false);
// we don't spawn, then fork if local
deleteRequest.operationThreaded(true);
String replicationType = request.param("replication");
if (replicationType != null) {
deleteRequest.replicationType(ReplicationType.fromString(replicationType));
}
client.delete(deleteRequest, new ActionListener<DeleteResponse>() {
@Override public void onResponse(DeleteResponse result) {
try {

View File

@ -24,6 +24,7 @@ import org.elasticsearch.action.deletebyquery.DeleteByQueryRequest;
import org.elasticsearch.action.deletebyquery.DeleteByQueryResponse;
import org.elasticsearch.action.deletebyquery.IndexDeleteByQueryResponse;
import org.elasticsearch.action.deletebyquery.ShardDeleteByQueryRequest;
import org.elasticsearch.action.support.replication.ReplicationType;
import org.elasticsearch.client.Client;
import org.elasticsearch.rest.*;
import org.elasticsearch.rest.action.support.RestActions;
@ -70,6 +71,11 @@ public class RestDeleteByQueryAction extends BaseRestHandler {
deleteByQueryRequest.types(RestActions.splitTypes(typesParam));
}
deleteByQueryRequest.timeout(request.paramAsTime("timeout", ShardDeleteByQueryRequest.DEFAULT_TIMEOUT));
String replicationType = request.param("replication");
if (replicationType != null) {
deleteByQueryRequest.replicationType(ReplicationType.fromString(replicationType));
}
} catch (Exception e) {
try {
XContentBuilder builder = RestXContentBuilder.restContentBuilder(request);

View File

@ -22,6 +22,7 @@ package org.elasticsearch.rest.action.index;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.support.replication.ReplicationType;
import org.elasticsearch.client.Client;
import org.elasticsearch.rest.*;
import org.elasticsearch.rest.action.support.RestXContentBuilder;
@ -75,6 +76,10 @@ public class RestIndexAction extends BaseRestHandler {
}
}
}
String replicationType = request.param("replication");
if (replicationType != null) {
indexRequest.replicationType(ReplicationType.fromString(replicationType));
}
// we just send a response, no need to fork
indexRequest.listenerThreaded(false);
// we don't spawn, then fork if local

View File

@ -31,6 +31,7 @@ import org.elasticsearch.action.deletebyquery.DeleteByQueryResponse;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.support.broadcast.BroadcastOperationThreading;
import org.elasticsearch.action.support.replication.ReplicationType;
import org.elasticsearch.client.Client;
import org.elasticsearch.test.integration.AbstractNodesTests;
import org.elasticsearch.util.Unicode;
@ -131,7 +132,7 @@ public class DocumentActionsTests extends AbstractNodesTests {
}
logger.info("Delete [type1/1]");
DeleteResponse deleteResponse = client1.delete(deleteRequest("test").type("type1").id("1")).actionGet();
DeleteResponse deleteResponse = client1.delete(deleteRequest("test").type("type1").id("1").replicationType(ReplicationType.ASYNC)).actionGet();
assertThat(deleteResponse.index(), equalTo(getConcreteIndexName()));
assertThat(deleteResponse.id(), equalTo("1"));
assertThat(deleteResponse.type(), equalTo("type1"));