[REPLICATION] Remove `async` replication

Closes #10114
This commit is contained in:
Simon Willnauer 2015-03-19 14:14:48 -07:00
parent 955ff05a5e
commit 1168347b9d
35 changed files with 18 additions and 400 deletions

View File

@ -144,4 +144,4 @@ is the same).
[[limitations]]
=== Limitations
The delete by query does not support the following queries and filters: `has_child`, `has_parent` and `top_children`.
The delete by query does not support the following queries and filters: `has_child`, `has_parent` and `top_children`.

View File

@ -35,8 +35,6 @@ The result of the above index operation is:
The `_shards` header provides information about the replication process of the index operation.
* `total` - Indicates to how many shard copies (primary and replica shards) the index operation should be executed on.
* `successful`- Indicates the number of shard copies the index operation succeeded on.
* `pending` - Indicates how many shard copies this index operation still needs to go to at the time index operation
succeeded on the primary shard. This field is only returned if `async` replication is used.
* `failures` - An array that contains replication related errors in the case an index operation failed on a replica shard.
The index operation is successful in the case `successful` is at least 1.

View File

@ -1,30 +0,0 @@
---
"Index check shard header":
- do:
indices.create:
index: foobar
body:
settings:
number_of_shards: "1"
number_of_replicas: "0"
- do:
cluster.health:
wait_for_status: green
- do:
index:
index: foobar
type: baz
id: 1
body: { foo: bar }
- match: { _index: foobar }
- match: { _type: baz }
- match: { _id: "1"}
- match: { _version: 1}
- match: { _shards.total: 1}
- match: { _shards.successful: 1}
- match: { _shards.failed: 0}
- is_false: _shards.pending

View File

@ -2,4 +2,4 @@ Tests missing for:
# consistency
# retry_on_conflict
# timeout
# timeout

View File

