From fafeb3abddd8e691e966064bfd3131714e20d8a2 Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Wed, 9 Dec 2015 12:36:50 +0100 Subject: [PATCH] Introduce a common base response class to all single doc write ops IndexResponse, DeleteResponse and UpdateResponse share some logic. This can be unified to a single DocWriteResponse base class. On top, some replication actions are now not about write operations anymore. This commit renames ActionWriteResponse to ReplicationResponse Last some toXContent is moved from the Rest layer to the actual response classes, for more code re-sharing. Closes #15334 --- .../action/DocWriteResponse.java | 130 ++++++++++++++++++ ...Response.java => ReplicationResponse.java} | 9 +- .../indices/flush/TransportFlushAction.java | 8 +- .../flush/TransportShardFlushAction.java | 12 +- .../refresh/TransportRefreshAction.java | 8 +- .../refresh/TransportShardRefreshAction.java | 12 +- .../action/bulk/BulkItemResponse.java | 82 ++++++----- .../action/bulk/BulkShardResponse.java | 4 +- .../action/bulk/TransportShardBulkAction.java | 4 +- .../action/delete/DeleteResponse.java | 86 ++++++------ .../action/delete/TransportDeleteAction.java | 2 +- .../action/index/IndexResponse.java | 82 +++++------ .../action/index/TransportIndexAction.java | 2 +- .../TransportBroadcastReplicationAction.java | 19 ++- .../TransportReplicationAction.java | 20 +-- .../action/update/TransportUpdateAction.java | 6 +- .../action/update/UpdateHelper.java | 11 +- .../action/update/UpdateResponse.java | 97 +++++++------ .../rest/action/bulk/RestBulkAction.java | 60 +------- .../rest/action/delete/RestDeleteAction.java | 31 +---- .../rest/action/index/RestIndexAction.java | 32 +---- .../rest/action/update/RestUpdateAction.java | 38 +---- .../BroadcastReplicationTests.java | 36 ++--- .../TransportReplicationActionTests.java | 6 +- .../elasticsearch/document/ShardInfoIT.java | 6 +- .../TransportDeleteByQueryActionTests.java | 5 +- 26 files changed, 385 insertions(+), 423 deletions(-) create mode 100644 core/src/main/java/org/elasticsearch/action/DocWriteResponse.java rename core/src/main/java/org/elasticsearch/action/{ActionWriteResponse.java => ReplicationResponse.java} (96%) diff --git a/core/src/main/java/org/elasticsearch/action/DocWriteResponse.java b/core/src/main/java/org/elasticsearch/action/DocWriteResponse.java new file mode 100644 index 00000000000..009d3fc47a9 --- /dev/null +++ b/core/src/main/java/org/elasticsearch/action/DocWriteResponse.java @@ -0,0 +1,130 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.action; + +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.xcontent.StatusToXContent; +import org.elasticsearch.common.xcontent.ToXContent; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentBuilderString; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.rest.RestStatus; + +import java.io.IOException; + +/** + * A base class for the response of a write operation that involves a single doc + */ +public abstract class DocWriteResponse extends ReplicationResponse implements StatusToXContent { + + private ShardId shardId; + private String id; + private String type; + private long version; + + public DocWriteResponse(ShardId shardId, String type, String id, long version) { + this.shardId = shardId; + this.type = type; + this.id = id; + this.version = version; + } + + // needed for deserialization + protected DocWriteResponse() { + } + + /** + * The index the document was changed in. + */ + public String getIndex() { + return this.shardId.getIndex(); + } + + + /** + * The exact shard the document was changed in. + */ + public ShardId getShardId() { + return this.shardId; + } + + /** + * The type of the document changed. + */ + public String getType() { + return this.type; + } + + /** + * The id of the document changed. + */ + public String getId() { + return this.id; + } + + /** + * Returns the current version of the doc. + */ + public long getVersion() { + return this.version; + } + + /** returns the rest status for this response (based on {@link ShardInfo#status()} */ + public RestStatus status() { + return getShardInfo().status(); + } + + + @Override + public void readFrom(StreamInput in) throws IOException { + super.readFrom(in); + shardId = ShardId.readShardId(in); + type = in.readString(); + id = in.readString(); + version = in.readZLong(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + shardId.writeTo(out); + out.writeString(type); + out.writeString(id); + out.writeZLong(version); + } + + static final class Fields { + static final XContentBuilderString _INDEX = new XContentBuilderString("_index"); + static final XContentBuilderString _TYPE = new XContentBuilderString("_type"); + static final XContentBuilderString _ID = new XContentBuilderString("_id"); + static final XContentBuilderString _VERSION = new XContentBuilderString("_version"); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + ReplicationResponse.ShardInfo shardInfo = getShardInfo(); + builder.field(Fields._INDEX, shardId.getIndex()) + .field(Fields._TYPE, type) + .field(Fields._ID, id) + .field(Fields._VERSION, version); + shardInfo.toXContent(builder, params); + return builder; + } +} diff --git a/core/src/main/java/org/elasticsearch/action/ActionWriteResponse.java b/core/src/main/java/org/elasticsearch/action/ReplicationResponse.java similarity index 96% rename from core/src/main/java/org/elasticsearch/action/ActionWriteResponse.java rename to core/src/main/java/org/elasticsearch/action/ReplicationResponse.java index f4152ac85e4..4e358c8d42a 100644 --- a/core/src/main/java/org/elasticsearch/action/ActionWriteResponse.java +++ b/core/src/main/java/org/elasticsearch/action/ReplicationResponse.java @@ -21,7 +21,6 @@ package org.elasticsearch.action; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ExceptionsHelper; -import org.elasticsearch.bootstrap.Elasticsearch; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Strings; import org.elasticsearch.common.io.stream.StreamInput; @@ -30,25 +29,23 @@ import org.elasticsearch.common.io.stream.Streamable; import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentBuilderString; -import org.elasticsearch.common.xcontent.json.JsonXContent; import org.elasticsearch.rest.RestStatus; import java.io.IOException; -import java.util.Collections; /** * Base class for write action responses. */ -public class ActionWriteResponse extends ActionResponse { +public class ReplicationResponse extends ActionResponse { - public final static ActionWriteResponse.ShardInfo.Failure[] EMPTY = new ActionWriteResponse.ShardInfo.Failure[0]; + public final static ReplicationResponse.ShardInfo.Failure[] EMPTY = new ReplicationResponse.ShardInfo.Failure[0]; private ShardInfo shardInfo; @Override public void readFrom(StreamInput in) throws IOException { super.readFrom(in); - shardInfo = ActionWriteResponse.ShardInfo.readShardInfo(in); + shardInfo = ReplicationResponse.ShardInfo.readShardInfo(in); } @Override diff --git a/core/src/main/java/org/elasticsearch/action/admin/indices/flush/TransportFlushAction.java b/core/src/main/java/org/elasticsearch/action/admin/indices/flush/TransportFlushAction.java index 00e03ffdf6e..d2a8f1abcbf 100644 --- a/core/src/main/java/org/elasticsearch/action/admin/indices/flush/TransportFlushAction.java +++ b/core/src/main/java/org/elasticsearch/action/admin/indices/flush/TransportFlushAction.java @@ -19,7 +19,7 @@ package org.elasticsearch.action.admin.indices.flush; -import org.elasticsearch.action.ActionWriteResponse; +import org.elasticsearch.action.ReplicationResponse; import org.elasticsearch.action.ShardOperationFailedException; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.replication.TransportBroadcastReplicationAction; @@ -36,7 +36,7 @@ import java.util.List; /** * Flush Action. */ -public class TransportFlushAction extends TransportBroadcastReplicationAction { +public class TransportFlushAction extends TransportBroadcastReplicationAction { @Inject public TransportFlushAction(Settings settings, ThreadPool threadPool, ClusterService clusterService, @@ -47,8 +47,8 @@ public class TransportFlushAction extends TransportBroadcastReplicationAction { +public class TransportShardFlushAction extends TransportReplicationAction { public static final String NAME = FlushAction.NAME + "[s]"; @@ -53,16 +53,16 @@ public class TransportShardFlushAction extends TransportReplicationAction shardOperationOnPrimary(MetaData metaData, ShardFlushRequest shardRequest) throws Throwable { + protected Tuple shardOperationOnPrimary(MetaData metaData, ShardFlushRequest shardRequest) throws Throwable { IndexShard indexShard = indicesService.indexServiceSafe(shardRequest.shardId().getIndex()).getShard(shardRequest.shardId().id()); indexShard.flush(shardRequest.getRequest()); logger.trace("{} flush request executed on primary", indexShard.shardId()); - return new Tuple<>(new ActionWriteResponse(), shardRequest); + return new Tuple<>(new ReplicationResponse(), shardRequest); } @Override diff --git a/core/src/main/java/org/elasticsearch/action/admin/indices/refresh/TransportRefreshAction.java b/core/src/main/java/org/elasticsearch/action/admin/indices/refresh/TransportRefreshAction.java index a5e30abc12d..a76b714b31d 100644 --- a/core/src/main/java/org/elasticsearch/action/admin/indices/refresh/TransportRefreshAction.java +++ b/core/src/main/java/org/elasticsearch/action/admin/indices/refresh/TransportRefreshAction.java @@ -19,7 +19,7 @@ package org.elasticsearch.action.admin.indices.refresh; -import org.elasticsearch.action.ActionWriteResponse; +import org.elasticsearch.action.ReplicationResponse; import org.elasticsearch.action.ShardOperationFailedException; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.replication.ReplicationRequest; @@ -37,7 +37,7 @@ import java.util.List; /** * Refresh action. */ -public class TransportRefreshAction extends TransportBroadcastReplicationAction { +public class TransportRefreshAction extends TransportBroadcastReplicationAction { @Inject public TransportRefreshAction(Settings settings, ThreadPool threadPool, ClusterService clusterService, @@ -48,8 +48,8 @@ public class TransportRefreshAction extends TransportBroadcastReplicationAction< } @Override - protected ActionWriteResponse newShardResponse() { - return new ActionWriteResponse(); + protected ReplicationResponse newShardResponse() { + return new ReplicationResponse(); } @Override diff --git a/core/src/main/java/org/elasticsearch/action/admin/indices/refresh/TransportShardRefreshAction.java b/core/src/main/java/org/elasticsearch/action/admin/indices/refresh/TransportShardRefreshAction.java index 02af4ad89a0..c78977fb362 100644 --- a/core/src/main/java/org/elasticsearch/action/admin/indices/refresh/TransportShardRefreshAction.java +++ b/core/src/main/java/org/elasticsearch/action/admin/indices/refresh/TransportShardRefreshAction.java @@ -19,7 +19,7 @@ package org.elasticsearch.action.admin.indices.refresh; -import org.elasticsearch.action.ActionWriteResponse; +import org.elasticsearch.action.ReplicationResponse; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.replication.ReplicationRequest; import org.elasticsearch.action.support.replication.TransportReplicationAction; @@ -41,7 +41,7 @@ import org.elasticsearch.transport.TransportService; /** * */ -public class TransportShardRefreshAction extends TransportReplicationAction { +public class TransportShardRefreshAction extends TransportReplicationAction { public static final String NAME = RefreshAction.NAME + "[s]"; @@ -55,16 +55,16 @@ public class TransportShardRefreshAction extends TransportReplicationAction shardOperationOnPrimary(MetaData metaData, ReplicationRequest shardRequest) throws Throwable { + protected Tuple shardOperationOnPrimary(MetaData metaData, ReplicationRequest shardRequest) throws Throwable { IndexShard indexShard = indicesService.indexServiceSafe(shardRequest.shardId().getIndex()).getShard(shardRequest.shardId().id()); indexShard.refresh("api"); logger.trace("{} refresh request executed on primary", indexShard.shardId()); - return new Tuple<>(new ActionWriteResponse(), shardRequest); + return new Tuple<>(new ReplicationResponse(), shardRequest); } @Override diff --git a/core/src/main/java/org/elasticsearch/action/bulk/BulkItemResponse.java b/core/src/main/java/org/elasticsearch/action/bulk/BulkItemResponse.java index 80e86eaaf17..982700016b7 100644 --- a/core/src/main/java/org/elasticsearch/action/bulk/BulkItemResponse.java +++ b/core/src/main/java/org/elasticsearch/action/bulk/BulkItemResponse.java @@ -19,14 +19,18 @@ package org.elasticsearch.action.bulk; +import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ExceptionsHelper; -import org.elasticsearch.action.ActionWriteResponse; +import org.elasticsearch.action.DocWriteResponse; import org.elasticsearch.action.delete.DeleteResponse; import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.action.update.UpdateResponse; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Streamable; +import org.elasticsearch.common.xcontent.StatusToXContent; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentBuilderString; import org.elasticsearch.rest.RestStatus; import java.io.IOException; @@ -35,7 +39,39 @@ import java.io.IOException; * Represents a single item response for an action executed as part of the bulk API. Holds the index/type/id * of the relevant action, and if it has failed or not (with the failure message incase it failed). */ -public class BulkItemResponse implements Streamable { +public class BulkItemResponse implements Streamable, StatusToXContent { + + @Override + public RestStatus status() { + return failure == null ? response.status() : failure.getStatus(); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(opType); + if (failure == null) { + response.toXContent(builder, params); + builder.field(Fields.STATUS, response.status()); + } else { + builder.field(Fields._INDEX, failure.getIndex()); + builder.field(Fields._TYPE, failure.getType()); + builder.field(Fields._ID, failure.getId()); + builder.field(Fields.STATUS, failure.getStatus()); + builder.startObject(Fields.ERROR); + ElasticsearchException.toXContent(builder, params, failure.getCause()); + builder.endObject(); + } + builder.endObject(); + return builder; + } + + static final class Fields { + static final XContentBuilderString _INDEX = new XContentBuilderString("_index"); + static final XContentBuilderString _TYPE = new XContentBuilderString("_type"); + static final XContentBuilderString _ID = new XContentBuilderString("_id"); + static final XContentBuilderString STATUS = new XContentBuilderString("status"); + static final XContentBuilderString ERROR = new XContentBuilderString("error"); + } /** * Represents a failure. @@ -99,7 +135,7 @@ public class BulkItemResponse implements Streamable { private String opType; - private ActionWriteResponse response; + private DocWriteResponse response; private Failure failure; @@ -107,7 +143,7 @@ public class BulkItemResponse implements Streamable { } - public BulkItemResponse(int id, String opType, ActionWriteResponse response) { + public BulkItemResponse(int id, String opType, DocWriteResponse response) { this.id = id; this.opType = opType; this.response = response; @@ -140,14 +176,7 @@ public class BulkItemResponse implements Streamable { if (failure != null) { return failure.getIndex(); } - if (response instanceof IndexResponse) { - return ((IndexResponse) response).getIndex(); - } else if (response instanceof DeleteResponse) { - return ((DeleteResponse) response).getIndex(); - } else if (response instanceof UpdateResponse) { - return ((UpdateResponse) response).getIndex(); - } - return null; + return response.getIndex(); } /** @@ -157,14 +186,7 @@ public class BulkItemResponse implements Streamable { if (failure != null) { return failure.getType(); } - if (response instanceof IndexResponse) { - return ((IndexResponse) response).getType(); - } else if (response instanceof DeleteResponse) { - return ((DeleteResponse) response).getType(); - } else if (response instanceof UpdateResponse) { - return ((UpdateResponse) response).getType(); - } - return null; + return response.getType(); } /** @@ -174,14 +196,7 @@ public class BulkItemResponse implements Streamable { if (failure != null) { return failure.getId(); } - if (response instanceof IndexResponse) { - return ((IndexResponse) response).getId(); - } else if (response instanceof DeleteResponse) { - return ((DeleteResponse) response).getId(); - } else if (response instanceof UpdateResponse) { - return ((UpdateResponse) response).getId(); - } - return null; + return response.getId(); } /** @@ -191,21 +206,14 @@ public class BulkItemResponse implements Streamable { if (failure != null) { return -1; } - if (response instanceof IndexResponse) { - return ((IndexResponse) response).getVersion(); - } else if (response instanceof DeleteResponse) { - return ((DeleteResponse) response).getVersion(); - } else if (response instanceof UpdateResponse) { - return ((UpdateResponse) response).getVersion(); - } - return -1; + return response.getVersion(); } /** * The actual response ({@link IndexResponse} or {@link DeleteResponse}). null in * case of failure. */ - public T getResponse() { + public T getResponse() { return (T) response; } diff --git a/core/src/main/java/org/elasticsearch/action/bulk/BulkShardResponse.java b/core/src/main/java/org/elasticsearch/action/bulk/BulkShardResponse.java index 6b08627f5de..76c80a9b064 100644 --- a/core/src/main/java/org/elasticsearch/action/bulk/BulkShardResponse.java +++ b/core/src/main/java/org/elasticsearch/action/bulk/BulkShardResponse.java @@ -19,7 +19,7 @@ package org.elasticsearch.action.bulk; -import org.elasticsearch.action.ActionWriteResponse; +import org.elasticsearch.action.ReplicationResponse; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.index.shard.ShardId; @@ -29,7 +29,7 @@ import java.io.IOException; /** * */ -public class BulkShardResponse extends ActionWriteResponse { +public class BulkShardResponse extends ReplicationResponse { private ShardId shardId; private BulkItemResponse[] responses; diff --git a/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java b/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java index 2cc81556222..2597695a1e2 100644 --- a/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java +++ b/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java @@ -204,7 +204,7 @@ public class TransportShardBulkAction extends TransportReplicationAction 0) { Tuple> sourceAndContent = XContentHelper.convertToMap(indexSourceAsBytes, true); updateResponse.setGetResult(updateHelper.extractGetResult(updateRequest, request.index(), indexResponse.getVersion(), sourceAndContent.v2(), sourceAndContent.v1(), indexSourceAsBytes)); @@ -216,7 +216,7 @@ public class TransportShardBulkAction extends TransportReplicationAction writeResult = updateResult.writeResult; DeleteResponse response = writeResult.response(); DeleteRequest deleteRequest = updateResult.request(); - updateResponse = new UpdateResponse(response.getShardInfo(), response.getIndex(), response.getType(), response.getId(), response.getVersion(), false); + updateResponse = new UpdateResponse(response.getShardInfo(), response.getShardId(), response.getType(), response.getId(), response.getVersion(), false); updateResponse.setGetResult(updateHelper.extractGetResult(updateRequest, request.index(), response.getVersion(), updateResult.result.updatedSourceAsMap(), updateResult.result.updateSourceContentType(), null)); // Replace the update request to the translated delete request to execute on the replica. item = request.items()[requestIndex] = new BulkItemRequest(request.items()[requestIndex].id(), deleteRequest); diff --git a/core/src/main/java/org/elasticsearch/action/delete/DeleteResponse.java b/core/src/main/java/org/elasticsearch/action/delete/DeleteResponse.java index 26cfa57a13d..57781547266 100644 --- a/core/src/main/java/org/elasticsearch/action/delete/DeleteResponse.java +++ b/core/src/main/java/org/elasticsearch/action/delete/DeleteResponse.java @@ -19,9 +19,13 @@ package org.elasticsearch.action.delete; -import org.elasticsearch.action.ActionWriteResponse; +import org.elasticsearch.action.DocWriteResponse; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentBuilderString; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.rest.RestStatus; import java.io.IOException; @@ -31,53 +35,19 @@ import java.io.IOException; * @see org.elasticsearch.action.delete.DeleteRequest * @see org.elasticsearch.client.Client#delete(DeleteRequest) */ -public class DeleteResponse extends ActionWriteResponse { +public class DeleteResponse extends DocWriteResponse { - private String index; - private String id; - private String type; - private long version; private boolean found; public DeleteResponse() { } - public DeleteResponse(String index, String type, String id, long version, boolean found) { - this.index = index; - this.id = id; - this.type = type; - this.version = version; + public DeleteResponse(ShardId shardId, String type, String id, long version, boolean found) { + super(shardId, type, id, version); this.found = found; } - /** - * The index the document was deleted from. - */ - public String getIndex() { - return this.index; - } - - /** - * The type of the document deleted. - */ - public String getType() { - return this.type; - } - - /** - * The id of the document deleted. - */ - public String getId() { - return this.id; - } - - /** - * The version of the delete operation. - */ - public long getVersion() { - return this.version; - } /** * Returns true if a doc was found to delete. @@ -89,20 +59,44 @@ public class DeleteResponse extends ActionWriteResponse { @Override public void readFrom(StreamInput in) throws IOException { super.readFrom(in); - index = in.readString(); - type = in.readString(); - id = in.readString(); - version = in.readLong(); found = in.readBoolean(); } @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); - out.writeString(index); - out.writeString(type); - out.writeString(id); - out.writeLong(version); out.writeBoolean(found); } + + @Override + public RestStatus status() { + if (found == false) { + return RestStatus.NOT_FOUND; + } + return super.status(); + } + + static final class Fields { + static final XContentBuilderString FOUND = new XContentBuilderString("found"); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.field(Fields.FOUND, isFound()); + super.toXContent(builder, params); + return builder; + } + + @Override + public String toString() { + StringBuilder builder = new StringBuilder(); + builder.append("DeleteResponse["); + builder.append("index=").append(getIndex()); + builder.append(",type=").append(getType()); + builder.append(",id=").append(getId()); + builder.append(",version=").append(getVersion()); + builder.append(",found=").append(found); + builder.append(",shards=").append(getShardInfo()); + return builder.append("]").toString(); + } } diff --git a/core/src/main/java/org/elasticsearch/action/delete/TransportDeleteAction.java b/core/src/main/java/org/elasticsearch/action/delete/TransportDeleteAction.java index 1b3faec9999..ca66b285753 100644 --- a/core/src/main/java/org/elasticsearch/action/delete/TransportDeleteAction.java +++ b/core/src/main/java/org/elasticsearch/action/delete/TransportDeleteAction.java @@ -140,7 +140,7 @@ public class TransportDeleteAction extends TransportReplicationAction( - new DeleteResponse(indexShard.shardId().getIndex(), request.type(), request.id(), delete.version(), delete.found()), + new DeleteResponse(indexShard.shardId(), request.type(), request.id(), delete.version(), delete.found()), delete.getTranslogLocation()); } diff --git a/core/src/main/java/org/elasticsearch/action/index/IndexResponse.java b/core/src/main/java/org/elasticsearch/action/index/IndexResponse.java index 5727b2b673b..665327a749f 100644 --- a/core/src/main/java/org/elasticsearch/action/index/IndexResponse.java +++ b/core/src/main/java/org/elasticsearch/action/index/IndexResponse.java @@ -19,9 +19,13 @@ package org.elasticsearch.action.index; -import org.elasticsearch.action.ActionWriteResponse; +import org.elasticsearch.action.DocWriteResponse; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentBuilderString; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.rest.RestStatus; import java.io.IOException; @@ -31,54 +35,19 @@ import java.io.IOException; * @see org.elasticsearch.action.index.IndexRequest * @see org.elasticsearch.client.Client#index(IndexRequest) */ -public class IndexResponse extends ActionWriteResponse { +public class IndexResponse extends DocWriteResponse { - private String index; - private String id; - private String type; - private long version; private boolean created; public IndexResponse() { } - public IndexResponse(String index, String type, String id, long version, boolean created) { - this.index = index; - this.id = id; - this.type = type; - this.version = version; + public IndexResponse(ShardId shardId, String type, String id, long version, boolean created) { + super(shardId, type, id, version); this.created = created; } - /** - * The index the document was indexed into. - */ - public String getIndex() { - return this.index; - } - - /** - * The type of the document indexed. - */ - public String getType() { - return this.type; - } - - /** - * The id of the document indexed. - */ - public String getId() { - return this.id; - } - - /** - * Returns the current version of the doc indexed. - */ - public long getVersion() { - return this.version; - } - /** * Returns true if the document was created, false if updated. */ @@ -86,23 +55,23 @@ public class IndexResponse extends ActionWriteResponse { return this.created; } + @Override + public RestStatus status() { + if (created) { + return RestStatus.CREATED; + } + return super.status(); + } + @Override public void readFrom(StreamInput in) throws IOException { super.readFrom(in); - index = in.readString(); - type = in.readString(); - id = in.readString(); - version = in.readLong(); created = in.readBoolean(); } @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); - out.writeString(index); - out.writeString(type); - out.writeString(id); - out.writeLong(version); out.writeBoolean(created); } @@ -110,12 +79,23 @@ public class IndexResponse extends ActionWriteResponse { public String toString() { StringBuilder builder = new StringBuilder(); builder.append("IndexResponse["); - builder.append("index=").append(index); - builder.append(",type=").append(type); - builder.append(",id=").append(id); - builder.append(",version=").append(version); + builder.append("index=").append(getIndex()); + builder.append(",type=").append(getType()); + builder.append(",id=").append(getId()); + builder.append(",version=").append(getVersion()); builder.append(",created=").append(created); builder.append(",shards=").append(getShardInfo()); return builder.append("]").toString(); } + + static final class Fields { + static final XContentBuilderString CREATED = new XContentBuilderString("created"); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + super.toXContent(builder, params); + builder.field(Fields.CREATED, isCreated()); + return builder; + } } diff --git a/core/src/main/java/org/elasticsearch/action/index/TransportIndexAction.java b/core/src/main/java/org/elasticsearch/action/index/TransportIndexAction.java index 37e76835270..620056ded4e 100644 --- a/core/src/main/java/org/elasticsearch/action/index/TransportIndexAction.java +++ b/core/src/main/java/org/elasticsearch/action/index/TransportIndexAction.java @@ -222,7 +222,7 @@ public class TransportIndexAction extends TransportReplicationAction(new IndexResponse(shardId.getIndex(), request.type(), request.id(), request.version(), created), operation.getTranslogLocation()); + return new WriteResult<>(new IndexResponse(shardId, request.type(), request.id(), request.version(), created), operation.getTranslogLocation()); } } diff --git a/core/src/main/java/org/elasticsearch/action/support/replication/TransportBroadcastReplicationAction.java b/core/src/main/java/org/elasticsearch/action/support/replication/TransportBroadcastReplicationAction.java index ddd4d42f7a6..33a9d349e80 100644 --- a/core/src/main/java/org/elasticsearch/action/support/replication/TransportBroadcastReplicationAction.java +++ b/core/src/main/java/org/elasticsearch/action/support/replication/TransportBroadcastReplicationAction.java @@ -22,9 +22,8 @@ package org.elasticsearch.action.support.replication; import com.carrotsearch.hppc.cursors.IntObjectCursor; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.ActionWriteResponse; +import org.elasticsearch.action.ReplicationResponse; import org.elasticsearch.action.ShardOperationFailedException; -import org.elasticsearch.action.UnavailableShardsException; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.DefaultShardOperationFailedException; import org.elasticsearch.action.support.HandledTransportAction; @@ -53,7 +52,7 @@ import java.util.function.Supplier; * Base class for requests that should be executed on all shards of an index or several indices. * This action sends shard requests to all primary shards of the indices and they are then replicated like write requests */ -public abstract class TransportBroadcastReplicationAction extends HandledTransportAction { +public abstract class TransportBroadcastReplicationAction extends HandledTransportAction { private final TransportReplicationAction replicatedBroadcastShardAction; private final ClusterService clusterService; @@ -91,15 +90,15 @@ public abstract class TransportBroadcastReplicationAction shardFailures = null; for (int i = 0; i < shardsResponses.size(); i++) { - ActionWriteResponse shardResponse = shardsResponses.get(i); + ReplicationResponse shardResponse = shardsResponses.get(i); if (shardResponse == null) { // non active shard, ignore } else { @@ -152,7 +151,7 @@ public abstract class TransportBroadcastReplicationAction(); } - for (ActionWriteResponse.ShardInfo.Failure failure : shardResponse.getShardInfo().getFailures()) { + for (ReplicationResponse.ShardInfo.Failure failure : shardResponse.getShardInfo().getFailures()) { shardFailures.add(new DefaultShardOperationFailedException(new BroadcastShardOperationFailedException(new ShardId(failure.index(), failure.shardId()), failure.getCause()))); } } diff --git a/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java b/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java index 2f9fd6d483c..26c439c0a3d 100644 --- a/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java +++ b/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java @@ -22,7 +22,7 @@ package org.elasticsearch.action.support.replication; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.ActionWriteResponse; +import org.elasticsearch.action.ReplicationResponse; import org.elasticsearch.action.UnavailableShardsException; import org.elasticsearch.action.WriteConsistencyLevel; import org.elasticsearch.action.support.ActionFilters; @@ -78,7 +78,7 @@ import java.util.function.Supplier; * primary node to validate request before primary operation followed by sampling state again for resolving * nodes with replica copies to perform replication. */ -public abstract class TransportReplicationAction extends TransportAction { +public abstract class TransportReplicationAction extends TransportAction { public static final String SHARD_FAILURE_TIMEOUT = "action.support.replication.shard.failure_timeout"; @@ -214,7 +214,7 @@ public abstract class TransportReplicationAction { + protected static class WriteResult { public final T response; public final Translog.Location location; @@ -225,10 +225,10 @@ public abstract class TransportReplicationAction T response() { + public T response() { // this sets total, pending and failed to 0 and this is ok, because we will embed this into the replica // request and not use it - response.setShardInfo(new ActionWriteResponse.ShardInfo()); + response.setShardInfo(new ReplicationResponse.ShardInfo()); return (T) response; } @@ -908,20 +908,20 @@ public abstract class TransportReplicationAction entry : shardReplicaFailures.entrySet()) { RestStatus restStatus = ExceptionsHelper.status(entry.getValue()); - failuresArray[slot++] = new ActionWriteResponse.ShardInfo.Failure( + failuresArray[slot++] = new ReplicationResponse.ShardInfo.Failure( shardId.getIndex(), shardId.getId(), entry.getKey(), entry.getValue(), restStatus, false ); } } else { - failuresArray = ActionWriteResponse.EMPTY; + failuresArray = ReplicationResponse.EMPTY; } - finalResponse.setShardInfo(new ActionWriteResponse.ShardInfo( + finalResponse.setShardInfo(new ReplicationResponse.ShardInfo( totalShards, success.get(), failuresArray diff --git a/core/src/main/java/org/elasticsearch/action/update/TransportUpdateAction.java b/core/src/main/java/org/elasticsearch/action/update/TransportUpdateAction.java index b2d24fef714..e5edc1af96b 100644 --- a/core/src/main/java/org/elasticsearch/action/update/TransportUpdateAction.java +++ b/core/src/main/java/org/elasticsearch/action/update/TransportUpdateAction.java @@ -175,7 +175,7 @@ public class TransportUpdateAction extends TransportInstanceSingleOperationActio indexAction.execute(upsertRequest, new ActionListener() { @Override public void onResponse(IndexResponse response) { - UpdateResponse update = new UpdateResponse(response.getShardInfo(), response.getIndex(), response.getType(), response.getId(), response.getVersion(), response.isCreated()); + UpdateResponse update = new UpdateResponse(response.getShardInfo(), response.getShardId(), response.getType(), response.getId(), response.getVersion(), response.isCreated()); if (request.fields() != null && request.fields().length > 0) { Tuple> sourceAndContent = XContentHelper.convertToMap(upsertSourceBytes, true); update.setGetResult(updateHelper.extractGetResult(request, request.concreteIndex(), response.getVersion(), sourceAndContent.v2(), sourceAndContent.v1(), upsertSourceBytes)); @@ -212,7 +212,7 @@ public class TransportUpdateAction extends TransportInstanceSingleOperationActio indexAction.execute(indexRequest, new ActionListener() { @Override public void onResponse(IndexResponse response) { - UpdateResponse update = new UpdateResponse(response.getShardInfo(), response.getIndex(), response.getType(), response.getId(), response.getVersion(), response.isCreated()); + UpdateResponse update = new UpdateResponse(response.getShardInfo(), response.getShardId(), response.getType(), response.getId(), response.getVersion(), response.isCreated()); update.setGetResult(updateHelper.extractGetResult(request, request.concreteIndex(), response.getVersion(), result.updatedSourceAsMap(), result.updateSourceContentType(), indexSourceBytes)); listener.onResponse(update); } @@ -240,7 +240,7 @@ public class TransportUpdateAction extends TransportInstanceSingleOperationActio deleteAction.execute(deleteRequest, new ActionListener() { @Override public void onResponse(DeleteResponse response) { - UpdateResponse update = new UpdateResponse(response.getShardInfo(), response.getIndex(), response.getType(), response.getId(), response.getVersion(), false); + UpdateResponse update = new UpdateResponse(response.getShardInfo(), response.getShardId(), response.getType(), response.getId(), response.getVersion(), false); update.setGetResult(updateHelper.extractGetResult(request, request.concreteIndex(), response.getVersion(), result.updatedSourceAsMap(), result.updateSourceContentType(), null)); listener.onResponse(update); } diff --git a/core/src/main/java/org/elasticsearch/action/update/UpdateHelper.java b/core/src/main/java/org/elasticsearch/action/update/UpdateHelper.java index 4bdcd43023f..9f8b2a2e7be 100644 --- a/core/src/main/java/org/elasticsearch/action/update/UpdateHelper.java +++ b/core/src/main/java/org/elasticsearch/action/update/UpdateHelper.java @@ -83,9 +83,10 @@ public class UpdateHelper extends AbstractComponent { @SuppressWarnings("unchecked") protected Result prepare(UpdateRequest request, final GetResult getResult) { long getDateNS = System.nanoTime(); + final ShardId shardId = new ShardId(getResult.getIndex(), request.shardId()); if (!getResult.isExists()) { if (request.upsertRequest() == null && !request.docAsUpsert()) { - throw new DocumentMissingException(new ShardId(request.index(), request.shardId()), request.type(), request.id()); + throw new DocumentMissingException(shardId, request.type(), request.id()); } IndexRequest indexRequest = request.docAsUpsert() ? request.doc() : request.upsertRequest(); TimeValue ttl = indexRequest.ttl(); @@ -113,7 +114,7 @@ public class UpdateHelper extends AbstractComponent { logger.warn("Used upsert operation [{}] for script [{}], doing nothing...", scriptOpChoice, request.script.getScript()); } - UpdateResponse update = new UpdateResponse(getResult.getIndex(), getResult.getType(), getResult.getId(), + UpdateResponse update = new UpdateResponse(shardId, getResult.getType(), getResult.getId(), getResult.getVersion(), false); update.setGetResult(getResult); return new Result(update, Operation.NONE, upsertDoc, XContentType.JSON); @@ -145,7 +146,7 @@ public class UpdateHelper extends AbstractComponent { if (getResult.internalSourceRef() == null) { // no source, we can't do nothing, through a failure... - throw new DocumentSourceMissingException(new ShardId(request.index(), request.shardId()), request.type(), request.id()); + throw new DocumentSourceMissingException(shardId, request.type(), request.id()); } Tuple> sourceAndContent = XContentHelper.convertToMap(getResult.internalSourceRef(), true); @@ -231,12 +232,12 @@ public class UpdateHelper extends AbstractComponent { .consistencyLevel(request.consistencyLevel()); return new Result(deleteRequest, Operation.DELETE, updatedSourceAsMap, updateSourceContentType); } else if ("none".equals(operation)) { - UpdateResponse update = new UpdateResponse(getResult.getIndex(), getResult.getType(), getResult.getId(), getResult.getVersion(), false); + UpdateResponse update = new UpdateResponse(shardId, getResult.getType(), getResult.getId(), getResult.getVersion(), false); update.setGetResult(extractGetResult(request, request.index(), getResult.getVersion(), updatedSourceAsMap, updateSourceContentType, getResult.internalSourceRef())); return new Result(update, Operation.NONE, updatedSourceAsMap, updateSourceContentType); } else { logger.warn("Used update operation [{}] for script [{}], doing nothing...", operation, request.script.getScript()); - UpdateResponse update = new UpdateResponse(getResult.getIndex(), getResult.getType(), getResult.getId(), getResult.getVersion(), false); + UpdateResponse update = new UpdateResponse(shardId, getResult.getType(), getResult.getId(), getResult.getVersion(), false); return new Result(update, Operation.NONE, updatedSourceAsMap, updateSourceContentType); } } diff --git a/core/src/main/java/org/elasticsearch/action/update/UpdateResponse.java b/core/src/main/java/org/elasticsearch/action/update/UpdateResponse.java index af6438097c8..2f3146b0644 100644 --- a/core/src/main/java/org/elasticsearch/action/update/UpdateResponse.java +++ b/core/src/main/java/org/elasticsearch/action/update/UpdateResponse.java @@ -19,21 +19,21 @@ package org.elasticsearch.action.update; -import org.elasticsearch.action.ActionWriteResponse; +import org.elasticsearch.action.DocWriteResponse; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentBuilderString; import org.elasticsearch.index.get.GetResult; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.rest.RestStatus; import java.io.IOException; /** */ -public class UpdateResponse extends ActionWriteResponse { +public class UpdateResponse extends DocWriteResponse { - private String index; - private String id; - private String type; - private long version; private boolean created; private GetResult getResult; @@ -44,47 +44,16 @@ public class UpdateResponse extends ActionWriteResponse { * Constructor to be used when a update didn't translate in a write. * For example: update script with operation set to none */ - public UpdateResponse(String index, String type, String id, long version, boolean created) { - this(new ShardInfo(0, 0), index, type, id, version, created); + public UpdateResponse(ShardId shardId, String type, String id, long version, boolean created) { + this(new ShardInfo(0, 0), shardId, type, id, version, created); } - public UpdateResponse(ShardInfo shardInfo, String index, String type, String id, long version, boolean created) { + public UpdateResponse(ShardInfo shardInfo, ShardId shardId, String type, String id, long version, boolean created) { + super(shardId, type, id, version); setShardInfo(shardInfo); - this.index = index; - this.id = id; - this.type = type; - this.version = version; this.created = created; } - /** - * The index the document was indexed into. - */ - public String getIndex() { - return this.index; - } - - /** - * The type of the document indexed. - */ - public String getType() { - return this.type; - } - - /** - * The id of the document indexed. - */ - public String getId() { - return this.id; - } - - /** - * Returns the current version of the doc indexed. - */ - public long getVersion() { - return this.version; - } - public void setGetResult(GetResult getResult) { this.getResult = getResult; } @@ -101,13 +70,17 @@ public class UpdateResponse extends ActionWriteResponse { } + @Override + public RestStatus status() { + if (created) { + return RestStatus.CREATED; + } + return super.status(); + } + @Override public void readFrom(StreamInput in) throws IOException { super.readFrom(in); - index = in.readString(); - type = in.readString(); - id = in.readString(); - version = in.readLong(); created = in.readBoolean(); if (in.readBoolean()) { getResult = GetResult.readGetResult(in); @@ -117,10 +90,6 @@ public class UpdateResponse extends ActionWriteResponse { @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); - out.writeString(index); - out.writeString(type); - out.writeString(id); - out.writeLong(version); out.writeBoolean(created); if (getResult == null) { out.writeBoolean(false); @@ -129,4 +98,34 @@ public class UpdateResponse extends ActionWriteResponse { getResult.writeTo(out); } } + + + static final class Fields { + static final XContentBuilderString GET = new XContentBuilderString("get"); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + super.toXContent(builder, params); + if (getGetResult() != null) { + builder.startObject(Fields.GET); + getGetResult().toXContentEmbedded(builder, params); + builder.endObject(); + } + return builder; + } + + @Override + public String toString() { + StringBuilder builder = new StringBuilder(); + builder.append("UpdateResponse["); + builder.append("index=").append(getIndex()); + builder.append(",type=").append(getType()); + builder.append(",id=").append(getId()); + builder.append(",version=").append(getVersion()); + builder.append(",created=").append(created); + builder.append(",shards=").append(getShardInfo()); + return builder.append("]").toString(); + } + } diff --git a/core/src/main/java/org/elasticsearch/rest/action/bulk/RestBulkAction.java b/core/src/main/java/org/elasticsearch/rest/action/bulk/RestBulkAction.java index 90184352714..536b73ba2b5 100644 --- a/core/src/main/java/org/elasticsearch/rest/action/bulk/RestBulkAction.java +++ b/core/src/main/java/org/elasticsearch/rest/action/bulk/RestBulkAction.java @@ -19,16 +19,11 @@ package org.elasticsearch.rest.action.bulk; -import org.elasticsearch.ElasticsearchException; -import org.elasticsearch.action.ActionWriteResponse; import org.elasticsearch.action.WriteConsistencyLevel; import org.elasticsearch.action.bulk.BulkItemResponse; import org.elasticsearch.action.bulk.BulkRequest; import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.action.bulk.BulkShardRequest; -import org.elasticsearch.action.delete.DeleteResponse; -import org.elasticsearch.action.index.IndexResponse; -import org.elasticsearch.action.update.UpdateResponse; import org.elasticsearch.client.Client; import org.elasticsearch.client.Requests; import org.elasticsearch.common.Strings; @@ -96,52 +91,7 @@ public class RestBulkAction extends BaseRestHandler { builder.startArray(Fields.ITEMS); for (BulkItemResponse itemResponse : response) { builder.startObject(); - builder.startObject(itemResponse.getOpType()); - builder.field(Fields._INDEX, itemResponse.getIndex()); - builder.field(Fields._TYPE, itemResponse.getType()); - builder.field(Fields._ID, itemResponse.getId()); - long version = itemResponse.getVersion(); - if (version != -1) { - builder.field(Fields._VERSION, itemResponse.getVersion()); - } - if (itemResponse.isFailed()) { - builder.field(Fields.STATUS, itemResponse.getFailure().getStatus().getStatus()); - builder.startObject(Fields.ERROR); - ElasticsearchException.toXContent(builder, request, itemResponse.getFailure().getCause()); - builder.endObject(); - } else { - ActionWriteResponse.ShardInfo shardInfo = itemResponse.getResponse().getShardInfo(); - shardInfo.toXContent(builder, request); - if (itemResponse.getResponse() instanceof DeleteResponse) { - DeleteResponse deleteResponse = itemResponse.getResponse(); - if (deleteResponse.isFound()) { - builder.field(Fields.STATUS, shardInfo.status().getStatus()); - } else { - builder.field(Fields.STATUS, RestStatus.NOT_FOUND.getStatus()); - } - builder.field(Fields.FOUND, deleteResponse.isFound()); - } else if (itemResponse.getResponse() instanceof IndexResponse) { - IndexResponse indexResponse = itemResponse.getResponse(); - if (indexResponse.isCreated()) { - builder.field(Fields.STATUS, RestStatus.CREATED.getStatus()); - } else { - builder.field(Fields.STATUS, shardInfo.status().getStatus()); - } - } else if (itemResponse.getResponse() instanceof UpdateResponse) { - UpdateResponse updateResponse = itemResponse.getResponse(); - if (updateResponse.isCreated()) { - builder.field(Fields.STATUS, RestStatus.CREATED.getStatus()); - } else { - builder.field(Fields.STATUS, shardInfo.status().getStatus()); - } - if (updateResponse.getGetResult() != null) { - builder.startObject(Fields.GET); - updateResponse.getGetResult().toXContentEmbedded(builder, request); - builder.endObject(); - } - } - } - builder.endObject(); + itemResponse.toXContent(builder, request); builder.endObject(); } builder.endArray(); @@ -155,15 +105,7 @@ public class RestBulkAction extends BaseRestHandler { static final class Fields { static final XContentBuilderString ITEMS = new XContentBuilderString("items"); static final XContentBuilderString ERRORS = new XContentBuilderString("errors"); - static final XContentBuilderString _INDEX = new XContentBuilderString("_index"); - static final XContentBuilderString _TYPE = new XContentBuilderString("_type"); - static final XContentBuilderString _ID = new XContentBuilderString("_id"); - static final XContentBuilderString STATUS = new XContentBuilderString("status"); - static final XContentBuilderString ERROR = new XContentBuilderString("error"); static final XContentBuilderString TOOK = new XContentBuilderString("took"); - static final XContentBuilderString _VERSION = new XContentBuilderString("_version"); - static final XContentBuilderString FOUND = new XContentBuilderString("found"); - static final XContentBuilderString GET = new XContentBuilderString("get"); } } diff --git a/core/src/main/java/org/elasticsearch/rest/action/delete/RestDeleteAction.java b/core/src/main/java/org/elasticsearch/rest/action/delete/RestDeleteAction.java index 209ab686ce5..e583ed36274 100644 --- a/core/src/main/java/org/elasticsearch/rest/action/delete/RestDeleteAction.java +++ b/core/src/main/java/org/elasticsearch/rest/action/delete/RestDeleteAction.java @@ -19,7 +19,6 @@ package org.elasticsearch.rest.action.delete; -import org.elasticsearch.action.ActionWriteResponse; import org.elasticsearch.action.WriteConsistencyLevel; import org.elasticsearch.action.delete.DeleteRequest; import org.elasticsearch.action.delete.DeleteResponse; @@ -27,14 +26,13 @@ import org.elasticsearch.client.Client; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.XContentBuilder; -import org.elasticsearch.common.xcontent.XContentBuilderString; import org.elasticsearch.index.VersionType; import org.elasticsearch.rest.*; import org.elasticsearch.rest.action.support.RestActions; import org.elasticsearch.rest.action.support.RestBuilderListener; +import org.elasticsearch.rest.action.support.RestStatusToXContentListener; import static org.elasticsearch.rest.RestRequest.Method.DELETE; -import static org.elasticsearch.rest.RestStatus.NOT_FOUND; /** * @@ -62,31 +60,6 @@ public class RestDeleteAction extends BaseRestHandler { deleteRequest.consistencyLevel(WriteConsistencyLevel.fromString(consistencyLevel)); } - client.delete(deleteRequest, new RestBuilderListener(channel) { - @Override - public RestResponse buildResponse(DeleteResponse result, XContentBuilder builder) throws Exception { - ActionWriteResponse.ShardInfo shardInfo = result.getShardInfo(); - builder.startObject().field(Fields.FOUND, result.isFound()) - .field(Fields._INDEX, result.getIndex()) - .field(Fields._TYPE, result.getType()) - .field(Fields._ID, result.getId()) - .field(Fields._VERSION, result.getVersion()) - .value(shardInfo) - .endObject(); - RestStatus status = shardInfo.status(); - if (!result.isFound()) { - status = NOT_FOUND; - } - return new BytesRestResponse(status, builder); - } - }); - } - - static final class Fields { - static final XContentBuilderString FOUND = new XContentBuilderString("found"); - static final XContentBuilderString _INDEX = new XContentBuilderString("_index"); - static final XContentBuilderString _TYPE = new XContentBuilderString("_type"); - static final XContentBuilderString _ID = new XContentBuilderString("_id"); - static final XContentBuilderString _VERSION = new XContentBuilderString("_version"); + client.delete(deleteRequest, new RestStatusToXContentListener<>(channel)); } } diff --git a/core/src/main/java/org/elasticsearch/rest/action/index/RestIndexAction.java b/core/src/main/java/org/elasticsearch/rest/action/index/RestIndexAction.java index c7fc29155cc..310ce0a1248 100644 --- a/core/src/main/java/org/elasticsearch/rest/action/index/RestIndexAction.java +++ b/core/src/main/java/org/elasticsearch/rest/action/index/RestIndexAction.java @@ -19,7 +19,6 @@ package org.elasticsearch.rest.action.index; -import org.elasticsearch.action.ActionWriteResponse; import org.elasticsearch.action.WriteConsistencyLevel; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.index.IndexResponse; @@ -27,11 +26,11 @@ import org.elasticsearch.client.Client; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.XContentBuilder; -import org.elasticsearch.common.xcontent.XContentBuilderString; import org.elasticsearch.index.VersionType; import org.elasticsearch.rest.*; import org.elasticsearch.rest.action.support.RestActions; import org.elasticsearch.rest.action.support.RestBuilderListener; +import org.elasticsearch.rest.action.support.RestStatusToXContentListener; import java.io.IOException; @@ -99,33 +98,6 @@ public class RestIndexAction extends BaseRestHandler { if (consistencyLevel != null) { indexRequest.consistencyLevel(WriteConsistencyLevel.fromString(consistencyLevel)); } - client.index(indexRequest, new RestBuilderListener(channel) { - @Override - public RestResponse buildResponse(IndexResponse response, XContentBuilder builder) throws Exception { - builder.startObject(); - ActionWriteResponse.ShardInfo shardInfo = response.getShardInfo(); - builder.field(Fields._INDEX, response.getIndex()) - .field(Fields._TYPE, response.getType()) - .field(Fields._ID, response.getId()) - .field(Fields._VERSION, response.getVersion()); - shardInfo.toXContent(builder, request); - builder.field(Fields.CREATED, response.isCreated()); - builder.endObject(); - RestStatus status = shardInfo.status(); - if (response.isCreated()) { - status = CREATED; - } - return new BytesRestResponse(status, builder); - } - }); + client.index(indexRequest, new RestStatusToXContentListener<>(channel)); } - - static final class Fields { - static final XContentBuilderString _INDEX = new XContentBuilderString("_index"); - static final XContentBuilderString _TYPE = new XContentBuilderString("_type"); - static final XContentBuilderString _ID = new XContentBuilderString("_id"); - static final XContentBuilderString _VERSION = new XContentBuilderString("_version"); - static final XContentBuilderString CREATED = new XContentBuilderString("created"); - } - } diff --git a/core/src/main/java/org/elasticsearch/rest/action/update/RestUpdateAction.java b/core/src/main/java/org/elasticsearch/rest/action/update/RestUpdateAction.java index 76e96ab7e7f..f59c329fbc3 100644 --- a/core/src/main/java/org/elasticsearch/rest/action/update/RestUpdateAction.java +++ b/core/src/main/java/org/elasticsearch/rest/action/update/RestUpdateAction.java @@ -19,7 +19,6 @@ package org.elasticsearch.rest.action.update; -import org.elasticsearch.action.ActionWriteResponse; import org.elasticsearch.action.WriteConsistencyLevel; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.update.UpdateRequest; @@ -29,7 +28,6 @@ import org.elasticsearch.common.Strings; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.XContentBuilder; -import org.elasticsearch.common.xcontent.XContentBuilderString; import org.elasticsearch.index.VersionType; import org.elasticsearch.rest.BaseRestHandler; import org.elasticsearch.rest.BytesRestResponse; @@ -40,6 +38,7 @@ import org.elasticsearch.rest.RestResponse; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.rest.action.support.RestActions; import org.elasticsearch.rest.action.support.RestBuilderListener; +import org.elasticsearch.rest.action.support.RestStatusToXContentListener; import org.elasticsearch.script.Script; import org.elasticsearch.script.ScriptParameterParser; import org.elasticsearch.script.ScriptParameterParser.ScriptParameterValue; @@ -48,7 +47,6 @@ import java.util.HashMap; import java.util.Map; import static org.elasticsearch.rest.RestRequest.Method.POST; -import static org.elasticsearch.rest.RestStatus.CREATED; /** */ @@ -123,38 +121,6 @@ public class RestUpdateAction extends BaseRestHandler { } } - client.update(updateRequest, new RestBuilderListener(channel) { - @Override - public RestResponse buildResponse(UpdateResponse response, XContentBuilder builder) throws Exception { - builder.startObject(); - ActionWriteResponse.ShardInfo shardInfo = response.getShardInfo(); - builder.field(Fields._INDEX, response.getIndex()) - .field(Fields._TYPE, response.getType()) - .field(Fields._ID, response.getId()) - .field(Fields._VERSION, response.getVersion()); - - shardInfo.toXContent(builder, request); - if (response.getGetResult() != null) { - builder.startObject(Fields.GET); - response.getGetResult().toXContentEmbedded(builder, request); - builder.endObject(); - } - - builder.endObject(); - RestStatus status = shardInfo.status(); - if (response.isCreated()) { - status = CREATED; - } - return new BytesRestResponse(status, builder); - } - }); - } - - static final class Fields { - static final XContentBuilderString _INDEX = new XContentBuilderString("_index"); - static final XContentBuilderString _TYPE = new XContentBuilderString("_type"); - static final XContentBuilderString _ID = new XContentBuilderString("_id"); - static final XContentBuilderString _VERSION = new XContentBuilderString("_version"); - static final XContentBuilderString GET = new XContentBuilderString("get"); + client.update(updateRequest, new RestStatusToXContentListener<>(channel)); } } diff --git a/core/src/test/java/org/elasticsearch/action/support/replication/BroadcastReplicationTests.java b/core/src/test/java/org/elasticsearch/action/support/replication/BroadcastReplicationTests.java index d31a024187c..4d17155f611 100644 --- a/core/src/test/java/org/elasticsearch/action/support/replication/BroadcastReplicationTests.java +++ b/core/src/test/java/org/elasticsearch/action/support/replication/BroadcastReplicationTests.java @@ -20,7 +20,7 @@ package org.elasticsearch.action.support.replication; import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.ActionWriteResponse; +import org.elasticsearch.action.ReplicationResponse; import org.elasticsearch.action.NoShardAvailableActionException; import org.elasticsearch.action.UnavailableShardsException; import org.elasticsearch.action.admin.indices.flush.FlushRequest; @@ -101,7 +101,7 @@ public class BroadcastReplicationTests extends ESTestCase { randomBoolean() ? ShardRoutingState.INITIALIZING : ShardRoutingState.UNASSIGNED, ShardRoutingState.UNASSIGNED)); logger.debug("--> using initial state:\n{}", clusterService.state().prettyPrint()); Future response = (broadcastReplicationAction.execute(new BroadcastRequest().indices(index))); - for (Tuple> shardRequests : broadcastReplicationAction.capturedShardRequests) { + for (Tuple> shardRequests : broadcastReplicationAction.capturedShardRequests) { if (randomBoolean()) { shardRequests.v2().onFailure(new NoShardAvailableActionException(shardRequests.v1())); } else { @@ -120,10 +120,10 @@ public class BroadcastReplicationTests extends ESTestCase { ShardRoutingState.STARTED)); logger.debug("--> using initial state:\n{}", clusterService.state().prettyPrint()); Future response = (broadcastReplicationAction.execute(new BroadcastRequest().indices(index))); - for (Tuple> shardRequests : broadcastReplicationAction.capturedShardRequests) { - ActionWriteResponse actionWriteResponse = new ActionWriteResponse(); - actionWriteResponse.setShardInfo(new ActionWriteResponse.ShardInfo(1, 1, new ActionWriteResponse.ShardInfo.Failure[0])); - shardRequests.v2().onResponse(actionWriteResponse); + for (Tuple> shardRequests : broadcastReplicationAction.capturedShardRequests) { + ReplicationResponse replicationResponse = new ReplicationResponse(); + replicationResponse.setShardInfo(new ReplicationResponse.ShardInfo(1, 1, new ReplicationResponse.ShardInfo.Failure[0])); + shardRequests.v2().onResponse(replicationResponse); } logger.info("total shards: {}, ", response.get().getTotalShards()); assertBroadcastResponse(1, 1, 0, response.get(), null); @@ -137,20 +137,20 @@ public class BroadcastReplicationTests extends ESTestCase { Future response = (broadcastReplicationAction.execute(new BroadcastRequest().indices(index))); int succeeded = 0; int failed = 0; - for (Tuple> shardRequests : broadcastReplicationAction.capturedShardRequests) { + for (Tuple> shardRequests : broadcastReplicationAction.capturedShardRequests) { if (randomBoolean()) { - ActionWriteResponse.ShardInfo.Failure[] failures = new ActionWriteResponse.ShardInfo.Failure[0]; + ReplicationResponse.ShardInfo.Failure[] failures = new ReplicationResponse.ShardInfo.Failure[0]; int shardsSucceeded = randomInt(1) + 1; succeeded += shardsSucceeded; - ActionWriteResponse actionWriteResponse = new ActionWriteResponse(); + ReplicationResponse replicationResponse = new ReplicationResponse(); if (shardsSucceeded == 1 && randomBoolean()) { //sometimes add failure (no failure means shard unavailable) - failures = new ActionWriteResponse.ShardInfo.Failure[1]; - failures[0] = new ActionWriteResponse.ShardInfo.Failure(index, shardRequests.v1().id(), null, new Exception("pretend shard failed"), RestStatus.GATEWAY_TIMEOUT, false); + failures = new ReplicationResponse.ShardInfo.Failure[1]; + failures[0] = new ReplicationResponse.ShardInfo.Failure(index, shardRequests.v1().id(), null, new Exception("pretend shard failed"), RestStatus.GATEWAY_TIMEOUT, false); failed++; } - actionWriteResponse.setShardInfo(new ActionWriteResponse.ShardInfo(2, shardsSucceeded, failures)); - shardRequests.v2().onResponse(actionWriteResponse); + replicationResponse.setShardInfo(new ReplicationResponse.ShardInfo(2, shardsSucceeded, failures)); + shardRequests.v2().onResponse(replicationResponse); } else { // sometimes fail failed += 2; @@ -179,16 +179,16 @@ public class BroadcastReplicationTests extends ESTestCase { assertThat(shards.get(0), equalTo(shardId)); } - private class TestBroadcastReplicationAction extends TransportBroadcastReplicationAction { - protected final Set>> capturedShardRequests = ConcurrentCollections.newConcurrentSet(); + private class TestBroadcastReplicationAction extends TransportBroadcastReplicationAction { + protected final Set>> capturedShardRequests = ConcurrentCollections.newConcurrentSet(); public TestBroadcastReplicationAction(Settings settings, ThreadPool threadPool, ClusterService clusterService, TransportService transportService, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, TransportReplicationAction replicatedBroadcastShardAction) { super("test-broadcast-replication-action", BroadcastRequest::new, settings, threadPool, clusterService, transportService, actionFilters, indexNameExpressionResolver, replicatedBroadcastShardAction); } @Override - protected ActionWriteResponse newShardResponse() { - return new ActionWriteResponse(); + protected ReplicationResponse newShardResponse() { + return new ReplicationResponse(); } @Override @@ -202,7 +202,7 @@ public class BroadcastReplicationTests extends ESTestCase { } @Override - protected void shardExecute(BroadcastRequest request, ShardId shardId, ActionListener shardActionListener) { + protected void shardExecute(BroadcastRequest request, ShardId shardId, ActionListener shardActionListener) { capturedShardRequests.add(new Tuple<>(shardId, shardActionListener)); } } diff --git a/core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java b/core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java index f1e270b40fc..5834b2662ad 100644 --- a/core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java +++ b/core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java @@ -20,7 +20,7 @@ package org.elasticsearch.action.support.replication; import org.apache.lucene.index.CorruptIndexException; import org.elasticsearch.ElasticsearchException; -import org.elasticsearch.action.ActionWriteResponse; +import org.elasticsearch.action.ReplicationResponse; import org.elasticsearch.action.UnavailableShardsException; import org.elasticsearch.action.WriteConsistencyLevel; import org.elasticsearch.action.support.ActionFilter; @@ -521,7 +521,7 @@ public class TransportReplicationActionTests extends ESTestCase { } assertThat(listener.isDone(), equalTo(true)); Response response = listener.get(); - final ActionWriteResponse.ShardInfo shardInfo = response.getShardInfo(); + final ReplicationResponse.ShardInfo shardInfo = response.getShardInfo(); assertThat(shardInfo.getFailed(), equalTo(criticalFailures)); assertThat(shardInfo.getFailures(), arrayWithSize(criticalFailures)); assertThat(shardInfo.getSuccessful(), equalTo(successful)); @@ -703,7 +703,7 @@ public class TransportReplicationActionTests extends ESTestCase { } } - static class Response extends ActionWriteResponse { + static class Response extends ReplicationResponse { } class Action extends TransportReplicationAction { diff --git a/core/src/test/java/org/elasticsearch/document/ShardInfoIT.java b/core/src/test/java/org/elasticsearch/document/ShardInfoIT.java index d4907d82128..4f28cf19d7b 100644 --- a/core/src/test/java/org/elasticsearch/document/ShardInfoIT.java +++ b/core/src/test/java/org/elasticsearch/document/ShardInfoIT.java @@ -19,7 +19,7 @@ package org.elasticsearch.document; -import org.elasticsearch.action.ActionWriteResponse; +import org.elasticsearch.action.ReplicationResponse; import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; import org.elasticsearch.action.admin.indices.recovery.RecoveryResponse; import org.elasticsearch.action.bulk.BulkItemResponse; @@ -117,11 +117,11 @@ public class ShardInfoIT extends ESIntegTestCase { } } - private void assertShardInfo(ActionWriteResponse response) { + private void assertShardInfo(ReplicationResponse response) { assertShardInfo(response, numCopies, numNodes); } - private void assertShardInfo(ActionWriteResponse response, int expectedTotal, int expectedSuccessful) { + private void assertShardInfo(ReplicationResponse response, int expectedTotal, int expectedSuccessful) { assertThat(response.getShardInfo().getTotal(), greaterThanOrEqualTo(expectedTotal)); assertThat(response.getShardInfo().getSuccessful(), greaterThanOrEqualTo(expectedSuccessful)); } diff --git a/plugins/delete-by-query/src/test/java/org/elasticsearch/action/deletebyquery/TransportDeleteByQueryActionTests.java b/plugins/delete-by-query/src/test/java/org/elasticsearch/action/deletebyquery/TransportDeleteByQueryActionTests.java index 2b708341c7f..c44608c4e4b 100644 --- a/plugins/delete-by-query/src/test/java/org/elasticsearch/action/deletebyquery/TransportDeleteByQueryActionTests.java +++ b/plugins/delete-by-query/src/test/java/org/elasticsearch/action/deletebyquery/TransportDeleteByQueryActionTests.java @@ -32,6 +32,7 @@ import org.elasticsearch.common.text.StringText; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.CollectionUtils; import org.elasticsearch.common.util.concurrent.CountDown; +import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.SearchShardTarget; import org.elasticsearch.search.internal.InternalSearchHit; @@ -225,7 +226,7 @@ public class TransportDeleteByQueryActionTests extends ESSingleNodeTestCase { } else { deleted++; } - items[i] = new BulkItemResponse(i, "delete", new DeleteResponse("test", "type", String.valueOf(i), 1, delete)); + items[i] = new BulkItemResponse(i, "delete", new DeleteResponse(new ShardId("test", 0), "type", String.valueOf(i), 1, delete)); } else { items[i] = new BulkItemResponse(i, "delete", new BulkItemResponse.Failure("test", "type", String.valueOf(i), new Throwable("item failed"))); failed++; @@ -281,7 +282,7 @@ public class TransportDeleteByQueryActionTests extends ESSingleNodeTestCase { deleted[0] = deleted[0] + 1; deleted[index] = deleted[index] + 1; } - items[i] = new BulkItemResponse(i, "delete", new DeleteResponse("test-" + index, "type", String.valueOf(i), 1, delete)); + items[i] = new BulkItemResponse(i, "delete", new DeleteResponse(new ShardId("test-" + index, 0), "type", String.valueOf(i), 1, delete)); } else { items[i] = new BulkItemResponse(i, "delete", new BulkItemResponse.Failure("test-" + index, "type", String.valueOf(i), new Throwable("item failed"))); failed[0] = failed[0] + 1;