Introduce a common base response class to all single doc write ops
IndexResponse, DeleteResponse and UpdateResponse share some logic. This can be unified to a single DocWriteResponse base class. On top, some replication actions are now not about write operations anymore. This commit renames ActionWriteResponse to ReplicationResponse Last some toXContent is moved from the Rest layer to the actual response classes, for more code re-sharing. Closes #15334
This commit is contained in:
parent
fab44398d9
commit
fafeb3abdd
|
@ -0,0 +1,130 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package org.elasticsearch.action;
|
||||
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.common.xcontent.StatusToXContent;
|
||||
import org.elasticsearch.common.xcontent.ToXContent;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilderString;
|
||||
import org.elasticsearch.index.shard.ShardId;
|
||||
import org.elasticsearch.rest.RestStatus;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
/**
|
||||
* A base class for the response of a write operation that involves a single doc
|
||||
*/
|
||||
public abstract class DocWriteResponse extends ReplicationResponse implements StatusToXContent {
|
||||
|
||||
private ShardId shardId;
|
||||
private String id;
|
||||
private String type;
|
||||
private long version;
|
||||
|
||||
public DocWriteResponse(ShardId shardId, String type, String id, long version) {
|
||||
this.shardId = shardId;
|
||||
this.type = type;
|
||||
this.id = id;
|
||||
this.version = version;
|
||||
}
|
||||
|
||||
// needed for deserialization
|
||||
protected DocWriteResponse() {
|
||||
}
|
||||
|
||||
/**
|
||||
* The index the document was changed in.
|
||||
*/
|
||||
public String getIndex() {
|
||||
return this.shardId.getIndex();
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* The exact shard the document was changed in.
|
||||
*/
|
||||
public ShardId getShardId() {
|
||||
return this.shardId;
|
||||
}
|
||||
|
||||
/**
|
||||
* The type of the document changed.
|
||||
*/
|
||||
public String getType() {
|
||||
return this.type;
|
||||
}
|
||||
|
||||
/**
|
||||
* The id of the document changed.
|
||||
*/
|
||||
public String getId() {
|
||||
return this.id;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the current version of the doc.
|
||||
*/
|
||||
public long getVersion() {
|
||||
return this.version;
|
||||
}
|
||||
|
||||
/** returns the rest status for this response (based on {@link ShardInfo#status()} */
|
||||
public RestStatus status() {
|
||||
return getShardInfo().status();
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public void readFrom(StreamInput in) throws IOException {
|
||||
super.readFrom(in);
|
||||
shardId = ShardId.readShardId(in);
|
||||
type = in.readString();
|
||||
id = in.readString();
|
||||
version = in.readZLong();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeTo(StreamOutput out) throws IOException {
|
||||
super.writeTo(out);
|
||||
shardId.writeTo(out);
|
||||
out.writeString(type);
|
||||
out.writeString(id);
|
||||
out.writeZLong(version);
|
||||
}
|
||||
|
||||
static final class Fields {
|
||||
static final XContentBuilderString _INDEX = new XContentBuilderString("_index");
|
||||
static final XContentBuilderString _TYPE = new XContentBuilderString("_type");
|
||||
static final XContentBuilderString _ID = new XContentBuilderString("_id");
|
||||
static final XContentBuilderString _VERSION = new XContentBuilderString("_version");
|
||||
}
|
||||
|
||||
@Override
|
||||
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
|
||||
ReplicationResponse.ShardInfo shardInfo = getShardInfo();
|
||||
builder.field(Fields._INDEX, shardId.getIndex())
|
||||
.field(Fields._TYPE, type)
|
||||
.field(Fields._ID, id)
|
||||
.field(Fields._VERSION, version);
|
||||
shardInfo.toXContent(builder, params);
|
||||
return builder;
|
||||
}
|
||||
}
|
|
@ -21,7 +21,6 @@ package org.elasticsearch.action;
|
|||
|
||||
import org.elasticsearch.ElasticsearchException;
|
||||
import org.elasticsearch.ExceptionsHelper;
|
||||
import org.elasticsearch.bootstrap.Elasticsearch;
|
||||
import org.elasticsearch.common.Nullable;
|
||||
import org.elasticsearch.common.Strings;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
|
@ -30,25 +29,23 @@ import org.elasticsearch.common.io.stream.Streamable;
|
|||
import org.elasticsearch.common.xcontent.ToXContent;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilderString;
|
||||
import org.elasticsearch.common.xcontent.json.JsonXContent;
|
||||
import org.elasticsearch.rest.RestStatus;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Collections;
|
||||
|
||||
/**
|
||||
* Base class for write action responses.
|
||||
*/
|
||||
public class ActionWriteResponse extends ActionResponse {
|
||||
public class ReplicationResponse extends ActionResponse {
|
||||
|
||||
public final static ActionWriteResponse.ShardInfo.Failure[] EMPTY = new ActionWriteResponse.ShardInfo.Failure[0];
|
||||
public final static ReplicationResponse.ShardInfo.Failure[] EMPTY = new ReplicationResponse.ShardInfo.Failure[0];
|
||||
|
||||
private ShardInfo shardInfo;
|
||||
|
||||
@Override
|
||||
public void readFrom(StreamInput in) throws IOException {
|
||||
super.readFrom(in);
|
||||
shardInfo = ActionWriteResponse.ShardInfo.readShardInfo(in);
|
||||
shardInfo = ReplicationResponse.ShardInfo.readShardInfo(in);
|
||||
}
|
||||
|
||||
@Override
|
|
@ -19,7 +19,7 @@
|
|||
|
||||
package org.elasticsearch.action.admin.indices.flush;
|
||||
|
||||
import org.elasticsearch.action.ActionWriteResponse;
|
||||
import org.elasticsearch.action.ReplicationResponse;
|
||||
import org.elasticsearch.action.ShardOperationFailedException;
|
||||
import org.elasticsearch.action.support.ActionFilters;
|
||||
import org.elasticsearch.action.support.replication.TransportBroadcastReplicationAction;
|
||||
|
@ -36,7 +36,7 @@ import java.util.List;
|
|||
/**
|
||||
* Flush Action.
|
||||
*/
|
||||
public class TransportFlushAction extends TransportBroadcastReplicationAction<FlushRequest, FlushResponse, ShardFlushRequest, ActionWriteResponse> {
|
||||
public class TransportFlushAction extends TransportBroadcastReplicationAction<FlushRequest, FlushResponse, ShardFlushRequest, ReplicationResponse> {
|
||||
|
||||
@Inject
|
||||
public TransportFlushAction(Settings settings, ThreadPool threadPool, ClusterService clusterService,
|
||||
|
@ -47,8 +47,8 @@ public class TransportFlushAction extends TransportBroadcastReplicationAction<Fl
|
|||
}
|
||||
|
||||
@Override
|
||||
protected ActionWriteResponse newShardResponse() {
|
||||
return new ActionWriteResponse();
|
||||
protected ReplicationResponse newShardResponse() {
|
||||
return new ReplicationResponse();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -19,7 +19,7 @@
|
|||
|
||||
package org.elasticsearch.action.admin.indices.flush;
|
||||
|
||||
import org.elasticsearch.action.ActionWriteResponse;
|
||||
import org.elasticsearch.action.ReplicationResponse;
|
||||
import org.elasticsearch.action.support.ActionFilters;
|
||||
import org.elasticsearch.action.support.replication.TransportReplicationAction;
|
||||
import org.elasticsearch.cluster.ClusterService;
|
||||
|
@ -39,7 +39,7 @@ import org.elasticsearch.transport.TransportService;
|
|||
/**
|
||||
*
|
||||
*/
|
||||
public class TransportShardFlushAction extends TransportReplicationAction<ShardFlushRequest, ShardFlushRequest, ActionWriteResponse> {
|
||||
public class TransportShardFlushAction extends TransportReplicationAction<ShardFlushRequest, ShardFlushRequest, ReplicationResponse> {
|
||||
|
||||
public static final String NAME = FlushAction.NAME + "[s]";
|
||||
|
||||
|
@ -53,16 +53,16 @@ public class TransportShardFlushAction extends TransportReplicationAction<ShardF
|
|||
}
|
||||
|
||||
@Override
|
||||
protected ActionWriteResponse newResponseInstance() {
|
||||
return new ActionWriteResponse();
|
||||
protected ReplicationResponse newResponseInstance() {
|
||||
return new ReplicationResponse();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Tuple<ActionWriteResponse, ShardFlushRequest> shardOperationOnPrimary(MetaData metaData, ShardFlushRequest shardRequest) throws Throwable {
|
||||
protected Tuple<ReplicationResponse, ShardFlushRequest> shardOperationOnPrimary(MetaData metaData, ShardFlushRequest shardRequest) throws Throwable {
|
||||
IndexShard indexShard = indicesService.indexServiceSafe(shardRequest.shardId().getIndex()).getShard(shardRequest.shardId().id());
|
||||
indexShard.flush(shardRequest.getRequest());
|
||||
logger.trace("{} flush request executed on primary", indexShard.shardId());
|
||||
return new Tuple<>(new ActionWriteResponse(), shardRequest);
|
||||
return new Tuple<>(new ReplicationResponse(), shardRequest);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -19,7 +19,7 @@
|
|||
|
||||
package org.elasticsearch.action.admin.indices.refresh;
|
||||
|
||||
import org.elasticsearch.action.ActionWriteResponse;
|
||||
import org.elasticsearch.action.ReplicationResponse;
|
||||
import org.elasticsearch.action.ShardOperationFailedException;
|
||||
import org.elasticsearch.action.support.ActionFilters;
|
||||
import org.elasticsearch.action.support.replication.ReplicationRequest;
|
||||
|
@ -37,7 +37,7 @@ import java.util.List;
|
|||
/**
|
||||
* Refresh action.
|
||||
*/
|
||||
public class TransportRefreshAction extends TransportBroadcastReplicationAction<RefreshRequest, RefreshResponse, ReplicationRequest, ActionWriteResponse> {
|
||||
public class TransportRefreshAction extends TransportBroadcastReplicationAction<RefreshRequest, RefreshResponse, ReplicationRequest, ReplicationResponse> {
|
||||
|
||||
@Inject
|
||||
public TransportRefreshAction(Settings settings, ThreadPool threadPool, ClusterService clusterService,
|
||||
|
@ -48,8 +48,8 @@ public class TransportRefreshAction extends TransportBroadcastReplicationAction<
|
|||
}
|
||||
|
||||
@Override
|
||||
protected ActionWriteResponse newShardResponse() {
|
||||
return new ActionWriteResponse();
|
||||
protected ReplicationResponse newShardResponse() {
|
||||
return new ReplicationResponse();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -19,7 +19,7 @@
|
|||
|
||||
package org.elasticsearch.action.admin.indices.refresh;
|
||||
|
||||
import org.elasticsearch.action.ActionWriteResponse;
|
||||
import org.elasticsearch.action.ReplicationResponse;
|
||||
import org.elasticsearch.action.support.ActionFilters;
|
||||
import org.elasticsearch.action.support.replication.ReplicationRequest;
|
||||
import org.elasticsearch.action.support.replication.TransportReplicationAction;
|
||||
|
@ -41,7 +41,7 @@ import org.elasticsearch.transport.TransportService;
|
|||
/**
|
||||
*
|
||||
*/
|
||||
public class TransportShardRefreshAction extends TransportReplicationAction<ReplicationRequest, ReplicationRequest, ActionWriteResponse> {
|
||||
public class TransportShardRefreshAction extends TransportReplicationAction<ReplicationRequest, ReplicationRequest, ReplicationResponse> {
|
||||
|
||||
public static final String NAME = RefreshAction.NAME + "[s]";
|
||||
|
||||
|
@ -55,16 +55,16 @@ public class TransportShardRefreshAction extends TransportReplicationAction<Repl
|
|||
}
|
||||
|
||||
@Override
|
||||
protected ActionWriteResponse newResponseInstance() {
|
||||
return new ActionWriteResponse();
|
||||
protected ReplicationResponse newResponseInstance() {
|
||||
return new ReplicationResponse();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Tuple<ActionWriteResponse, ReplicationRequest> shardOperationOnPrimary(MetaData metaData, ReplicationRequest shardRequest) throws Throwable {
|
||||
protected Tuple<ReplicationResponse, ReplicationRequest> shardOperationOnPrimary(MetaData metaData, ReplicationRequest shardRequest) throws Throwable {
|
||||
IndexShard indexShard = indicesService.indexServiceSafe(shardRequest.shardId().getIndex()).getShard(shardRequest.shardId().id());
|
||||
indexShard.refresh("api");
|
||||
logger.trace("{} refresh request executed on primary", indexShard.shardId());
|
||||
return new Tuple<>(new ActionWriteResponse(), shardRequest);
|
||||
return new Tuple<>(new ReplicationResponse(), shardRequest);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -19,14 +19,18 @@
|
|||
|
||||
package org.elasticsearch.action.bulk;
|
||||
|
||||
import org.elasticsearch.ElasticsearchException;
|
||||
import org.elasticsearch.ExceptionsHelper;
|
||||
import org.elasticsearch.action.ActionWriteResponse;
|
||||
import org.elasticsearch.action.DocWriteResponse;
|
||||
import org.elasticsearch.action.delete.DeleteResponse;
|
||||
import org.elasticsearch.action.index.IndexResponse;
|
||||
import org.elasticsearch.action.update.UpdateResponse;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.common.io.stream.Streamable;
|
||||
import org.elasticsearch.common.xcontent.StatusToXContent;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilderString;
|
||||
import org.elasticsearch.rest.RestStatus;
|
||||
|
||||
import java.io.IOException;
|
||||
|
@ -35,7 +39,39 @@ import java.io.IOException;
|
|||
* Represents a single item response for an action executed as part of the bulk API. Holds the index/type/id
|
||||
* of the relevant action, and if it has failed or not (with the failure message incase it failed).
|
||||
*/
|
||||
public class BulkItemResponse implements Streamable {
|
||||
public class BulkItemResponse implements Streamable, StatusToXContent {
|
||||
|
||||
@Override
|
||||
public RestStatus status() {
|
||||
return failure == null ? response.status() : failure.getStatus();
|
||||
}
|
||||
|
||||
@Override
|
||||
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
|
||||
builder.startObject(opType);
|
||||
if (failure == null) {
|
||||
response.toXContent(builder, params);
|
||||
builder.field(Fields.STATUS, response.status());
|
||||
} else {
|
||||
builder.field(Fields._INDEX, failure.getIndex());
|
||||
builder.field(Fields._TYPE, failure.getType());
|
||||
builder.field(Fields._ID, failure.getId());
|
||||
builder.field(Fields.STATUS, failure.getStatus());
|
||||
builder.startObject(Fields.ERROR);
|
||||
ElasticsearchException.toXContent(builder, params, failure.getCause());
|
||||
builder.endObject();
|
||||
}
|
||||
builder.endObject();
|
||||
return builder;
|
||||
}
|
||||
|
||||
static final class Fields {
|
||||
static final XContentBuilderString _INDEX = new XContentBuilderString("_index");
|
||||
static final XContentBuilderString _TYPE = new XContentBuilderString("_type");
|
||||
static final XContentBuilderString _ID = new XContentBuilderString("_id");
|
||||
static final XContentBuilderString STATUS = new XContentBuilderString("status");
|
||||
static final XContentBuilderString ERROR = new XContentBuilderString("error");
|
||||
}
|
||||
|
||||
/**
|
||||
* Represents a failure.
|
||||
|
@ -99,7 +135,7 @@ public class BulkItemResponse implements Streamable {
|
|||
|
||||
private String opType;
|
||||
|
||||
private ActionWriteResponse response;
|
||||
private DocWriteResponse response;
|
||||
|
||||
private Failure failure;
|
||||
|
||||
|
@ -107,7 +143,7 @@ public class BulkItemResponse implements Streamable {
|
|||
|
||||
}
|
||||
|
||||
public BulkItemResponse(int id, String opType, ActionWriteResponse response) {
|
||||
public BulkItemResponse(int id, String opType, DocWriteResponse response) {
|
||||
this.id = id;
|
||||
this.opType = opType;
|
||||
this.response = response;
|
||||
|
@ -140,14 +176,7 @@ public class BulkItemResponse implements Streamable {
|
|||
if (failure != null) {
|
||||
return failure.getIndex();
|
||||
}
|
||||
if (response instanceof IndexResponse) {
|
||||
return ((IndexResponse) response).getIndex();
|
||||
} else if (response instanceof DeleteResponse) {
|
||||
return ((DeleteResponse) response).getIndex();
|
||||
} else if (response instanceof UpdateResponse) {
|
||||
return ((UpdateResponse) response).getIndex();
|
||||
}
|
||||
return null;
|
||||
return response.getIndex();
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -157,14 +186,7 @@ public class BulkItemResponse implements Streamable {
|
|||
if (failure != null) {
|
||||
return failure.getType();
|
||||
}
|
||||
if (response instanceof IndexResponse) {
|
||||
return ((IndexResponse) response).getType();
|
||||
} else if (response instanceof DeleteResponse) {
|
||||
return ((DeleteResponse) response).getType();
|
||||
} else if (response instanceof UpdateResponse) {
|
||||
return ((UpdateResponse) response).getType();
|
||||
}
|
||||
return null;
|
||||
return response.getType();
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -174,14 +196,7 @@ public class BulkItemResponse implements Streamable {
|
|||
if (failure != null) {
|
||||
return failure.getId();
|
||||
}
|
||||
if (response instanceof IndexResponse) {
|
||||
return ((IndexResponse) response).getId();
|
||||
} else if (response instanceof DeleteResponse) {
|
||||
return ((DeleteResponse) response).getId();
|
||||
} else if (response instanceof UpdateResponse) {
|
||||
return ((UpdateResponse) response).getId();
|
||||
}
|
||||
return null;
|
||||
return response.getId();
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -191,21 +206,14 @@ public class BulkItemResponse implements Streamable {
|
|||
if (failure != null) {
|
||||
return -1;
|
||||
}
|
||||
if (response instanceof IndexResponse) {
|
||||
return ((IndexResponse) response).getVersion();
|
||||
} else if (response instanceof DeleteResponse) {
|
||||
return ((DeleteResponse) response).getVersion();
|
||||
} else if (response instanceof UpdateResponse) {
|
||||
return ((UpdateResponse) response).getVersion();
|
||||
}
|
||||
return -1;
|
||||
return response.getVersion();
|
||||
}
|
||||
|
||||
/**
|
||||
* The actual response ({@link IndexResponse} or {@link DeleteResponse}). <tt>null</tt> in
|
||||
* case of failure.
|
||||
*/
|
||||
public <T extends ActionWriteResponse> T getResponse() {
|
||||
public <T extends DocWriteResponse> T getResponse() {
|
||||
return (T) response;
|
||||
}
|
||||
|
||||
|
|
|
@ -19,7 +19,7 @@
|
|||
|
||||
package org.elasticsearch.action.bulk;
|
||||
|
||||
import org.elasticsearch.action.ActionWriteResponse;
|
||||
import org.elasticsearch.action.ReplicationResponse;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.index.shard.ShardId;
|
||||
|
@ -29,7 +29,7 @@ import java.io.IOException;
|
|||
/**
|
||||
*
|
||||
*/
|
||||
public class BulkShardResponse extends ActionWriteResponse {
|
||||
public class BulkShardResponse extends ReplicationResponse {
|
||||
|
||||
private ShardId shardId;
|
||||
private BulkItemResponse[] responses;
|
||||
|
|
|
@ -204,7 +204,7 @@ public class TransportShardBulkAction extends TransportReplicationAction<BulkSha
|
|||
BytesReference indexSourceAsBytes = indexRequest.source();
|
||||
// add the response
|
||||
IndexResponse indexResponse = result.response();
|
||||
UpdateResponse updateResponse = new UpdateResponse(indexResponse.getShardInfo(), indexResponse.getIndex(), indexResponse.getType(), indexResponse.getId(), indexResponse.getVersion(), indexResponse.isCreated());
|
||||
UpdateResponse updateResponse = new UpdateResponse(indexResponse.getShardInfo(), indexResponse.getShardId(), indexResponse.getType(), indexResponse.getId(), indexResponse.getVersion(), indexResponse.isCreated());
|
||||
if (updateRequest.fields() != null && updateRequest.fields().length > 0) {
|
||||
Tuple<XContentType, Map<String, Object>> sourceAndContent = XContentHelper.convertToMap(indexSourceAsBytes, true);
|
||||
updateResponse.setGetResult(updateHelper.extractGetResult(updateRequest, request.index(), indexResponse.getVersion(), sourceAndContent.v2(), sourceAndContent.v1(), indexSourceAsBytes));
|
||||
|
@ -216,7 +216,7 @@ public class TransportShardBulkAction extends TransportReplicationAction<BulkSha
|
|||
WriteResult<DeleteResponse> writeResult = updateResult.writeResult;
|
||||
DeleteResponse response = writeResult.response();
|
||||
DeleteRequest deleteRequest = updateResult.request();
|
||||
updateResponse = new UpdateResponse(response.getShardInfo(), response.getIndex(), response.getType(), response.getId(), response.getVersion(), false);
|
||||
updateResponse = new UpdateResponse(response.getShardInfo(), response.getShardId(), response.getType(), response.getId(), response.getVersion(), false);
|
||||
updateResponse.setGetResult(updateHelper.extractGetResult(updateRequest, request.index(), response.getVersion(), updateResult.result.updatedSourceAsMap(), updateResult.result.updateSourceContentType(), null));
|
||||
// Replace the update request to the translated delete request to execute on the replica.
|
||||
item = request.items()[requestIndex] = new BulkItemRequest(request.items()[requestIndex].id(), deleteRequest);
|
||||
|
|
|
@ -19,9 +19,13 @@
|
|||
|
||||
package org.elasticsearch.action.delete;
|
||||
|
||||
import org.elasticsearch.action.ActionWriteResponse;
|
||||
import org.elasticsearch.action.DocWriteResponse;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilderString;
|
||||
import org.elasticsearch.index.shard.ShardId;
|
||||
import org.elasticsearch.rest.RestStatus;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
|
@ -31,53 +35,19 @@ import java.io.IOException;
|
|||
* @see org.elasticsearch.action.delete.DeleteRequest
|
||||
* @see org.elasticsearch.client.Client#delete(DeleteRequest)
|
||||
*/
|
||||
public class DeleteResponse extends ActionWriteResponse {
|
||||
public class DeleteResponse extends DocWriteResponse {
|
||||
|
||||
private String index;
|
||||
private String id;
|
||||
private String type;
|
||||
private long version;
|
||||
private boolean found;
|
||||
|
||||
public DeleteResponse() {
|
||||
|
||||
}
|
||||
|
||||
public DeleteResponse(String index, String type, String id, long version, boolean found) {
|
||||
this.index = index;
|
||||
this.id = id;
|
||||
this.type = type;
|
||||
this.version = version;
|
||||
public DeleteResponse(ShardId shardId, String type, String id, long version, boolean found) {
|
||||
super(shardId, type, id, version);
|
||||
this.found = found;
|
||||
}
|
||||
|
||||
/**
|
||||
* The index the document was deleted from.
|
||||
*/
|
||||
public String getIndex() {
|
||||
return this.index;
|
||||
}
|
||||
|
||||
/**
|
||||
* The type of the document deleted.
|
||||
*/
|
||||
public String getType() {
|
||||
return this.type;
|
||||
}
|
||||
|
||||
/**
|
||||
* The id of the document deleted.
|
||||
*/
|
||||
public String getId() {
|
||||
return this.id;
|
||||
}
|
||||
|
||||
/**
|
||||
* The version of the delete operation.
|
||||
*/
|
||||
public long getVersion() {
|
||||
return this.version;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns <tt>true</tt> if a doc was found to delete.
|
||||
|
@ -89,20 +59,44 @@ public class DeleteResponse extends ActionWriteResponse {
|
|||
@Override
|
||||
public void readFrom(StreamInput in) throws IOException {
|
||||
super.readFrom(in);
|
||||
index = in.readString();
|
||||
type = in.readString();
|
||||
id = in.readString();
|
||||
version = in.readLong();
|
||||
found = in.readBoolean();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeTo(StreamOutput out) throws IOException {
|
||||
super.writeTo(out);
|
||||
out.writeString(index);
|
||||
out.writeString(type);
|
||||
out.writeString(id);
|
||||
out.writeLong(version);
|
||||
out.writeBoolean(found);
|
||||
}
|
||||
|
||||
@Override
|
||||
public RestStatus status() {
|
||||
if (found == false) {
|
||||
return RestStatus.NOT_FOUND;
|
||||
}
|
||||
return super.status();
|
||||
}
|
||||
|
||||
static final class Fields {
|
||||
static final XContentBuilderString FOUND = new XContentBuilderString("found");
|
||||
}
|
||||
|
||||
@Override
|
||||
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
|
||||
builder.field(Fields.FOUND, isFound());
|
||||
super.toXContent(builder, params);
|
||||
return builder;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
StringBuilder builder = new StringBuilder();
|
||||
builder.append("DeleteResponse[");
|
||||
builder.append("index=").append(getIndex());
|
||||
builder.append(",type=").append(getType());
|
||||
builder.append(",id=").append(getId());
|
||||
builder.append(",version=").append(getVersion());
|
||||
builder.append(",found=").append(found);
|
||||
builder.append(",shards=").append(getShardInfo());
|
||||
return builder.append("]").toString();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -140,7 +140,7 @@ public class TransportDeleteAction extends TransportReplicationAction<DeleteRequ
|
|||
|
||||
assert request.versionType().validateVersionForWrites(request.version());
|
||||
return new WriteResult<>(
|
||||
new DeleteResponse(indexShard.shardId().getIndex(), request.type(), request.id(), delete.version(), delete.found()),
|
||||
new DeleteResponse(indexShard.shardId(), request.type(), request.id(), delete.version(), delete.found()),
|
||||
delete.getTranslogLocation());
|
||||
}
|
||||
|
||||
|
|
|
@ -19,9 +19,13 @@
|
|||
|
||||
package org.elasticsearch.action.index;
|
||||
|
||||
import org.elasticsearch.action.ActionWriteResponse;
|
||||
import org.elasticsearch.action.DocWriteResponse;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilderString;
|
||||
import org.elasticsearch.index.shard.ShardId;
|
||||
import org.elasticsearch.rest.RestStatus;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
|
@ -31,54 +35,19 @@ import java.io.IOException;
|
|||
* @see org.elasticsearch.action.index.IndexRequest
|
||||
* @see org.elasticsearch.client.Client#index(IndexRequest)
|
||||
*/
|
||||
public class IndexResponse extends ActionWriteResponse {
|
||||
public class IndexResponse extends DocWriteResponse {
|
||||
|
||||
private String index;
|
||||
private String id;
|
||||
private String type;
|
||||
private long version;
|
||||
private boolean created;
|
||||
|
||||
public IndexResponse() {
|
||||
|
||||
}
|
||||
|
||||
public IndexResponse(String index, String type, String id, long version, boolean created) {
|
||||
this.index = index;
|
||||
this.id = id;
|
||||
this.type = type;
|
||||
this.version = version;
|
||||
public IndexResponse(ShardId shardId, String type, String id, long version, boolean created) {
|
||||
super(shardId, type, id, version);
|
||||
this.created = created;
|
||||
}
|
||||
|
||||
/**
|
||||
* The index the document was indexed into.
|
||||
*/
|
||||
public String getIndex() {
|
||||
return this.index;
|
||||
}
|
||||
|
||||
/**
|
||||
* The type of the document indexed.
|
||||
*/
|
||||
public String getType() {
|
||||
return this.type;
|
||||
}
|
||||
|
||||
/**
|
||||
* The id of the document indexed.
|
||||
*/
|
||||
public String getId() {
|
||||
return this.id;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the current version of the doc indexed.
|
||||
*/
|
||||
public long getVersion() {
|
||||
return this.version;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns true if the document was created, false if updated.
|
||||
*/
|
||||
|
@ -86,23 +55,23 @@ public class IndexResponse extends ActionWriteResponse {
|
|||
return this.created;
|
||||
}
|
||||
|
||||
@Override
|
||||
public RestStatus status() {
|
||||
if (created) {
|
||||
return RestStatus.CREATED;
|
||||
}
|
||||
return super.status();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void readFrom(StreamInput in) throws IOException {
|
||||
super.readFrom(in);
|
||||
index = in.readString();
|
||||
type = in.readString();
|
||||
id = in.readString();
|
||||
version = in.readLong();
|
||||
created = in.readBoolean();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeTo(StreamOutput out) throws IOException {
|
||||
super.writeTo(out);
|
||||
out.writeString(index);
|
||||
out.writeString(type);
|
||||
out.writeString(id);
|
||||
out.writeLong(version);
|
||||
out.writeBoolean(created);
|
||||
}
|
||||
|
||||
|
@ -110,12 +79,23 @@ public class IndexResponse extends ActionWriteResponse {
|
|||
public String toString() {
|
||||
StringBuilder builder = new StringBuilder();
|
||||
builder.append("IndexResponse[");
|
||||
builder.append("index=").append(index);
|
||||
builder.append(",type=").append(type);
|
||||
builder.append(",id=").append(id);
|
||||
builder.append(",version=").append(version);
|
||||
builder.append("index=").append(getIndex());
|
||||
builder.append(",type=").append(getType());
|
||||
builder.append(",id=").append(getId());
|
||||
builder.append(",version=").append(getVersion());
|
||||
builder.append(",created=").append(created);
|
||||
builder.append(",shards=").append(getShardInfo());
|
||||
return builder.append("]").toString();
|
||||
}
|
||||
|
||||
static final class Fields {
|
||||
static final XContentBuilderString CREATED = new XContentBuilderString("created");
|
||||
}
|
||||
|
||||
@Override
|
||||
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
|
||||
super.toXContent(builder, params);
|
||||
builder.field(Fields.CREATED, isCreated());
|
||||
return builder;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -222,7 +222,7 @@ public class TransportIndexAction extends TransportReplicationAction<IndexReques
|
|||
|
||||
assert request.versionType().validateVersionForWrites(request.version());
|
||||
|
||||
return new WriteResult<>(new IndexResponse(shardId.getIndex(), request.type(), request.id(), request.version(), created), operation.getTranslogLocation());
|
||||
return new WriteResult<>(new IndexResponse(shardId, request.type(), request.id(), request.version(), created), operation.getTranslogLocation());
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -22,9 +22,8 @@ package org.elasticsearch.action.support.replication;
|
|||
import com.carrotsearch.hppc.cursors.IntObjectCursor;
|
||||
import org.elasticsearch.ExceptionsHelper;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.ActionWriteResponse;
|
||||
import org.elasticsearch.action.ReplicationResponse;
|
||||
import org.elasticsearch.action.ShardOperationFailedException;
|
||||
import org.elasticsearch.action.UnavailableShardsException;
|
||||
import org.elasticsearch.action.support.ActionFilters;
|
||||
import org.elasticsearch.action.support.DefaultShardOperationFailedException;
|
||||
import org.elasticsearch.action.support.HandledTransportAction;
|
||||
|
@ -53,7 +52,7 @@ import java.util.function.Supplier;
|
|||
* Base class for requests that should be executed on all shards of an index or several indices.
|
||||
* This action sends shard requests to all primary shards of the indices and they are then replicated like write requests
|
||||
*/
|
||||
public abstract class TransportBroadcastReplicationAction<Request extends BroadcastRequest, Response extends BroadcastResponse, ShardRequest extends ReplicationRequest, ShardResponse extends ActionWriteResponse> extends HandledTransportAction<Request, Response> {
|
||||
public abstract class TransportBroadcastReplicationAction<Request extends BroadcastRequest, Response extends BroadcastResponse, ShardRequest extends ReplicationRequest, ShardResponse extends ReplicationResponse> extends HandledTransportAction<Request, Response> {
|
||||
|
||||
private final TransportReplicationAction replicatedBroadcastShardAction;
|
||||
private final ClusterService clusterService;
|
||||
|
@ -91,15 +90,15 @@ public abstract class TransportBroadcastReplicationAction<Request extends Broadc
|
|||
logger.trace("{}: got failure from {}", actionName, shardId);
|
||||
int totalNumCopies = clusterState.getMetaData().index(shardId.index().getName()).getNumberOfReplicas() + 1;
|
||||
ShardResponse shardResponse = newShardResponse();
|
||||
ActionWriteResponse.ShardInfo.Failure[] failures;
|
||||
ReplicationResponse.ShardInfo.Failure[] failures;
|
||||
if (TransportActions.isShardNotAvailableException(e)) {
|
||||
failures = new ActionWriteResponse.ShardInfo.Failure[0];
|
||||
failures = new ReplicationResponse.ShardInfo.Failure[0];
|
||||
} else {
|
||||
ActionWriteResponse.ShardInfo.Failure failure = new ActionWriteResponse.ShardInfo.Failure(shardId.index().name(), shardId.id(), null, e, ExceptionsHelper.status(e), true);
|
||||
failures = new ActionWriteResponse.ShardInfo.Failure[totalNumCopies];
|
||||
ReplicationResponse.ShardInfo.Failure failure = new ReplicationResponse.ShardInfo.Failure(shardId.index().name(), shardId.id(), null, e, ExceptionsHelper.status(e), true);
|
||||
failures = new ReplicationResponse.ShardInfo.Failure[totalNumCopies];
|
||||
Arrays.fill(failures, failure);
|
||||
}
|
||||
shardResponse.setShardInfo(new ActionWriteResponse.ShardInfo(totalNumCopies, 0, failures));
|
||||
shardResponse.setShardInfo(new ReplicationResponse.ShardInfo(totalNumCopies, 0, failures));
|
||||
shardsResponses.add(shardResponse);
|
||||
if (responsesCountDown.countDown()) {
|
||||
finishAndNotifyListener(listener, shardsResponses);
|
||||
|
@ -142,7 +141,7 @@ public abstract class TransportBroadcastReplicationAction<Request extends Broadc
|
|||
int totalNumCopies = 0;
|
||||
List<ShardOperationFailedException> shardFailures = null;
|
||||
for (int i = 0; i < shardsResponses.size(); i++) {
|
||||
ActionWriteResponse shardResponse = shardsResponses.get(i);
|
||||
ReplicationResponse shardResponse = shardsResponses.get(i);
|
||||
if (shardResponse == null) {
|
||||
// non active shard, ignore
|
||||
} else {
|
||||
|
@ -152,7 +151,7 @@ public abstract class TransportBroadcastReplicationAction<Request extends Broadc
|
|||
if (shardFailures == null) {
|
||||
shardFailures = new ArrayList<>();
|
||||
}
|
||||
for (ActionWriteResponse.ShardInfo.Failure failure : shardResponse.getShardInfo().getFailures()) {
|
||||
for (ReplicationResponse.ShardInfo.Failure failure : shardResponse.getShardInfo().getFailures()) {
|
||||
shardFailures.add(new DefaultShardOperationFailedException(new BroadcastShardOperationFailedException(new ShardId(failure.index(), failure.shardId()), failure.getCause())));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -22,7 +22,7 @@ package org.elasticsearch.action.support.replication;
|
|||
import org.elasticsearch.ElasticsearchException;
|
||||
import org.elasticsearch.ExceptionsHelper;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.ActionWriteResponse;
|
||||
import org.elasticsearch.action.ReplicationResponse;
|
||||
import org.elasticsearch.action.UnavailableShardsException;
|
||||
import org.elasticsearch.action.WriteConsistencyLevel;
|
||||
import org.elasticsearch.action.support.ActionFilters;
|
||||
|
@ -78,7 +78,7 @@ import java.util.function.Supplier;
|
|||
* primary node to validate request before primary operation followed by sampling state again for resolving
|
||||
* nodes with replica copies to perform replication.
|
||||
*/
|
||||
public abstract class TransportReplicationAction<Request extends ReplicationRequest, ReplicaRequest extends ReplicationRequest, Response extends ActionWriteResponse> extends TransportAction<Request, Response> {
|
||||
public abstract class TransportReplicationAction<Request extends ReplicationRequest, ReplicaRequest extends ReplicationRequest, Response extends ReplicationResponse> extends TransportAction<Request, Response> {
|
||||
|
||||
public static final String SHARD_FAILURE_TIMEOUT = "action.support.replication.shard.failure_timeout";
|
||||
|
||||
|
@ -214,7 +214,7 @@ public abstract class TransportReplicationAction<Request extends ReplicationRequ
|
|||
return false;
|
||||
}
|
||||
|
||||
protected static class WriteResult<T extends ActionWriteResponse> {
|
||||
protected static class WriteResult<T extends ReplicationResponse> {
|
||||
|
||||
public final T response;
|
||||
public final Translog.Location location;
|
||||
|
@ -225,10 +225,10 @@ public abstract class TransportReplicationAction<Request extends ReplicationRequ
|
|||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
public <T extends ActionWriteResponse> T response() {
|
||||
public <T extends ReplicationResponse> T response() {
|
||||
// this sets total, pending and failed to 0 and this is ok, because we will embed this into the replica
|
||||
// request and not use it
|
||||
response.setShardInfo(new ActionWriteResponse.ShardInfo());
|
||||
response.setShardInfo(new ReplicationResponse.ShardInfo());
|
||||
return (T) response;
|
||||
}
|
||||
|
||||
|
@ -908,20 +908,20 @@ public abstract class TransportReplicationAction<Request extends ReplicationRequ
|
|||
private void doFinish() {
|
||||
if (finished.compareAndSet(false, true)) {
|
||||
Releasables.close(indexShardReference);
|
||||
final ActionWriteResponse.ShardInfo.Failure[] failuresArray;
|
||||
final ReplicationResponse.ShardInfo.Failure[] failuresArray;
|
||||
if (!shardReplicaFailures.isEmpty()) {
|
||||
int slot = 0;
|
||||
failuresArray = new ActionWriteResponse.ShardInfo.Failure[shardReplicaFailures.size()];
|
||||
failuresArray = new ReplicationResponse.ShardInfo.Failure[shardReplicaFailures.size()];
|
||||
for (Map.Entry<String, Throwable> entry : shardReplicaFailures.entrySet()) {
|
||||
RestStatus restStatus = ExceptionsHelper.status(entry.getValue());
|
||||
failuresArray[slot++] = new ActionWriteResponse.ShardInfo.Failure(
|
||||
failuresArray[slot++] = new ReplicationResponse.ShardInfo.Failure(
|
||||
shardId.getIndex(), shardId.getId(), entry.getKey(), entry.getValue(), restStatus, false
|
||||
);
|
||||
}
|
||||
} else {
|
||||
failuresArray = ActionWriteResponse.EMPTY;
|
||||
failuresArray = ReplicationResponse.EMPTY;
|
||||
}
|
||||
finalResponse.setShardInfo(new ActionWriteResponse.ShardInfo(
|
||||
finalResponse.setShardInfo(new ReplicationResponse.ShardInfo(
|
||||
totalShards,
|
||||
success.get(),
|
||||
failuresArray
|
||||
|
|
|
@ -175,7 +175,7 @@ public class TransportUpdateAction extends TransportInstanceSingleOperationActio
|
|||
indexAction.execute(upsertRequest, new ActionListener<IndexResponse>() {
|
||||
@Override
|
||||
public void onResponse(IndexResponse response) {
|
||||
UpdateResponse update = new UpdateResponse(response.getShardInfo(), response.getIndex(), response.getType(), response.getId(), response.getVersion(), response.isCreated());
|
||||
UpdateResponse update = new UpdateResponse(response.getShardInfo(), response.getShardId(), response.getType(), response.getId(), response.getVersion(), response.isCreated());
|
||||
if (request.fields() != null && request.fields().length > 0) {
|
||||
Tuple<XContentType, Map<String, Object>> sourceAndContent = XContentHelper.convertToMap(upsertSourceBytes, true);
|
||||
update.setGetResult(updateHelper.extractGetResult(request, request.concreteIndex(), response.getVersion(), sourceAndContent.v2(), sourceAndContent.v1(), upsertSourceBytes));
|
||||
|
@ -212,7 +212,7 @@ public class TransportUpdateAction extends TransportInstanceSingleOperationActio
|
|||
indexAction.execute(indexRequest, new ActionListener<IndexResponse>() {
|
||||
@Override
|
||||
public void onResponse(IndexResponse response) {
|
||||
UpdateResponse update = new UpdateResponse(response.getShardInfo(), response.getIndex(), response.getType(), response.getId(), response.getVersion(), response.isCreated());
|
||||
UpdateResponse update = new UpdateResponse(response.getShardInfo(), response.getShardId(), response.getType(), response.getId(), response.getVersion(), response.isCreated());
|
||||
update.setGetResult(updateHelper.extractGetResult(request, request.concreteIndex(), response.getVersion(), result.updatedSourceAsMap(), result.updateSourceContentType(), indexSourceBytes));
|
||||
listener.onResponse(update);
|
||||
}
|
||||
|
@ -240,7 +240,7 @@ public class TransportUpdateAction extends TransportInstanceSingleOperationActio
|
|||
deleteAction.execute(deleteRequest, new ActionListener<DeleteResponse>() {
|
||||
@Override
|
||||
public void onResponse(DeleteResponse response) {
|
||||
UpdateResponse update = new UpdateResponse(response.getShardInfo(), response.getIndex(), response.getType(), response.getId(), response.getVersion(), false);
|
||||
UpdateResponse update = new UpdateResponse(response.getShardInfo(), response.getShardId(), response.getType(), response.getId(), response.getVersion(), false);
|
||||
update.setGetResult(updateHelper.extractGetResult(request, request.concreteIndex(), response.getVersion(), result.updatedSourceAsMap(), result.updateSourceContentType(), null));
|
||||
listener.onResponse(update);
|
||||
}
|
||||
|
|
|
@ -83,9 +83,10 @@ public class UpdateHelper extends AbstractComponent {
|
|||
@SuppressWarnings("unchecked")
|
||||
protected Result prepare(UpdateRequest request, final GetResult getResult) {
|
||||
long getDateNS = System.nanoTime();
|
||||
final ShardId shardId = new ShardId(getResult.getIndex(), request.shardId());
|
||||
if (!getResult.isExists()) {
|
||||
if (request.upsertRequest() == null && !request.docAsUpsert()) {
|
||||
throw new DocumentMissingException(new ShardId(request.index(), request.shardId()), request.type(), request.id());
|
||||
throw new DocumentMissingException(shardId, request.type(), request.id());
|
||||
}
|
||||
IndexRequest indexRequest = request.docAsUpsert() ? request.doc() : request.upsertRequest();
|
||||
TimeValue ttl = indexRequest.ttl();
|
||||
|
@ -113,7 +114,7 @@ public class UpdateHelper extends AbstractComponent {
|
|||
logger.warn("Used upsert operation [{}] for script [{}], doing nothing...", scriptOpChoice,
|
||||
request.script.getScript());
|
||||
}
|
||||
UpdateResponse update = new UpdateResponse(getResult.getIndex(), getResult.getType(), getResult.getId(),
|
||||
UpdateResponse update = new UpdateResponse(shardId, getResult.getType(), getResult.getId(),
|
||||
getResult.getVersion(), false);
|
||||
update.setGetResult(getResult);
|
||||
return new Result(update, Operation.NONE, upsertDoc, XContentType.JSON);
|
||||
|
@ -145,7 +146,7 @@ public class UpdateHelper extends AbstractComponent {
|
|||
|
||||
if (getResult.internalSourceRef() == null) {
|
||||
// no source, we can't do nothing, through a failure...
|
||||
throw new DocumentSourceMissingException(new ShardId(request.index(), request.shardId()), request.type(), request.id());
|
||||
throw new DocumentSourceMissingException(shardId, request.type(), request.id());
|
||||
}
|
||||
|
||||
Tuple<XContentType, Map<String, Object>> sourceAndContent = XContentHelper.convertToMap(getResult.internalSourceRef(), true);
|
||||
|
@ -231,12 +232,12 @@ public class UpdateHelper extends AbstractComponent {
|
|||
.consistencyLevel(request.consistencyLevel());
|
||||
return new Result(deleteRequest, Operation.DELETE, updatedSourceAsMap, updateSourceContentType);
|
||||
} else if ("none".equals(operation)) {
|
||||
UpdateResponse update = new UpdateResponse(getResult.getIndex(), getResult.getType(), getResult.getId(), getResult.getVersion(), false);
|
||||
UpdateResponse update = new UpdateResponse(shardId, getResult.getType(), getResult.getId(), getResult.getVersion(), false);
|
||||
update.setGetResult(extractGetResult(request, request.index(), getResult.getVersion(), updatedSourceAsMap, updateSourceContentType, getResult.internalSourceRef()));
|
||||
return new Result(update, Operation.NONE, updatedSourceAsMap, updateSourceContentType);
|
||||
} else {
|
||||
logger.warn("Used update operation [{}] for script [{}], doing nothing...", operation, request.script.getScript());
|
||||
UpdateResponse update = new UpdateResponse(getResult.getIndex(), getResult.getType(), getResult.getId(), getResult.getVersion(), false);
|
||||
UpdateResponse update = new UpdateResponse(shardId, getResult.getType(), getResult.getId(), getResult.getVersion(), false);
|
||||
return new Result(update, Operation.NONE, updatedSourceAsMap, updateSourceContentType);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -19,21 +19,21 @@
|
|||
|
||||
package org.elasticsearch.action.update;
|
||||
|
||||
import org.elasticsearch.action.ActionWriteResponse;
|
||||
import org.elasticsearch.action.DocWriteResponse;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilderString;
|
||||
import org.elasticsearch.index.get.GetResult;
|
||||
import org.elasticsearch.index.shard.ShardId;
|
||||
import org.elasticsearch.rest.RestStatus;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
/**
|
||||
*/
|
||||
public class UpdateResponse extends ActionWriteResponse {
|
||||
public class UpdateResponse extends DocWriteResponse {
|
||||
|
||||
private String index;
|
||||
private String id;
|
||||
private String type;
|
||||
private long version;
|
||||
private boolean created;
|
||||
private GetResult getResult;
|
||||
|
||||
|
@ -44,47 +44,16 @@ public class UpdateResponse extends ActionWriteResponse {
|
|||
* Constructor to be used when a update didn't translate in a write.
|
||||
* For example: update script with operation set to none
|
||||
*/
|
||||
public UpdateResponse(String index, String type, String id, long version, boolean created) {
|
||||
this(new ShardInfo(0, 0), index, type, id, version, created);
|
||||
public UpdateResponse(ShardId shardId, String type, String id, long version, boolean created) {
|
||||
this(new ShardInfo(0, 0), shardId, type, id, version, created);
|
||||
}
|
||||
|
||||
public UpdateResponse(ShardInfo shardInfo, String index, String type, String id, long version, boolean created) {
|
||||
public UpdateResponse(ShardInfo shardInfo, ShardId shardId, String type, String id, long version, boolean created) {
|
||||
super(shardId, type, id, version);
|
||||
setShardInfo(shardInfo);
|
||||
this.index = index;
|
||||
this.id = id;
|
||||
this.type = type;
|
||||
this.version = version;
|
||||
this.created = created;
|
||||
}
|
||||
|
||||
/**
|
||||
* The index the document was indexed into.
|
||||
*/
|
||||
public String getIndex() {
|
||||
return this.index;
|
||||
}
|
||||
|
||||
/**
|
||||
* The type of the document indexed.
|
||||
*/
|
||||
public String getType() {
|
||||
return this.type;
|
||||
}
|
||||
|
||||
/**
|
||||
* The id of the document indexed.
|
||||
*/
|
||||
public String getId() {
|
||||
return this.id;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the current version of the doc indexed.
|
||||
*/
|
||||
public long getVersion() {
|
||||
return this.version;
|
||||
}
|
||||
|
||||
public void setGetResult(GetResult getResult) {
|
||||
this.getResult = getResult;
|
||||
}
|
||||
|
@ -101,13 +70,17 @@ public class UpdateResponse extends ActionWriteResponse {
|
|||
|
||||
}
|
||||
|
||||
@Override
|
||||
public RestStatus status() {
|
||||
if (created) {
|
||||
return RestStatus.CREATED;
|
||||
}
|
||||
return super.status();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void readFrom(StreamInput in) throws IOException {
|
||||
super.readFrom(in);
|
||||
index = in.readString();
|
||||
type = in.readString();
|
||||
id = in.readString();
|
||||
version = in.readLong();
|
||||
created = in.readBoolean();
|
||||
if (in.readBoolean()) {
|
||||
getResult = GetResult.readGetResult(in);
|
||||
|
@ -117,10 +90,6 @@ public class UpdateResponse extends ActionWriteResponse {
|
|||
@Override
|
||||
public void writeTo(StreamOutput out) throws IOException {
|
||||
super.writeTo(out);
|
||||
out.writeString(index);
|
||||
out.writeString(type);
|
||||
out.writeString(id);
|
||||
out.writeLong(version);
|
||||
out.writeBoolean(created);
|
||||
if (getResult == null) {
|
||||
out.writeBoolean(false);
|
||||
|
@ -129,4 +98,34 @@ public class UpdateResponse extends ActionWriteResponse {
|
|||
getResult.writeTo(out);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
static final class Fields {
|
||||
static final XContentBuilderString GET = new XContentBuilderString("get");
|
||||
}
|
||||
|
||||
@Override
|
||||
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
|
||||
super.toXContent(builder, params);
|
||||
if (getGetResult() != null) {
|
||||
builder.startObject(Fields.GET);
|
||||
getGetResult().toXContentEmbedded(builder, params);
|
||||
builder.endObject();
|
||||
}
|
||||
return builder;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
StringBuilder builder = new StringBuilder();
|
||||
builder.append("UpdateResponse[");
|
||||
builder.append("index=").append(getIndex());
|
||||
builder.append(",type=").append(getType());
|
||||
builder.append(",id=").append(getId());
|
||||
builder.append(",version=").append(getVersion());
|
||||
builder.append(",created=").append(created);
|
||||
builder.append(",shards=").append(getShardInfo());
|
||||
return builder.append("]").toString();
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -19,16 +19,11 @@
|
|||
|
||||
package org.elasticsearch.rest.action.bulk;
|
||||
|
||||
import org.elasticsearch.ElasticsearchException;
|
||||
import org.elasticsearch.action.ActionWriteResponse;
|
||||
import org.elasticsearch.action.WriteConsistencyLevel;
|
||||
import org.elasticsearch.action.bulk.BulkItemResponse;
|
||||
import org.elasticsearch.action.bulk.BulkRequest;
|
||||
import org.elasticsearch.action.bulk.BulkResponse;
|
||||
import org.elasticsearch.action.bulk.BulkShardRequest;
|
||||
import org.elasticsearch.action.delete.DeleteResponse;
|
||||
import org.elasticsearch.action.index.IndexResponse;
|
||||
import org.elasticsearch.action.update.UpdateResponse;
|
||||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.client.Requests;
|
||||
import org.elasticsearch.common.Strings;
|
||||
|
@ -96,52 +91,7 @@ public class RestBulkAction extends BaseRestHandler {
|
|||
builder.startArray(Fields.ITEMS);
|
||||
for (BulkItemResponse itemResponse : response) {
|
||||
builder.startObject();
|
||||
builder.startObject(itemResponse.getOpType());
|
||||
builder.field(Fields._INDEX, itemResponse.getIndex());
|
||||
builder.field(Fields._TYPE, itemResponse.getType());
|
||||
builder.field(Fields._ID, itemResponse.getId());
|
||||
long version = itemResponse.getVersion();
|
||||
if (version != -1) {
|
||||
builder.field(Fields._VERSION, itemResponse.getVersion());
|
||||
}
|
||||
if (itemResponse.isFailed()) {
|
||||
builder.field(Fields.STATUS, itemResponse.getFailure().getStatus().getStatus());
|
||||
builder.startObject(Fields.ERROR);
|
||||
ElasticsearchException.toXContent(builder, request, itemResponse.getFailure().getCause());
|
||||
builder.endObject();
|
||||
} else {
|
||||
ActionWriteResponse.ShardInfo shardInfo = itemResponse.getResponse().getShardInfo();
|
||||
shardInfo.toXContent(builder, request);
|
||||
if (itemResponse.getResponse() instanceof DeleteResponse) {
|
||||
DeleteResponse deleteResponse = itemResponse.getResponse();
|
||||
if (deleteResponse.isFound()) {
|
||||
builder.field(Fields.STATUS, shardInfo.status().getStatus());
|
||||
} else {
|
||||
builder.field(Fields.STATUS, RestStatus.NOT_FOUND.getStatus());
|
||||
}
|
||||
builder.field(Fields.FOUND, deleteResponse.isFound());
|
||||
} else if (itemResponse.getResponse() instanceof IndexResponse) {
|
||||
IndexResponse indexResponse = itemResponse.getResponse();
|
||||
if (indexResponse.isCreated()) {
|
||||
builder.field(Fields.STATUS, RestStatus.CREATED.getStatus());
|
||||
} else {
|
||||
builder.field(Fields.STATUS, shardInfo.status().getStatus());
|
||||
}
|
||||
} else if (itemResponse.getResponse() instanceof UpdateResponse) {
|
||||
UpdateResponse updateResponse = itemResponse.getResponse();
|
||||
if (updateResponse.isCreated()) {
|
||||
builder.field(Fields.STATUS, RestStatus.CREATED.getStatus());
|
||||
} else {
|
||||
builder.field(Fields.STATUS, shardInfo.status().getStatus());
|
||||
}
|
||||
if (updateResponse.getGetResult() != null) {
|
||||
builder.startObject(Fields.GET);
|
||||
updateResponse.getGetResult().toXContentEmbedded(builder, request);
|
||||
builder.endObject();
|
||||
}
|
||||
}
|
||||
}
|
||||
builder.endObject();
|
||||
itemResponse.toXContent(builder, request);
|
||||
builder.endObject();
|
||||
}
|
||||
builder.endArray();
|
||||
|
@ -155,15 +105,7 @@ public class RestBulkAction extends BaseRestHandler {
|
|||
static final class Fields {
|
||||
static final XContentBuilderString ITEMS = new XContentBuilderString("items");
|
||||
static final XContentBuilderString ERRORS = new XContentBuilderString("errors");
|
||||
static final XContentBuilderString _INDEX = new XContentBuilderString("_index");
|
||||
static final XContentBuilderString _TYPE = new XContentBuilderString("_type");
|
||||
static final XContentBuilderString _ID = new XContentBuilderString("_id");
|
||||
static final XContentBuilderString STATUS = new XContentBuilderString("status");
|
||||
static final XContentBuilderString ERROR = new XContentBuilderString("error");
|
||||
static final XContentBuilderString TOOK = new XContentBuilderString("took");
|
||||
static final XContentBuilderString _VERSION = new XContentBuilderString("_version");
|
||||
static final XContentBuilderString FOUND = new XContentBuilderString("found");
|
||||
static final XContentBuilderString GET = new XContentBuilderString("get");
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -19,7 +19,6 @@
|
|||
|
||||
package org.elasticsearch.rest.action.delete;
|
||||
|
||||
import org.elasticsearch.action.ActionWriteResponse;
|
||||
import org.elasticsearch.action.WriteConsistencyLevel;
|
||||
import org.elasticsearch.action.delete.DeleteRequest;
|
||||
import org.elasticsearch.action.delete.DeleteResponse;
|
||||
|
@ -27,14 +26,13 @@ import org.elasticsearch.client.Client;
|
|||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilderString;
|
||||
import org.elasticsearch.index.VersionType;
|
||||
import org.elasticsearch.rest.*;
|
||||
import org.elasticsearch.rest.action.support.RestActions;
|
||||
import org.elasticsearch.rest.action.support.RestBuilderListener;
|
||||
import org.elasticsearch.rest.action.support.RestStatusToXContentListener;
|
||||
|
||||
import static org.elasticsearch.rest.RestRequest.Method.DELETE;
|
||||
import static org.elasticsearch.rest.RestStatus.NOT_FOUND;
|
||||
|
||||
/**
|
||||
*
|
||||
|
@ -62,31 +60,6 @@ public class RestDeleteAction extends BaseRestHandler {
|
|||
deleteRequest.consistencyLevel(WriteConsistencyLevel.fromString(consistencyLevel));
|
||||
}
|
||||
|
||||
client.delete(deleteRequest, new RestBuilderListener<DeleteResponse>(channel) {
|
||||
@Override
|
||||
public RestResponse buildResponse(DeleteResponse result, XContentBuilder builder) throws Exception {
|
||||
ActionWriteResponse.ShardInfo shardInfo = result.getShardInfo();
|
||||
builder.startObject().field(Fields.FOUND, result.isFound())
|
||||
.field(Fields._INDEX, result.getIndex())
|
||||
.field(Fields._TYPE, result.getType())
|
||||
.field(Fields._ID, result.getId())
|
||||
.field(Fields._VERSION, result.getVersion())
|
||||
.value(shardInfo)
|
||||
.endObject();
|
||||
RestStatus status = shardInfo.status();
|
||||
if (!result.isFound()) {
|
||||
status = NOT_FOUND;
|
||||
}
|
||||
return new BytesRestResponse(status, builder);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
static final class Fields {
|
||||
static final XContentBuilderString FOUND = new XContentBuilderString("found");
|
||||
static final XContentBuilderString _INDEX = new XContentBuilderString("_index");
|
||||
static final XContentBuilderString _TYPE = new XContentBuilderString("_type");
|
||||
static final XContentBuilderString _ID = new XContentBuilderString("_id");
|
||||
static final XContentBuilderString _VERSION = new XContentBuilderString("_version");
|
||||
client.delete(deleteRequest, new RestStatusToXContentListener<>(channel));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -19,7 +19,6 @@
|
|||
|
||||
package org.elasticsearch.rest.action.index;
|
||||
|
||||
import org.elasticsearch.action.ActionWriteResponse;
|
||||
import org.elasticsearch.action.WriteConsistencyLevel;
|
||||
import org.elasticsearch.action.index.IndexRequest;
|
||||
import org.elasticsearch.action.index.IndexResponse;
|
||||
|
@ -27,11 +26,11 @@ import org.elasticsearch.client.Client;
|
|||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilderString;
|
||||
import org.elasticsearch.index.VersionType;
|
||||
import org.elasticsearch.rest.*;
|
||||
import org.elasticsearch.rest.action.support.RestActions;
|
||||
import org.elasticsearch.rest.action.support.RestBuilderListener;
|
||||
import org.elasticsearch.rest.action.support.RestStatusToXContentListener;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
|
@ -99,33 +98,6 @@ public class RestIndexAction extends BaseRestHandler {
|
|||
if (consistencyLevel != null) {
|
||||
indexRequest.consistencyLevel(WriteConsistencyLevel.fromString(consistencyLevel));
|
||||
}
|
||||
client.index(indexRequest, new RestBuilderListener<IndexResponse>(channel) {
|
||||
@Override
|
||||
public RestResponse buildResponse(IndexResponse response, XContentBuilder builder) throws Exception {
|
||||
builder.startObject();
|
||||
ActionWriteResponse.ShardInfo shardInfo = response.getShardInfo();
|
||||
builder.field(Fields._INDEX, response.getIndex())
|
||||
.field(Fields._TYPE, response.getType())
|
||||
.field(Fields._ID, response.getId())
|
||||
.field(Fields._VERSION, response.getVersion());
|
||||
shardInfo.toXContent(builder, request);
|
||||
builder.field(Fields.CREATED, response.isCreated());
|
||||
builder.endObject();
|
||||
RestStatus status = shardInfo.status();
|
||||
if (response.isCreated()) {
|
||||
status = CREATED;
|
||||
client.index(indexRequest, new RestStatusToXContentListener<>(channel));
|
||||
}
|
||||
return new BytesRestResponse(status, builder);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
static final class Fields {
|
||||
static final XContentBuilderString _INDEX = new XContentBuilderString("_index");
|
||||
static final XContentBuilderString _TYPE = new XContentBuilderString("_type");
|
||||
static final XContentBuilderString _ID = new XContentBuilderString("_id");
|
||||
static final XContentBuilderString _VERSION = new XContentBuilderString("_version");
|
||||
static final XContentBuilderString CREATED = new XContentBuilderString("created");
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -19,7 +19,6 @@
|
|||
|
||||
package org.elasticsearch.rest.action.update;
|
||||
|
||||
import org.elasticsearch.action.ActionWriteResponse;
|
||||
import org.elasticsearch.action.WriteConsistencyLevel;
|
||||
import org.elasticsearch.action.index.IndexRequest;
|
||||
import org.elasticsearch.action.update.UpdateRequest;
|
||||
|
@ -29,7 +28,6 @@ import org.elasticsearch.common.Strings;
|
|||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilderString;
|
||||
import org.elasticsearch.index.VersionType;
|
||||
import org.elasticsearch.rest.BaseRestHandler;
|
||||
import org.elasticsearch.rest.BytesRestResponse;
|
||||
|
@ -40,6 +38,7 @@ import org.elasticsearch.rest.RestResponse;
|
|||
import org.elasticsearch.rest.RestStatus;
|
||||
import org.elasticsearch.rest.action.support.RestActions;
|
||||
import org.elasticsearch.rest.action.support.RestBuilderListener;
|
||||
import org.elasticsearch.rest.action.support.RestStatusToXContentListener;
|
||||
import org.elasticsearch.script.Script;
|
||||
import org.elasticsearch.script.ScriptParameterParser;
|
||||
import org.elasticsearch.script.ScriptParameterParser.ScriptParameterValue;
|
||||
|
@ -48,7 +47,6 @@ import java.util.HashMap;
|
|||
import java.util.Map;
|
||||
|
||||
import static org.elasticsearch.rest.RestRequest.Method.POST;
|
||||
import static org.elasticsearch.rest.RestStatus.CREATED;
|
||||
|
||||
/**
|
||||
*/
|
||||
|
@ -123,38 +121,6 @@ public class RestUpdateAction extends BaseRestHandler {
|
|||
}
|
||||
}
|
||||
|
||||
client.update(updateRequest, new RestBuilderListener<UpdateResponse>(channel) {
|
||||
@Override
|
||||
public RestResponse buildResponse(UpdateResponse response, XContentBuilder builder) throws Exception {
|
||||
builder.startObject();
|
||||
ActionWriteResponse.ShardInfo shardInfo = response.getShardInfo();
|
||||
builder.field(Fields._INDEX, response.getIndex())
|
||||
.field(Fields._TYPE, response.getType())
|
||||
.field(Fields._ID, response.getId())
|
||||
.field(Fields._VERSION, response.getVersion());
|
||||
|
||||
shardInfo.toXContent(builder, request);
|
||||
if (response.getGetResult() != null) {
|
||||
builder.startObject(Fields.GET);
|
||||
response.getGetResult().toXContentEmbedded(builder, request);
|
||||
builder.endObject();
|
||||
}
|
||||
|
||||
builder.endObject();
|
||||
RestStatus status = shardInfo.status();
|
||||
if (response.isCreated()) {
|
||||
status = CREATED;
|
||||
}
|
||||
return new BytesRestResponse(status, builder);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
static final class Fields {
|
||||
static final XContentBuilderString _INDEX = new XContentBuilderString("_index");
|
||||
static final XContentBuilderString _TYPE = new XContentBuilderString("_type");
|
||||
static final XContentBuilderString _ID = new XContentBuilderString("_id");
|
||||
static final XContentBuilderString _VERSION = new XContentBuilderString("_version");
|
||||
static final XContentBuilderString GET = new XContentBuilderString("get");
|
||||
client.update(updateRequest, new RestStatusToXContentListener<>(channel));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -20,7 +20,7 @@ package org.elasticsearch.action.support.replication;
|
|||
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.ActionWriteResponse;
|
||||
import org.elasticsearch.action.ReplicationResponse;
|
||||
import org.elasticsearch.action.NoShardAvailableActionException;
|
||||
import org.elasticsearch.action.UnavailableShardsException;
|
||||
import org.elasticsearch.action.admin.indices.flush.FlushRequest;
|
||||
|
@ -101,7 +101,7 @@ public class BroadcastReplicationTests extends ESTestCase {
|
|||
randomBoolean() ? ShardRoutingState.INITIALIZING : ShardRoutingState.UNASSIGNED, ShardRoutingState.UNASSIGNED));
|
||||
logger.debug("--> using initial state:\n{}", clusterService.state().prettyPrint());
|
||||
Future<BroadcastResponse> response = (broadcastReplicationAction.execute(new BroadcastRequest().indices(index)));
|
||||
for (Tuple<ShardId, ActionListener<ActionWriteResponse>> shardRequests : broadcastReplicationAction.capturedShardRequests) {
|
||||
for (Tuple<ShardId, ActionListener<ReplicationResponse>> shardRequests : broadcastReplicationAction.capturedShardRequests) {
|
||||
if (randomBoolean()) {
|
||||
shardRequests.v2().onFailure(new NoShardAvailableActionException(shardRequests.v1()));
|
||||
} else {
|
||||
|
@ -120,10 +120,10 @@ public class BroadcastReplicationTests extends ESTestCase {
|
|||
ShardRoutingState.STARTED));
|
||||
logger.debug("--> using initial state:\n{}", clusterService.state().prettyPrint());
|
||||
Future<BroadcastResponse> response = (broadcastReplicationAction.execute(new BroadcastRequest().indices(index)));
|
||||
for (Tuple<ShardId, ActionListener<ActionWriteResponse>> shardRequests : broadcastReplicationAction.capturedShardRequests) {
|
||||
ActionWriteResponse actionWriteResponse = new ActionWriteResponse();
|
||||
actionWriteResponse.setShardInfo(new ActionWriteResponse.ShardInfo(1, 1, new ActionWriteResponse.ShardInfo.Failure[0]));
|
||||
shardRequests.v2().onResponse(actionWriteResponse);
|
||||
for (Tuple<ShardId, ActionListener<ReplicationResponse>> shardRequests : broadcastReplicationAction.capturedShardRequests) {
|
||||
ReplicationResponse replicationResponse = new ReplicationResponse();
|
||||
replicationResponse.setShardInfo(new ReplicationResponse.ShardInfo(1, 1, new ReplicationResponse.ShardInfo.Failure[0]));
|
||||
shardRequests.v2().onResponse(replicationResponse);
|
||||
}
|
||||
logger.info("total shards: {}, ", response.get().getTotalShards());
|
||||
assertBroadcastResponse(1, 1, 0, response.get(), null);
|
||||
|
@ -137,20 +137,20 @@ public class BroadcastReplicationTests extends ESTestCase {
|
|||
Future<BroadcastResponse> response = (broadcastReplicationAction.execute(new BroadcastRequest().indices(index)));
|
||||
int succeeded = 0;
|
||||
int failed = 0;
|
||||
for (Tuple<ShardId, ActionListener<ActionWriteResponse>> shardRequests : broadcastReplicationAction.capturedShardRequests) {
|
||||
for (Tuple<ShardId, ActionListener<ReplicationResponse>> shardRequests : broadcastReplicationAction.capturedShardRequests) {
|
||||
if (randomBoolean()) {
|
||||
ActionWriteResponse.ShardInfo.Failure[] failures = new ActionWriteResponse.ShardInfo.Failure[0];
|
||||
ReplicationResponse.ShardInfo.Failure[] failures = new ReplicationResponse.ShardInfo.Failure[0];
|
||||
int shardsSucceeded = randomInt(1) + 1;
|
||||
succeeded += shardsSucceeded;
|
||||
ActionWriteResponse actionWriteResponse = new ActionWriteResponse();
|
||||
ReplicationResponse replicationResponse = new ReplicationResponse();
|
||||
if (shardsSucceeded == 1 && randomBoolean()) {
|
||||
//sometimes add failure (no failure means shard unavailable)
|
||||
failures = new ActionWriteResponse.ShardInfo.Failure[1];
|
||||
failures[0] = new ActionWriteResponse.ShardInfo.Failure(index, shardRequests.v1().id(), null, new Exception("pretend shard failed"), RestStatus.GATEWAY_TIMEOUT, false);
|
||||
failures = new ReplicationResponse.ShardInfo.Failure[1];
|
||||
failures[0] = new ReplicationResponse.ShardInfo.Failure(index, shardRequests.v1().id(), null, new Exception("pretend shard failed"), RestStatus.GATEWAY_TIMEOUT, false);
|
||||
failed++;
|
||||
}
|
||||
actionWriteResponse.setShardInfo(new ActionWriteResponse.ShardInfo(2, shardsSucceeded, failures));
|
||||
shardRequests.v2().onResponse(actionWriteResponse);
|
||||
replicationResponse.setShardInfo(new ReplicationResponse.ShardInfo(2, shardsSucceeded, failures));
|
||||
shardRequests.v2().onResponse(replicationResponse);
|
||||
} else {
|
||||
// sometimes fail
|
||||
failed += 2;
|
||||
|
@ -179,16 +179,16 @@ public class BroadcastReplicationTests extends ESTestCase {
|
|||
assertThat(shards.get(0), equalTo(shardId));
|
||||
}
|
||||
|
||||
private class TestBroadcastReplicationAction extends TransportBroadcastReplicationAction<BroadcastRequest, BroadcastResponse, ReplicationRequest, ActionWriteResponse> {
|
||||
protected final Set<Tuple<ShardId, ActionListener<ActionWriteResponse>>> capturedShardRequests = ConcurrentCollections.newConcurrentSet();
|
||||
private class TestBroadcastReplicationAction extends TransportBroadcastReplicationAction<BroadcastRequest, BroadcastResponse, ReplicationRequest, ReplicationResponse> {
|
||||
protected final Set<Tuple<ShardId, ActionListener<ReplicationResponse>>> capturedShardRequests = ConcurrentCollections.newConcurrentSet();
|
||||
|
||||
public TestBroadcastReplicationAction(Settings settings, ThreadPool threadPool, ClusterService clusterService, TransportService transportService, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, TransportReplicationAction replicatedBroadcastShardAction) {
|
||||
super("test-broadcast-replication-action", BroadcastRequest::new, settings, threadPool, clusterService, transportService, actionFilters, indexNameExpressionResolver, replicatedBroadcastShardAction);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected ActionWriteResponse newShardResponse() {
|
||||
return new ActionWriteResponse();
|
||||
protected ReplicationResponse newShardResponse() {
|
||||
return new ReplicationResponse();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -202,7 +202,7 @@ public class BroadcastReplicationTests extends ESTestCase {
|
|||
}
|
||||
|
||||
@Override
|
||||
protected void shardExecute(BroadcastRequest request, ShardId shardId, ActionListener<ActionWriteResponse> shardActionListener) {
|
||||
protected void shardExecute(BroadcastRequest request, ShardId shardId, ActionListener<ReplicationResponse> shardActionListener) {
|
||||
capturedShardRequests.add(new Tuple<>(shardId, shardActionListener));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -20,7 +20,7 @@ package org.elasticsearch.action.support.replication;
|
|||
|
||||
import org.apache.lucene.index.CorruptIndexException;
|
||||
import org.elasticsearch.ElasticsearchException;
|
||||
import org.elasticsearch.action.ActionWriteResponse;
|
||||
import org.elasticsearch.action.ReplicationResponse;
|
||||
import org.elasticsearch.action.UnavailableShardsException;
|
||||
import org.elasticsearch.action.WriteConsistencyLevel;
|
||||
import org.elasticsearch.action.support.ActionFilter;
|
||||
|
@ -521,7 +521,7 @@ public class TransportReplicationActionTests extends ESTestCase {
|
|||
}
|
||||
assertThat(listener.isDone(), equalTo(true));
|
||||
Response response = listener.get();
|
||||
final ActionWriteResponse.ShardInfo shardInfo = response.getShardInfo();
|
||||
final ReplicationResponse.ShardInfo shardInfo = response.getShardInfo();
|
||||
assertThat(shardInfo.getFailed(), equalTo(criticalFailures));
|
||||
assertThat(shardInfo.getFailures(), arrayWithSize(criticalFailures));
|
||||
assertThat(shardInfo.getSuccessful(), equalTo(successful));
|
||||
|
@ -703,7 +703,7 @@ public class TransportReplicationActionTests extends ESTestCase {
|
|||
}
|
||||
}
|
||||
|
||||
static class Response extends ActionWriteResponse {
|
||||
static class Response extends ReplicationResponse {
|
||||
}
|
||||
|
||||
class Action extends TransportReplicationAction<Request, Request, Response> {
|
||||
|
|
|
@ -19,7 +19,7 @@
|
|||
|
||||
package org.elasticsearch.document;
|
||||
|
||||
import org.elasticsearch.action.ActionWriteResponse;
|
||||
import org.elasticsearch.action.ReplicationResponse;
|
||||
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
|
||||
import org.elasticsearch.action.admin.indices.recovery.RecoveryResponse;
|
||||
import org.elasticsearch.action.bulk.BulkItemResponse;
|
||||
|
@ -117,11 +117,11 @@ public class ShardInfoIT extends ESIntegTestCase {
|
|||
}
|
||||
}
|
||||
|
||||
private void assertShardInfo(ActionWriteResponse response) {
|
||||
private void assertShardInfo(ReplicationResponse response) {
|
||||
assertShardInfo(response, numCopies, numNodes);
|
||||
}
|
||||
|
||||
private void assertShardInfo(ActionWriteResponse response, int expectedTotal, int expectedSuccessful) {
|
||||
private void assertShardInfo(ReplicationResponse response, int expectedTotal, int expectedSuccessful) {
|
||||
assertThat(response.getShardInfo().getTotal(), greaterThanOrEqualTo(expectedTotal));
|
||||
assertThat(response.getShardInfo().getSuccessful(), greaterThanOrEqualTo(expectedSuccessful));
|
||||
}
|
||||
|
|
|
@ -32,6 +32,7 @@ import org.elasticsearch.common.text.StringText;
|
|||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.common.util.CollectionUtils;
|
||||
import org.elasticsearch.common.util.concurrent.CountDown;
|
||||
import org.elasticsearch.index.shard.ShardId;
|
||||
import org.elasticsearch.search.SearchHit;
|
||||
import org.elasticsearch.search.SearchShardTarget;
|
||||
import org.elasticsearch.search.internal.InternalSearchHit;
|
||||
|
@ -225,7 +226,7 @@ public class TransportDeleteByQueryActionTests extends ESSingleNodeTestCase {
|
|||
} else {
|
||||
deleted++;
|
||||
}
|
||||
items[i] = new BulkItemResponse(i, "delete", new DeleteResponse("test", "type", String.valueOf(i), 1, delete));
|
||||
items[i] = new BulkItemResponse(i, "delete", new DeleteResponse(new ShardId("test", 0), "type", String.valueOf(i), 1, delete));
|
||||
} else {
|
||||
items[i] = new BulkItemResponse(i, "delete", new BulkItemResponse.Failure("test", "type", String.valueOf(i), new Throwable("item failed")));
|
||||
failed++;
|
||||
|
@ -281,7 +282,7 @@ public class TransportDeleteByQueryActionTests extends ESSingleNodeTestCase {
|
|||
deleted[0] = deleted[0] + 1;
|
||||
deleted[index] = deleted[index] + 1;
|
||||
}
|
||||
items[i] = new BulkItemResponse(i, "delete", new DeleteResponse("test-" + index, "type", String.valueOf(i), 1, delete));
|
||||
items[i] = new BulkItemResponse(i, "delete", new DeleteResponse(new ShardId("test-" + index, 0), "type", String.valueOf(i), 1, delete));
|
||||
} else {
|
||||
items[i] = new BulkItemResponse(i, "delete", new BulkItemResponse.Failure("test-" + index, "type", String.valueOf(i), new Throwable("item failed")));
|
||||
failed[0] = failed[0] + 1;
|
||||
|
|
Loading…
Reference in New Issue