@ -63,17 +63,15 @@ public abstract class ActionWriteResponse extends ActionResponse {
private int total;
private int successful;
private int pending;
private Failure[] failures = EMPTY;
public ShardInfo() {
}
public ShardInfo(int total, int successful, int pending, Failure... failures) {
assert total >= 0 && successful >= 0 && pending >= 0;
public ShardInfo(int total, int successful, Failure... failures) {
assert total >= 0 && successful >= 0;
this.total = total;
this.successful = successful;
this.pending = pending;
this.failures = failures;
}
@ -91,14 +89,6 @@ public abstract class ActionWriteResponse extends ActionResponse {
return successful;
}
/**
* @return the total number of shards a write is still to be performed on at the time this response was
* created. Typically this will only contain 0, but when async replication is used this number is higher than 0.
*/
public int getPending() {
return pending;
}
/**
* @return The total number of replication failures.
*/
@ -127,7 +117,6 @@ public abstract class ActionWriteResponse extends ActionResponse {
public void readFrom(StreamInput in) throws IOException {
total = in.readVInt();
successful = in.readVInt();
pending = in.readVInt();
int size = in.readVInt();
failures = new Failure[size];
for (int i = 0; i < size; i++) {
@ -141,7 +130,6 @@ public abstract class ActionWriteResponse extends ActionResponse {
public void writeTo(StreamOutput out) throws IOException {
out.writeVInt(total);
out.writeVInt(successful);
out.writeVInt(pending);
out.writeVInt(failures.length);
for (Failure failure : failures) {
failure.writeTo(out);
@ -153,9 +141,6 @@ public abstract class ActionWriteResponse extends ActionResponse {
builder.startObject(Fields._SHARDS);
builder.field(Fields.TOTAL, total);
builder.field(Fields.SUCCESSFUL, successful);
if (pending > 0) {
builder.field(Fields.PENDING, pending);
}
builder.field(Fields.FAILED, getFailed());
if (failures.length > 0) {
builder.startArray(Fields.FAILURES);

View File

@ -24,7 +24,6 @@ import org.elasticsearch.ElasticsearchIllegalArgumentException;
import org.elasticsearch.action.*;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.support.replication.ReplicationType;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.bytes.BytesArray;
@ -58,7 +57,6 @@ public class BulkRequest extends ActionRequest<BulkRequest> implements Composite
List<Object> payloads = null;
protected TimeValue timeout = BulkShardRequest.DEFAULT_TIMEOUT;
private ReplicationType replicationType = ReplicationType.DEFAULT;
private WriteConsistencyLevel consistencyLevel = WriteConsistencyLevel.DEFAULT;
private boolean refresh = false;
@ -410,18 +408,6 @@ public class BulkRequest extends ActionRequest<BulkRequest> implements Composite
return this.refresh;
}
/**
* Set the replication type for this operation.
*/
public BulkRequest replicationType(ReplicationType replicationType) {
this.replicationType = replicationType;
return this;
}
public ReplicationType replicationType() {
return this.replicationType;
}
/**
* A timeout to wait if the index operation can't be performed immediately. Defaults to <tt>1m</tt>.
*/
@ -472,7 +458,6 @@ public class BulkRequest extends ActionRequest<BulkRequest> implements Composite
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
replicationType = ReplicationType.fromId(in.readByte());
consistencyLevel = WriteConsistencyLevel.fromId(in.readByte());
int size = in.readVInt();
for (int i = 0; i < size; i++) {
@ -498,7 +483,6 @@ public class BulkRequest extends ActionRequest<BulkRequest> implements Composite
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeByte(replicationType.id());
out.writeByte(consistencyLevel.id());
out.writeVInt(requests.size());
for (ActionRequest request : requests) {

View File

@ -26,7 +26,6 @@ import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.delete.DeleteRequestBuilder;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.support.replication.ReplicationType;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateRequestBuilder;
import org.elasticsearch.client.Client;
@ -110,14 +109,6 @@ public class BulkRequestBuilder extends ActionRequestBuilder<BulkRequest, BulkRe
return this;
}
/**
* Set the replication type for this operation.
*/
public BulkRequestBuilder setReplicationType(ReplicationType replicationType) {
request.replicationType(replicationType);
return this;
}
/**
* Sets the consistency level. Defaults to {@link org.elasticsearch.action.WriteConsistencyLevel#DEFAULT}.
*/

View File

@ -307,7 +307,6 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
final ShardId shardId = entry.getKey();
final List<BulkItemRequest> requests = entry.getValue();
BulkShardRequest bulkShardRequest = new BulkShardRequest(bulkRequest, shardId.index().name(), shardId.id(), bulkRequest.refresh(), requests.toArray(new BulkItemRequest[requests.size()]));
bulkShardRequest.replicationType(bulkRequest.replicationType());
bulkShardRequest.consistencyLevel(bulkRequest.consistencyLevel());
bulkShardRequest.timeout(bulkRequest.timeout());
shardBulkAction.execute(bulkShardRequest, new ActionListener<BulkShardResponse>() {

View File

@ -21,7 +21,6 @@ package org.elasticsearch.action.delete;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.WriteConsistencyLevel;
import org.elasticsearch.action.support.replication.ReplicationType;
import org.elasticsearch.action.support.replication.ShardReplicationOperationRequestBuilder;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.Nullable;
@ -101,15 +100,6 @@ public class DeleteRequestBuilder extends ShardReplicationOperationRequestBuilde
return this;
}
/**
* Set the replication type for this operation.
*/
@Override
public DeleteRequestBuilder setReplicationType(ReplicationType replicationType) {
request.replicationType(replicationType);
return this;
}
/**
* Sets the consistency level. Defaults to {@link org.elasticsearch.action.WriteConsistencyLevel#DEFAULT}.
*/

View File

@ -34,7 +34,7 @@ class IndexDeleteRequest extends IndexReplicationOperationRequest<IndexDeleteReq
private final String originalIndex;
IndexDeleteRequest(DeleteRequest request, String concreteIndex) {
super(concreteIndex, request.timeout(), request.replicationType(), request.consistencyLevel(),
super(concreteIndex, request.timeout(), request.consistencyLevel(),
request.indices(), request.indicesOptions(), request);
this.type = request.type();
this.id = request.id();

View File

@ -48,7 +48,6 @@ public class ShardDeleteRequest extends ShardReplicationOperationRequest<ShardDe
this.shardId = shardId;
this.type = request.type();
this.id = request.id();
replicationType(request.replicationType());
consistencyLevel(request.consistencyLevel());
timeout = request.timeout();
this.refresh = request.refresh();

View File

@ -23,7 +23,6 @@ import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.WriteConsistencyLevel;
import org.elasticsearch.action.support.QuerySourceBuilder;
import org.elasticsearch.action.support.replication.IndicesReplicationOperationRequestBuilder;
import org.elasticsearch.action.support.replication.ReplicationType;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.xcontent.XContentBuilder;
@ -134,24 +133,6 @@ public class DeleteByQueryRequestBuilder extends IndicesReplicationOperationRequ
return this;
}
/**
* The replication type to use with this operation.
*/
@Override
public DeleteByQueryRequestBuilder setReplicationType(ReplicationType replicationType) {
request.replicationType(replicationType);
return this;
}
/**
* The replication type to use with this operation.
*/
@Override
public DeleteByQueryRequestBuilder setReplicationType(String replicationType) {
request.replicationType(replicationType);
return this;
}
@Override
public DeleteByQueryRequestBuilder setConsistencyLevel(WriteConsistencyLevel consistencyLevel) {
request.consistencyLevel(consistencyLevel);

View File

@ -40,7 +40,7 @@ class IndexDeleteByQueryRequest extends IndexReplicationOperationRequest<IndexDe
IndexDeleteByQueryRequest(DeleteByQueryRequest request, String index, @Nullable Set<String> routing, @Nullable String[] filteringAliases,
long nowInMillis) {
super(index, request.timeout(), request.replicationType(), request.consistencyLevel(), request.indices(), request.indicesOptions(), request);
super(index, request.timeout(), request.consistencyLevel(), request.indices(), request.indicesOptions(), request);
this.source = request.source();
this.types = request.types();
this.routing = routing;

View File

@ -59,7 +59,6 @@ public class ShardDeleteByQueryRequest extends ShardReplicationOperationRequest<
this.source = request.source();
this.types = request.types();
this.shardId = shardId;
replicationType(request.replicationType());
consistencyLevel(request.consistencyLevel());
timeout = request.timeout();
this.routing = request.routing();

View File

@ -21,7 +21,6 @@ package org.elasticsearch.action.index;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.WriteConsistencyLevel;
import org.elasticsearch.action.support.replication.ReplicationType;
import org.elasticsearch.action.support.replication.ShardReplicationOperationRequestBuilder;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.Nullable;
@ -253,15 +252,6 @@ public class IndexRequestBuilder extends ShardReplicationOperationRequestBuilder
return this;
}
/**
* Set the replication type for this operation.
*/
@Override
public IndexRequestBuilder setReplicationType(ReplicationType replicationType) {
request.replicationType(replicationType);
return this;
}
/**
* Sets the consistency level. Defaults to {@link org.elasticsearch.action.WriteConsistencyLevel#DEFAULT}.
*/
@ -271,15 +261,6 @@ public class IndexRequestBuilder extends ShardReplicationOperationRequestBuilder
return this;
}
/**
* Set the replication type for this operation.
*/
@Override
public IndexRequestBuilder setReplicationType(String replicationType) {
request.replicationType(replicationType);
return this;
}
/**
* Sets the version, which will cause the index operation to only be performed if a matching
* version exists and no changes happened on the doc since then.

View File

@ -36,16 +36,14 @@ public abstract class IndexReplicationOperationRequest<T extends IndexReplicatio
private final TimeValue timeout;
private final String index;
private final ReplicationType replicationType;
private final WriteConsistencyLevel consistencyLevel;
private final OriginalIndices originalIndices;
protected IndexReplicationOperationRequest(String index, TimeValue timeout, ReplicationType replicationType, WriteConsistencyLevel consistencyLevel,
protected IndexReplicationOperationRequest(String index, TimeValue timeout, WriteConsistencyLevel consistencyLevel,
String[] originalIndices, IndicesOptions originalIndicesOptions, ActionRequest request) {
super(request);
this.index = index;
this.timeout = timeout;
this.replicationType = replicationType;
this.consistencyLevel = consistencyLevel;
this.originalIndices = new OriginalIndices(originalIndices, originalIndicesOptions);
}
@ -73,10 +71,6 @@ public abstract class IndexReplicationOperationRequest<T extends IndexReplicatio
return originalIndices.indicesOptions();
}
public ReplicationType replicationType() {
return this.replicationType;
}
public WriteConsistencyLevel consistencyLevel() {
return this.consistencyLevel;
}

View File

@ -39,7 +39,6 @@ public abstract class IndicesReplicationOperationRequest<T extends IndicesReplic
protected String[] indices;
private IndicesOptions indicesOptions = IndicesOptions.fromOptions(false, false, true, false);
protected ReplicationType replicationType = ReplicationType.DEFAULT;
protected WriteConsistencyLevel consistencyLevel = WriteConsistencyLevel.DEFAULT;
public TimeValue timeout() {
@ -100,29 +99,6 @@ public abstract class IndicesReplicationOperationRequest<T extends IndicesReplic
return (T) this;
}
public ReplicationType replicationType() {
return this.replicationType;
}
/**
* Sets the replication type.
*/
@SuppressWarnings("unchecked")
public final T replicationType(ReplicationType replicationType) {
if (replicationType == null) {
throw new IllegalArgumentException("ReplicationType must not be null");
}
this.replicationType = replicationType;
return (T) this;
}
/**
* Sets the replication type.
*/
public final T replicationType(String replicationType) {
return replicationType(ReplicationType.fromString(replicationType));
}
public WriteConsistencyLevel consistencyLevel() {
return this.consistencyLevel;
}
@ -147,7 +123,6 @@ public abstract class IndicesReplicationOperationRequest<T extends IndicesReplic
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
replicationType = ReplicationType.fromId(in.readByte());
consistencyLevel = WriteConsistencyLevel.fromId(in.readByte());
timeout = TimeValue.readTimeValue(in);
indices = in.readStringArray();
@ -157,7 +132,6 @@ public abstract class IndicesReplicationOperationRequest<T extends IndicesReplic
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeByte(replicationType.id());
out.writeByte(consistencyLevel.id());
timeout.writeTo(out);
out.writeStringArrayNullable(indices);

View File

@ -69,24 +69,6 @@ public abstract class IndicesReplicationOperationRequestBuilder<Request extends
return (RequestBuilder) this;
}
/**
* Sets the replication type.
*/
@SuppressWarnings("unchecked")
public RequestBuilder setReplicationType(ReplicationType replicationType) {
request.replicationType(replicationType);
return (RequestBuilder) this;
}
/**
* Sets the replication type.
*/
@SuppressWarnings("unchecked")
public RequestBuilder setReplicationType(String replicationType) {
request.replicationType(replicationType);
return (RequestBuilder) this;
}
/**
* Sets the consistency level of write. Defaults to {@link org.elasticsearch.action.WriteConsistencyLevel#DEFAULT}
*/

View File

@ -1,82 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.support.replication;
import org.elasticsearch.ElasticsearchIllegalArgumentException;
/**
* The type of replication to perform.
*/
public enum ReplicationType {
/**
* Sync replication, wait till all replicas have performed the operation.
*/
SYNC((byte) 0),
/**
* Async replication. Will send the request to replicas, but will not wait for it
*/
ASYNC((byte) 1),
/**
* Use the default replication type configured for this node.
*/
DEFAULT((byte) 2);
private byte id;
ReplicationType(byte id) {
this.id = id;
}
/**
* The internal representation of the operation type.
*/
public byte id() {
return id;
}
/**
* Constructs the operation type from its internal representation.
*/
public static ReplicationType fromId(byte id) {
if (id == 0) {
return SYNC;
} else if (id == 1) {
return ASYNC;
} else if (id == 2) {
return DEFAULT;
} else {
throw new ElasticsearchIllegalArgumentException("No type match for [" + id + "]");
}
}
/**
* Parse the replication type from string.
*/
public static ReplicationType fromString(String type) {
if ("async".equals(type)) {
return ASYNC;
} else if ("sync".equals(type)) {
return SYNC;
} else if ("default".equals(type)) {
return DEFAULT;
}
throw new ElasticsearchIllegalArgumentException("No replication type match for [" + type + "], should be either `async`, or `sync`");
}
}

View File

@ -45,7 +45,6 @@ public abstract class ShardReplicationOperationRequest<T extends ShardReplicatio
protected String index;
private boolean threadedOperation = true;
private ReplicationType replicationType = ReplicationType.DEFAULT;
private WriteConsistencyLevel consistencyLevel = WriteConsistencyLevel.DEFAULT;
private volatile boolean canHaveDuplicates = false;
@ -76,7 +75,6 @@ public abstract class ShardReplicationOperationRequest<T extends ShardReplicatio
this.timeout = request.timeout();
this.index = request.index();
this.threadedOperation = request.operationThreaded();
this.replicationType = request.replicationType();
this.consistencyLevel = request.consistencyLevel();
}
@ -148,29 +146,6 @@ public abstract class ShardReplicationOperationRequest<T extends ShardReplicatio
return IndicesOptions.strictSingleIndexNoExpandForbidClosed();
}
/**
* The replication type.
*/
public ReplicationType replicationType() {
return this.replicationType;
}
/**
* Sets the replication type.
*/
@SuppressWarnings("unchecked")
public final T replicationType(ReplicationType replicationType) {
this.replicationType = replicationType;
return (T) this;
}
/**
* Sets the replication type.
*/
public final T replicationType(String replicationType) {
return replicationType(ReplicationType.fromString(replicationType));
}
public WriteConsistencyLevel consistencyLevel() {
return this.consistencyLevel;
}
@ -196,7 +171,6 @@ public abstract class ShardReplicationOperationRequest<T extends ShardReplicatio
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
replicationType = ReplicationType.fromId(in.readByte());
consistencyLevel = WriteConsistencyLevel.fromId(in.readByte());
timeout = TimeValue.readTimeValue(in);
index = in.readString();
@ -207,7 +181,6 @@ public abstract class ShardReplicationOperationRequest<T extends ShardReplicatio
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeByte(replicationType.id());
out.writeByte(consistencyLevel.id());
timeout.writeTo(out);
out.writeString(index);

View File

@ -68,24 +68,6 @@ public abstract class ShardReplicationOperationRequestBuilder<Request extends Sh
return (RequestBuilder) this;
}
/**
* Sets the replication type.
*/
@SuppressWarnings("unchecked")
public RequestBuilder setReplicationType(ReplicationType replicationType) {
request.replicationType(replicationType);
return (RequestBuilder) this;
}
/**
* Sets the replication type.
*/
@SuppressWarnings("unchecked")
public RequestBuilder setReplicationType(String replicationType) {
request.replicationType(replicationType);
return (RequestBuilder) this;
}
/**
* Sets the consistency level of write. Defaults to {@link org.elasticsearch.action.WriteConsistencyLevel#DEFAULT}
*/

View File

@ -115,7 +115,7 @@ public abstract class TransportIndexReplicationOperationAction<Request extends I
}
Failure failure = new Failure(request.index(), shardIt.shardId().id(), null,
"Failed to execute on all shard copies [" + ExceptionsHelper.detailedMessage(e) + "]", status, true);
shardsResponses.set(index, new ShardActionResult(new ActionWriteResponse.ShardInfo(shardIt.size(), 0, 0, failure)));
shardsResponses.set(index, new ShardActionResult(new ActionWriteResponse.ShardInfo(shardIt.size(), 0, failure)));
returnIfNeeded();
}
@ -125,7 +125,6 @@ public abstract class TransportIndexReplicationOperationAction<Request extends I
List<Failure> failureList = new ArrayList<>();
int total = 0;
int pending = 0;
int successful = 0;
for (int i = 0; i < shardsResponses.length(); i++) {
ShardActionResult shardActionResult = shardsResponses.get(i);
@ -138,7 +137,6 @@ public abstract class TransportIndexReplicationOperationAction<Request extends I
sf = shardActionResult.shardResponse.getShardInfo();
}
total += sf.getTotal();
pending += sf.getPending();
successful += sf.getSuccessful();
failureList.addAll(Arrays.asList(sf.getFailures()));
}
@ -150,7 +148,7 @@ public abstract class TransportIndexReplicationOperationAction<Request extends I
} else {
failures = failureList.toArray(new Failure[failureList.size()]);
}
listener.onResponse(newResponseInstance(request, responses, new ActionWriteResponse.ShardInfo(total, successful, pending, failures)));
listener.onResponse(newResponseInstance(request, responses, new ActionWriteResponse.ShardInfo(total, successful, failures)));
}
}

View File

@ -70,7 +70,6 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
protected final ClusterService clusterService;
protected final IndicesService indicesService;
protected final ShardStateAction shardStateAction;
protected final ReplicationType defaultReplicationType;
protected final WriteConsistencyLevel defaultWriteConsistencyLevel;
protected final TransportRequestOptions transportOptions;
@ -96,7 +95,6 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
this.transportOptions = transportOptions();
this.defaultReplicationType = ReplicationType.fromString(settings.get("action.replication_type", "sync"));
this.defaultWriteConsistencyLevel = WriteConsistencyLevel.fromString(settings.get("action.write_consistency", "quorum"));
}
@ -310,18 +308,11 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
private final InternalRequest internalRequest;
private volatile ShardIterator shardIt;
private final AtomicBoolean primaryOperationStarted = new AtomicBoolean();
private final ReplicationType replicationType;
private volatile ClusterStateObserver observer;
AsyncShardOperationAction(Request request, ActionListener<Response> listener) {
this.internalRequest = new InternalRequest(request);
this.listener = listener;
if (request.replicationType() != ReplicationType.DEFAULT) {
replicationType = request.replicationType();
} else {
replicationType = defaultReplicationType;
}
}
public void start() {
@ -584,10 +575,6 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
replicationState.forceFinish();
return;
}
if (replicationType == ReplicationType.ASYNC) {
replicationState.forceFinish();
}
IndexMetaData indexMetaData = observer.observedState().metaData().index(internalRequest.concreteIndex());
if (newPrimaryShard != null) {
performOnReplica(replicationState, newPrimaryShard, newPrimaryShard.currentNodeId(), indexMetaData);
@ -853,7 +840,6 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
new ActionWriteResponse.ShardInfo(
numberOfShardInstances,
success.get(),
pending.get(),
failuresArray
)

View File

@ -126,7 +126,7 @@ public class UpdateHelper extends AbstractComponent {
.routing(request.routing())
.ttl(ttl)
.refresh(request.refresh())
.replicationType(request.replicationType()).consistencyLevel(request.consistencyLevel());
.consistencyLevel(request.consistencyLevel());
indexRequest.operationThreaded(false);
if (request.versionType() != VersionType.INTERNAL) {
// in all but the internal versioning mode, we want to create the new document using the given version.
@ -228,7 +228,7 @@ public class UpdateHelper extends AbstractComponent {
final IndexRequest indexRequest = Requests.indexRequest(request.index()).type(request.type()).id(request.id()).routing(routing).parent(parent)
.source(updatedSourceAsMap, updateSourceContentType)
.version(updateVersion).versionType(request.versionType())
.replicationType(request.replicationType()).consistencyLevel(request.consistencyLevel())
.consistencyLevel(request.consistencyLevel())
.timestamp(timestamp).ttl(ttl)
.refresh(request.refresh());
indexRequest.operationThreaded(false);
@ -236,7 +236,7 @@ public class UpdateHelper extends AbstractComponent {
} else if ("delete".equals(operation)) {
DeleteRequest deleteRequest = Requests.deleteRequest(request.index()).type(request.type()).id(request.id()).routing(routing).parent(parent)
.version(updateVersion).versionType(request.versionType())
.replicationType(request.replicationType()).consistencyLevel(request.consistencyLevel());
.consistencyLevel(request.consistencyLevel());
deleteRequest.operationThreaded(false);
return new Result(deleteRequest, Operation.DELETE, updatedSourceAsMap, updateSourceContentType);
} else if ("none".equals(operation)) {

View File

@ -24,7 +24,6 @@ import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.DocumentRequest;
import org.elasticsearch.action.WriteConsistencyLevel;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.support.replication.ReplicationType;
import org.elasticsearch.action.support.single.instance.InstanceShardOperationRequest;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Strings;
@ -73,7 +72,6 @@ public class UpdateRequest extends InstanceShardOperationRequest<UpdateRequest>
private boolean refresh = false;
private ReplicationType replicationType = ReplicationType.DEFAULT;
private WriteConsistencyLevel consistencyLevel = WriteConsistencyLevel.DEFAULT;
private IndexRequest upsertRequest;
@ -358,21 +356,6 @@ public class UpdateRequest extends InstanceShardOperationRequest<UpdateRequest>
return this.refresh;
}
/**
* The replication type.
*/
public ReplicationType replicationType() {
return this.replicationType;
}
/**
* Sets the replication type.
*/
public UpdateRequest replicationType(ReplicationType replicationType) {
this.replicationType = replicationType;
return this;
}
public WriteConsistencyLevel consistencyLevel() {
return this.consistencyLevel;
}
@ -633,7 +616,6 @@ public class UpdateRequest extends InstanceShardOperationRequest<UpdateRequest>
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
replicationType = ReplicationType.fromId(in.readByte());
consistencyLevel = WriteConsistencyLevel.fromId(in.readByte());
type = in.readString();
id = in.readString();
@ -671,7 +653,6 @@ public class UpdateRequest extends InstanceShardOperationRequest<UpdateRequest>
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeByte(replicationType.id());
out.writeByte(consistencyLevel.id());
out.writeString(type);
out.writeString(id);

View File

@ -22,7 +22,6 @@ package org.elasticsearch.action.update;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.WriteConsistencyLevel;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.support.replication.ReplicationType;
import org.elasticsearch.action.support.single.instance.InstanceShardOperationRequestBuilder;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.bytes.BytesReference;
@ -158,14 +157,6 @@ public class UpdateRequestBuilder extends InstanceShardOperationRequestBuilder<U
return this;
}
/**
* Sets the replication type.
*/
public UpdateRequestBuilder setReplicationType(ReplicationType replicationType) {
request.replicationType(replicationType);
return this;
}
/**
* Sets the consistency level of write. Defaults to {@link org.elasticsearch.action.WriteConsistencyLevel#DEFAULT}
*/

View File

@ -45,7 +45,7 @@ public class UpdateResponse extends ActionWriteResponse {
* For example: update script with operation set to none
*/
public UpdateResponse(String index, String type, String id, long version, boolean created) {
this(new ShardInfo(0, 0, 0), index, type, id, version, created);
this(new ShardInfo(0, 0), index, type, id, version, created);
}
public UpdateResponse(ShardInfo shardInfo, String index, String type, String id, long version, boolean created) {

View File

@ -27,7 +27,6 @@ import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.bulk.BulkShardRequest;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.support.replication.ReplicationType;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.Requests;
@ -77,10 +76,6 @@ public class RestBulkAction extends BaseRestHandler {
String defaultType = request.param("type");
String defaultRouting = request.param("routing");
String replicationType = request.param("replication");
if (replicationType != null) {
bulkRequest.replicationType(ReplicationType.fromString(replicationType));
}
String consistencyLevel = request.param("consistency");
if (consistencyLevel != null) {
bulkRequest.consistencyLevel(WriteConsistencyLevel.fromString(consistencyLevel));

View File

@ -23,7 +23,6 @@ import org.elasticsearch.action.ActionWriteResponse;
import org.elasticsearch.action.WriteConsistencyLevel;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.support.replication.ReplicationType;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
@ -62,10 +61,6 @@ public class RestDeleteAction extends BaseRestHandler {
deleteRequest.version(RestActions.parseVersion(request));
deleteRequest.versionType(VersionType.fromString(request.param("version_type"), deleteRequest.versionType()));
String replicationType = request.param("replication");
if (replicationType != null) {
deleteRequest.replicationType(ReplicationType.fromString(replicationType));
}
String consistencyLevel = request.param("consistency");
if (consistencyLevel != null) {
deleteRequest.consistencyLevel(WriteConsistencyLevel.fromString(consistencyLevel));

View File

@ -26,7 +26,6 @@ import org.elasticsearch.action.deletebyquery.IndexDeleteByQueryResponse;
import org.elasticsearch.action.deletebyquery.ShardDeleteByQueryRequest;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.QuerySourceBuilder;
import org.elasticsearch.action.support.replication.ReplicationType;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.inject.Inject;
@ -72,10 +71,6 @@ public class RestDeleteByQueryAction extends BaseRestHandler {
deleteByQueryRequest.timeout(request.paramAsTime("timeout", ShardDeleteByQueryRequest.DEFAULT_TIMEOUT));
deleteByQueryRequest.routing(request.param("routing"));
String replicationType = request.param("replication");
if (replicationType != null) {
deleteByQueryRequest.replicationType(ReplicationType.fromString(replicationType));
}
String consistencyLevel = request.param("consistency");
if (consistencyLevel != null) {
deleteByQueryRequest.consistencyLevel(WriteConsistencyLevel.fromString(consistencyLevel));

View File

@ -24,7 +24,6 @@ import org.elasticsearch.action.ActionWriteResponse;
import org.elasticsearch.action.WriteConsistencyLevel;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.support.replication.ReplicationType;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
@ -99,10 +98,6 @@ public class RestIndexAction extends BaseRestHandler {
}
}
}
String replicationType = request.param("replication");
if (replicationType != null) {
indexRequest.replicationType(ReplicationType.fromString(replicationType));
}
String consistencyLevel = request.param("consistency");
if (consistencyLevel != null) {
indexRequest.consistencyLevel(WriteConsistencyLevel.fromString(consistencyLevel));

View File

@ -22,7 +22,6 @@ package org.elasticsearch.rest.action.update;
import org.elasticsearch.action.ActionWriteResponse;
import org.elasticsearch.action.WriteConsistencyLevel;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.support.replication.ReplicationType;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.Client;
@ -60,10 +59,6 @@ public class RestUpdateAction extends BaseRestHandler {
updateRequest.routing(request.param("routing"));
updateRequest.timeout(request.paramAsTime("timeout", updateRequest.timeout()));
updateRequest.refresh(request.paramAsBoolean("refresh", updateRequest.refresh()));
String replicationType = request.param("replication");
if (replicationType != null) {
updateRequest.replicationType(ReplicationType.fromString(replicationType));
}
String consistencyLevel = request.param("consistency");
if (consistencyLevel != null) {
updateRequest.consistencyLevel(WriteConsistencyLevel.fromString(consistencyLevel));

View File

@ -194,7 +194,6 @@ public class DeleteByQueryTests extends ElasticsearchIntegrationTest {
assertThat(shardInfo.getTotal(), greaterThanOrEqualTo(numShards.totalNumShards));
// we do not ensure green so just make sure request succeeded at least on all primaries
assertThat(shardInfo.getSuccessful(), greaterThanOrEqualTo(numShards.numPrimaries));
assertThat(shardInfo.getPending(), equalTo(0));
assertThat(shardInfo.getFailed(), equalTo(0));
for (ActionWriteResponse.ShardInfo.Failure failure : shardInfo.getFailures()) {
assertThat(failure.status(), equalTo(RestStatus.OK));

View File

@ -33,7 +33,6 @@ import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.replication.ReplicationType;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.test.ElasticsearchIntegrationTest;
@ -124,7 +123,7 @@ public class DocumentActionsTests extends ElasticsearchIntegrationTest {
}
logger.info("Delete [type1/1]");
DeleteResponse deleteResponse = client().prepareDelete("test", "type1", "1").setReplicationType(ReplicationType.SYNC).execute().actionGet();
DeleteResponse deleteResponse = client().prepareDelete("test", "type1", "1").execute().actionGet();
assertThat(deleteResponse.getIndex(), equalTo(getConcreteIndexName()));
assertThat(deleteResponse.getId(), equalTo("1"));
assertThat(deleteResponse.getType(), equalTo("type1"));

View File

@ -28,7 +28,6 @@ import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.deletebyquery.IndexDeleteByQueryResponse;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.support.replication.ReplicationType;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
@ -106,7 +105,7 @@ public class ShardInfoTests extends ElasticsearchIntegrationTest {
int numPrimaryShards = randomIntBetween(1, 2);
prepareIndex(numPrimaryShards, true);
DeleteResponse deleteResponse = client().prepareDelete("idx", "type", "1").get();
assertShardInfo(deleteResponse, numCopies * numPrimaryShards, numNodes * numPrimaryShards, 0);
assertShardInfo(deleteResponse, numCopies * numPrimaryShards, numNodes * numPrimaryShards);
}
@Test
@ -116,18 +115,9 @@ public class ShardInfoTests extends ElasticsearchIntegrationTest {
IndexDeleteByQueryResponse indexDeleteByQueryResponse = client().prepareDeleteByQuery("idx")
.setQuery(QueryBuilders.matchAllQuery())
.get().getIndex("idx");
assertShardInfo(indexDeleteByQueryResponse, numCopies * numPrimaryShards, numNodes * numPrimaryShards, 0);
assertShardInfo(indexDeleteByQueryResponse, numCopies * numPrimaryShards, numNodes * numPrimaryShards);
}
@Test
public void testIndexWithAsyncReplication() throws Exception {
prepareIndex(1);
IndexResponse indexResponse = client().prepareIndex("idx", "type")
.setReplicationType(ReplicationType.ASYNC)
.setSource("{}")
.get();
assertShardInfo(indexResponse, numCopies, 1, numNodes - 1);
}
private void prepareIndex(int numberOfPrimaryShards) throws Exception {
prepareIndex(numberOfPrimaryShards, false);
@ -152,13 +142,12 @@ public class ShardInfoTests extends ElasticsearchIntegrationTest {
}
private void assertShardInfo(ActionWriteResponse response) {
assertShardInfo(response, numCopies, numNodes, 0);
assertShardInfo(response, numCopies, numNodes);
}
private void assertShardInfo(ActionWriteResponse response, int expectedTotal, int expectedSuccessful, int expectedPending) {
private void assertShardInfo(ActionWriteResponse response, int expectedTotal, int expectedSuccessful) {
assertThat(response.getShardInfo().getTotal(), greaterThanOrEqualTo(expectedTotal));
assertThat(response.getShardInfo().getSuccessful(), greaterThanOrEqualTo(expectedSuccessful));
assertThat(response.getShardInfo().getPending(), equalTo(expectedPending));
}
private void ensureActiveShardCopies(final int shardId, final int copyCount) throws Exception {