diff --git a/core/src/main/java/org/elasticsearch/action/DocWriteResponse.java b/core/src/main/java/org/elasticsearch/action/DocWriteResponse.java new file mode 100644 index 00000000000..009d3fc47a9 --- /dev/null +++ b/core/src/main/java/org/elasticsearch/action/DocWriteResponse.java @@ -0,0 +1,130 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.action; + +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.xcontent.StatusToXContent; +import org.elasticsearch.common.xcontent.ToXContent; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentBuilderString; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.rest.RestStatus; + +import java.io.IOException; + +/** + * A base class for the response of a write operation that involves a single doc + */ +public abstract class DocWriteResponse extends ReplicationResponse implements StatusToXContent { + + private ShardId shardId; + private String id; + private String type; + private long version; + + public DocWriteResponse(ShardId shardId, String type, String id, long version) { + this.shardId = shardId; + this.type = type; + this.id = id; + this.version = version; + } + + // needed for deserialization + protected DocWriteResponse() { + } + + /** + * The index the document was changed in. + */ + public String getIndex() { + return this.shardId.getIndex(); + } + + + /** + * The exact shard the document was changed in. + */ + public ShardId getShardId() { + return this.shardId; + } + + /** + * The type of the document changed. + */ + public String getType() { + return this.type; + } + + /** + * The id of the document changed. + */ + public String getId() { + return this.id; + } + + /** + * Returns the current version of the doc. + */ + public long getVersion() { + return this.version; + } + + /** returns the rest status for this response (based on {@link ShardInfo#status()} */ + public RestStatus status() { + return getShardInfo().status(); + } + + + @Override + public void readFrom(StreamInput in) throws IOException { + super.readFrom(in); + shardId = ShardId.readShardId(in); + type = in.readString(); + id = in.readString(); + version = in.readZLong(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + shardId.writeTo(out); + out.writeString(type); + out.writeString(id); + out.writeZLong(version); + } + + static final class Fields { + static final XContentBuilderString _INDEX = new XContentBuilderString("_index"); + static final XContentBuilderString _TYPE = new XContentBuilderString("_type"); + static final XContentBuilderString _ID = new XContentBuilderString("_id"); + static final XContentBuilderString _VERSION = new XContentBuilderString("_version"); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + ReplicationResponse.ShardInfo shardInfo = getShardInfo(); + builder.field(Fields._INDEX, shardId.getIndex()) + .field(Fields._TYPE, type) + .field(Fields._ID, id) + .field(Fields._VERSION, version); + shardInfo.toXContent(builder, params); + return builder; + } +} diff --git a/core/src/main/java/org/elasticsearch/action/ActionWriteResponse.java b/core/src/main/java/org/elasticsearch/action/ReplicationResponse.java similarity index 96% rename from core/src/main/java/org/elasticsearch/action/ActionWriteResponse.java rename to core/src/main/java/org/elasticsearch/action/ReplicationResponse.java index f4152ac85e4..4e358c8d42a 100644 --- a/core/src/main/java/org/elasticsearch/action/ActionWriteResponse.java +++ b/core/src/main/java/org/elasticsearch/action/ReplicationResponse.java @@ -21,7 +21,6 @@ package org.elasticsearch.action; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ExceptionsHelper; -import org.elasticsearch.bootstrap.Elasticsearch; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Strings; import org.elasticsearch.common.io.stream.StreamInput; @@ -30,25 +29,23 @@ import org.elasticsearch.common.io.stream.Streamable; import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentBuilderString; -import org.elasticsearch.common.xcontent.json.JsonXContent; import org.elasticsearch.rest.RestStatus; import java.io.IOException; -import java.util.Collections; /** * Base class for write action responses. */ -public class ActionWriteResponse extends ActionResponse { +public class ReplicationResponse extends ActionResponse { - public final static ActionWriteResponse.ShardInfo.Failure[] EMPTY = new ActionWriteResponse.ShardInfo.Failure[0]; + public final static ReplicationResponse.ShardInfo.Failure[] EMPTY = new ReplicationResponse.ShardInfo.Failure[0]; private ShardInfo shardInfo; @Override public void readFrom(StreamInput in) throws IOException { super.readFrom(in); - shardInfo = ActionWriteResponse.ShardInfo.readShardInfo(in); + shardInfo = ReplicationResponse.ShardInfo.readShardInfo(in); } @Override diff --git a/core/src/main/java/org/elasticsearch/action/admin/indices/flush/TransportFlushAction.java b/core/src/main/java/org/elasticsearch/action/admin/indices/flush/TransportFlushAction.java index 00e03ffdf6e..d2a8f1abcbf 100644 --- a/core/src/main/java/org/elasticsearch/action/admin/indices/flush/TransportFlushAction.java +++ b/core/src/main/java/org/elasticsearch/action/admin/indices/flush/TransportFlushAction.java @@ -19,7 +19,7 @@ package org.elasticsearch.action.admin.indices.flush; -import org.elasticsearch.action.ActionWriteResponse; +import org.elasticsearch.action.ReplicationResponse; import org.elasticsearch.action.ShardOperationFailedException; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.replication.TransportBroadcastReplicationAction; @@ -36,7 +36,7 @@ import java.util.List; /** * Flush Action. */ -public class TransportFlushAction extends TransportBroadcastReplicationAction { +public class TransportFlushAction extends TransportBroadcastReplicationAction { @Inject public TransportFlushAction(Settings settings, ThreadPool threadPool, ClusterService clusterService, @@ -47,8 +47,8 @@ public class TransportFlushAction extends TransportBroadcastReplicationAction { +public class TransportShardFlushAction extends TransportReplicationAction { public static final String NAME = FlushAction.NAME + "[s]"; @@ -53,16 +53,16 @@ public class TransportShardFlushAction extends TransportReplicationAction shardOperationOnPrimary(MetaData metaData, ShardFlushRequest shardRequest) throws Throwable { + protected Tuple shardOperationOnPrimary(MetaData metaData, ShardFlushRequest shardRequest) throws Throwable { IndexShard indexShard = indicesService.indexServiceSafe(shardRequest.shardId().getIndex()).getShard(shardRequest.shardId().id()); indexShard.flush(shardRequest.getRequest()); logger.trace("{} flush request executed on primary", indexShard.shardId()); - return new Tuple<>(new ActionWriteResponse(), shardRequest); + return new Tuple<>(new ReplicationResponse(), shardRequest); } @Override diff --git a/core/src/main/java/org/elasticsearch/action/admin/indices/refresh/TransportRefreshAction.java b/core/src/main/java/org/elasticsearch/action/admin/indices/refresh/TransportRefreshAction.java index a5e30abc12d..a76b714b31d 100644 --- a/core/src/main/java/org/elasticsearch/action/admin/indices/refresh/TransportRefreshAction.java +++ b/core/src/main/java/org/elasticsearch/action/admin/indices/refresh/TransportRefreshAction.java @@ -19,7 +19,7 @@ package org.elasticsearch.action.admin.indices.refresh; -import org.elasticsearch.action.ActionWriteResponse; +import org.elasticsearch.action.ReplicationResponse; import org.elasticsearch.action.ShardOperationFailedException; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.replication.ReplicationRequest; @@ -37,7 +37,7 @@ import java.util.List; /** * Refresh action. */ -public class TransportRefreshAction extends TransportBroadcastReplicationAction { +public class TransportRefreshAction extends TransportBroadcastReplicationAction { @Inject public TransportRefreshAction(Settings settings, ThreadPool threadPool, ClusterService clusterService, @@ -48,8 +48,8 @@ public class TransportRefreshAction extends TransportBroadcastReplicationAction< } @Override - protected ActionWriteResponse newShardResponse() { - return new ActionWriteResponse(); + protected ReplicationResponse newShardResponse() { + return new ReplicationResponse(); } @Override diff --git a/core/src/main/java/org/elasticsearch/action/admin/indices/refresh/TransportShardRefreshAction.java b/core/src/main/java/org/elasticsearch/action/admin/indices/refresh/TransportShardRefreshAction.java index 02af4ad89a0..c78977fb362 100644 --- a/core/src/main/java/org/elasticsearch/action/admin/indices/refresh/TransportShardRefreshAction.java +++ b/core/src/main/java/org/elasticsearch/action/admin/indices/refresh/TransportShardRefreshAction.java @@ -19,7 +19,7 @@ package org.elasticsearch.action.admin.indices.refresh; -import org.elasticsearch.action.ActionWriteResponse; +import org.elasticsearch.action.ReplicationResponse; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.replication.ReplicationRequest; import org.elasticsearch.action.support.replication.TransportReplicationAction; @@ -41,7 +41,7 @@ import org.elasticsearch.transport.TransportService; /** * */ -public class TransportShardRefreshAction extends TransportReplicationAction { +public class TransportShardRefreshAction extends TransportReplicationAction { public static final String NAME = RefreshAction.NAME + "[s]"; @@ -55,16 +55,16 @@ public class TransportShardRefreshAction extends TransportReplicationAction shardOperationOnPrimary(MetaData metaData, ReplicationRequest shardRequest) throws Throwable { + protected Tuple shardOperationOnPrimary(MetaData metaData, ReplicationRequest shardRequest) throws Throwable { IndexShard indexShard = indicesService.indexServiceSafe(shardRequest.shardId().getIndex()).getShard(shardRequest.shardId().id()); indexShard.refresh("api"); logger.trace("{} refresh request executed on primary", indexShard.shardId()); - return new Tuple<>(new ActionWriteResponse(), shardRequest); + return new Tuple<>(new ReplicationResponse(), shardRequest); } @Override diff --git a/core/src/main/java/org/elasticsearch/action/bulk/BulkItemResponse.java b/core/src/main/java/org/elasticsearch/action/bulk/BulkItemResponse.java index 80e86eaaf17..982700016b7 100644 --- a/core/src/main/java/org/elasticsearch/action/bulk/BulkItemResponse.java +++ b/core/src/main/java/org/elasticsearch/action/bulk/BulkItemResponse.java @@ -19,14 +19,18 @@ package org.elasticsearch.action.bulk; +import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ExceptionsHelper; -import org.elasticsearch.action.ActionWriteResponse; +import org.elasticsearch.action.DocWriteResponse; import org.elasticsearch.action.delete.DeleteResponse; import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.action.update.UpdateResponse; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Streamable; +import org.elasticsearch.common.xcontent.StatusToXContent; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentBuilderString; import org.elasticsearch.rest.RestStatus; import java.io.IOException; @@ -35,7 +39,39 @@ import java.io.IOException; * Represents a single item response for an action executed as part of the bulk API. Holds the index/type/id * of the relevant action, and if it has failed or not (with the failure message incase it failed). */ -public class BulkItemResponse implements Streamable { +public class BulkItemResponse implements Streamable, StatusToXContent { + + @Override + public RestStatus status() { + return failure == null ? response.status() : failure.getStatus(); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(opType); + if (failure == null) { + response.toXContent(builder, params); + builder.field(Fields.STATUS, response.status()); + } else { + builder.field(Fields._INDEX, failure.getIndex()); + builder.field(Fields._TYPE, failure.getType()); + builder.field(Fields._ID, failure.getId()); + builder.field(Fields.STATUS, failure.getStatus()); + builder.startObject(Fields.ERROR); + ElasticsearchException.toXContent(builder, params, failure.getCause()); + builder.endObject(); + } + builder.endObject(); + return builder; + } + + static final class Fields { + static final XContentBuilderString _INDEX = new XContentBuilderString("_index"); + static final XContentBuilderString _TYPE = new XContentBuilderString("_type"); + static final XContentBuilderString _ID = new XContentBuilderString("_id"); + static final XContentBuilderString STATUS = new XContentBuilderString("status"); + static final XContentBuilderString ERROR = new XContentBuilderString("error"); + } /** * Represents a failure. @@ -99,7 +135,7 @@ public class BulkItemResponse implements Streamable { private String opType; - private ActionWriteResponse response; + private DocWriteResponse response; private Failure failure; @@ -107,7 +143,7 @@ public class BulkItemResponse implements Streamable { } - public BulkItemResponse(int id, String opType, ActionWriteResponse response) { + public BulkItemResponse(int id, String opType, DocWriteResponse response) { this.id = id; this.opType = opType; this.response = response; @@ -140,14 +176,7 @@ public class BulkItemResponse implements Streamable { if (failure != null) { return failure.getIndex(); } - if (response instanceof IndexResponse) { - return ((IndexResponse) response).getIndex(); - } else if (response instanceof DeleteResponse) { - return ((DeleteResponse) response).getIndex(); - } else if (response instanceof UpdateResponse) { - return ((UpdateResponse) response).getIndex(); - } - return null; + return response.getIndex(); } /** @@ -157,14 +186,7 @@ public class BulkItemResponse implements Streamable { if (failure != null) { return failure.getType(); } - if (response instanceof IndexResponse) { - return ((IndexResponse) response).getType(); - } else if (response instanceof DeleteResponse) { - return ((DeleteResponse) response).getType(); - } else if (response instanceof UpdateResponse) { - return ((UpdateResponse) response).getType(); - } - return null; + return response.getType(); } /** @@ -174,14 +196,7 @@ public class BulkItemResponse implements Streamable { if (failure != null) { return failure.getId(); } - if (response instanceof IndexResponse) { - return ((IndexResponse) response).getId(); - } else if (response instanceof DeleteResponse) { - return ((DeleteResponse) response).getId(); - } else if (response instanceof UpdateResponse) { - return ((UpdateResponse) response).getId(); - } - return null; + return response.getId(); } /** @@ -191,21 +206,14 @@ public class BulkItemResponse implements Streamable { if (failure != null) { return -1; } - if (response instanceof IndexResponse) { - return ((IndexResponse) response).getVersion(); - } else if (response instanceof DeleteResponse) { - return ((DeleteResponse) response).getVersion(); - } else if (response instanceof UpdateResponse) { - return ((UpdateResponse) response).getVersion(); - } - return -1; + return response.getVersion(); } /** * The actual response ({@link IndexResponse} or {@link DeleteResponse}). null in * case of failure. */ - public T getResponse() { + public T getResponse() { return (T) response; } diff --git a/core/src/main/java/org/elasticsearch/action/bulk/BulkShardResponse.java b/core/src/main/java/org/elasticsearch/action/bulk/BulkShardResponse.java index 6b08627f5de..76c80a9b064 100644 --- a/core/src/main/java/org/elasticsearch/action/bulk/BulkShardResponse.java +++ b/core/src/main/java/org/elasticsearch/action/bulk/BulkShardResponse.java @@ -19,7 +19,7 @@ package org.elasticsearch.action.bulk; -import org.elasticsearch.action.ActionWriteResponse; +import org.elasticsearch.action.ReplicationResponse; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.index.shard.ShardId; @@ -29,7 +29,7 @@ import java.io.IOException; /** * */ -public class BulkShardResponse extends ActionWriteResponse { +public class BulkShardResponse extends ReplicationResponse { private ShardId shardId; private BulkItemResponse[] responses; diff --git a/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java b/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java index 2cc81556222..2597695a1e2 100644 --- a/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java +++ b/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java @@ -204,7 +204,7 @@ public class TransportShardBulkAction extends TransportReplicationAction 0) { Tuple> sourceAndContent = XContentHelper.convertToMap(indexSourceAsBytes, true); updateResponse.setGetResult(updateHelper.extractGetResult(updateRequest, request.index(), indexResponse.getVersion(), sourceAndContent.v2(), sourceAndContent.v1(), indexSourceAsBytes)); @@ -216,7 +216,7 @@ public class TransportShardBulkAction extends TransportReplicationAction writeResult = updateResult.writeResult; DeleteResponse response = writeResult.response(); DeleteRequest deleteRequest = updateResult.request(); - updateResponse = new UpdateResponse(response.getShardInfo(), response.getIndex(), response.getType(), response.getId(), response.getVersion(), false); + updateResponse = new UpdateResponse(response.getShardInfo(), response.getShardId(), response.getType(), response.getId(), response.getVersion(), false); updateResponse.setGetResult(updateHelper.extractGetResult(updateRequest, request.index(), response.getVersion(), updateResult.result.updatedSourceAsMap(), updateResult.result.updateSourceContentType(), null)); // Replace the update request to the translated delete request to execute on the replica. item = request.items()[requestIndex] = new BulkItemRequest(request.items()[requestIndex].id(), deleteRequest); diff --git a/core/src/main/java/org/elasticsearch/action/delete/DeleteResponse.java b/core/src/main/java/org/elasticsearch/action/delete/DeleteResponse.java index 26cfa57a13d..57781547266 100644 --- a/core/src/main/java/org/elasticsearch/action/delete/DeleteResponse.java +++ b/core/src/main/java/org/elasticsearch/action/delete/DeleteResponse.java @@ -19,9 +19,13 @@ package org.elasticsearch.action.delete; -import org.elasticsearch.action.ActionWriteResponse; +import org.elasticsearch.action.DocWriteResponse; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentBuilderString; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.rest.RestStatus; import java.io.IOException; @@ -31,53 +35,19 @@ import java.io.IOException; * @see org.elasticsearch.action.delete.DeleteRequest * @see org.elasticsearch.client.Client#delete(DeleteRequest) */ -public class DeleteResponse extends ActionWriteResponse { +public class DeleteResponse extends DocWriteResponse { - private String index; - private String id; - private String type; - private long version; private boolean found; public DeleteResponse() { } - public DeleteResponse(String index, String type, String id, long version, boolean found) { - this.index = index; - this.id = id; - this.type = type; - this.version = version; + public DeleteResponse(ShardId shardId, String type, String id, long version, boolean found) { + super(shardId, type, id, version); this.found = found; } - /** - * The index the document was deleted from. - */ - public String getIndex() { - return this.index; - } - - /** - * The type of the document deleted. - */ - public String getType() { - return this.type; - } - - /** - * The id of the document deleted. - */ - public String getId() { - return this.id; - } - - /** - * The version of the delete operation. - */ - public long getVersion() { - return this.version; - } /** * Returns true if a doc was found to delete. @@ -89,20 +59,44 @@ public class DeleteResponse extends ActionWriteResponse { @Override public void readFrom(StreamInput in) throws IOException { super.readFrom(in); - index = in.readString(); - type = in.readString(); - id = in.readString(); - version = in.readLong(); found = in.readBoolean(); } @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); - out.writeString(index); - out.writeString(type); - out.writeString(id); - out.writeLong(version); out.writeBoolean(found); } + + @Override + public RestStatus status() { + if (found == false) { + return RestStatus.NOT_FOUND; + } + return super.status(); + } + + static final class Fields { + static final XContentBuilderString FOUND = new XContentBuilderString("found"); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.field(Fields.FOUND, isFound()); + super.toXContent(builder, params); + return builder; + } + + @Override + public String toString() { + StringBuilder builder = new StringBuilder(); + builder.append("DeleteResponse["); + builder.append("index=").append(getIndex()); + builder.append(",type=").append(getType()); + builder.append(",id=").append(getId()); + builder.append(",version=").append(getVersion()); + builder.append(",found=").append(found); + builder.append(",shards=").append(getShardInfo()); + return builder.append("]").toString(); + } } diff --git a/core/src/main/java/org/elasticsearch/action/delete/TransportDeleteAction.java b/core/src/main/java/org/elasticsearch/action/delete/TransportDeleteAction.java index 1b3faec9999..ca66b285753 100644 --- a/core/src/main/java/org/elasticsearch/action/delete/TransportDeleteAction.java +++ b/core/src/main/java/org/elasticsearch/action/delete/TransportDeleteAction.java @@ -140,7 +140,7 @@ public class TransportDeleteAction extends TransportReplicationAction( - new DeleteResponse(indexShard.shardId().getIndex(), request.type(), request.id(), delete.version(), delete.found()), + new DeleteResponse(indexShard.shardId(), request.type(), request.id(), delete.version(), delete.found()), delete.getTranslogLocation()); } diff --git a/core/src/main/java/org/elasticsearch/action/index/IndexResponse.java b/core/src/main/java/org/elasticsearch/action/index/IndexResponse.java index 5727b2b673b..665327a749f 100644 --- a/core/src/main/java/org/elasticsearch/action/index/IndexResponse.java +++ b/core/src/main/java/org/elasticsearch/action/index/IndexResponse.java @@ -19,9 +19,13 @@ package org.elasticsearch.action.index; -import org.elasticsearch.action.ActionWriteResponse; +import org.elasticsearch.action.DocWriteResponse; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentBuilderString; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.rest.RestStatus; import java.io.IOException; @@ -31,54 +35,19 @@ import java.io.IOException; * @see org.elasticsearch.action.index.IndexRequest * @see org.elasticsearch.client.Client#index(IndexRequest) */ -public class IndexResponse extends ActionWriteResponse { +public class IndexResponse extends DocWriteResponse { - private String index; - private String id; - private String type; - private long version; private boolean created; public IndexResponse() { } - public IndexResponse(String index, String type, String id, long version, boolean created) { - this.index = index; - this.id = id; - this.type = type; - this.version = version; + public IndexResponse(ShardId shardId, String type, String id, long version, boolean created) { + super(shardId, type, id, version); this.created = created; } - /** - * The index the document was indexed into. - */ - public String getIndex() { - return this.index; - } - - /** - * The type of the document indexed. - */ - public String getType() { - return this.type; - } - - /** - * The id of the document indexed. - */ - public String getId() { - return this.id; - } - - /** - * Returns the current version of the doc indexed. - */ - public long getVersion() { - return this.version; - } - /** * Returns true if the document was created, false if updated. */ @@ -86,23 +55,23 @@ public class IndexResponse extends ActionWriteResponse { return this.created; } + @Override + public RestStatus status() { + if (created) { + return RestStatus.CREATED; + } + return super.status(); + } + @Override public void readFrom(StreamInput in) throws IOException { super.readFrom(in); - index = in.readString(); - type = in.readString(); - id = in.readString(); - version = in.readLong(); created = in.readBoolean(); } @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); - out.writeString(index); - out.writeString(type); - out.writeString(id); - out.writeLong(version); out.writeBoolean(created); } @@ -110,12 +79,23 @@ public class IndexResponse extends ActionWriteResponse { public String toString() { StringBuilder builder = new StringBuilder(); builder.append("IndexResponse["); - builder.append("index=").append(index); - builder.append(",type=").append(type); - builder.append(",id=").append(id); - builder.append(",version=").append(version); + builder.append("index=").append(getIndex()); + builder.append(",type=").append(getType()); + builder.append(",id=").append(getId()); + builder.append(",version=").append(getVersion()); builder.append(",created=").append(created); builder.append(",shards=").append(getShardInfo()); return builder.append("]").toString(); } + + static final class Fields { + static final XContentBuilderString CREATED = new XContentBuilderString("created"); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + super.toXContent(builder, params); + builder.field(Fields.CREATED, isCreated()); + return builder; + } } diff --git a/core/src/main/java/org/elasticsearch/action/index/TransportIndexAction.java b/core/src/main/java/org/elasticsearch/action/index/TransportIndexAction.java index 37e76835270..620056ded4e 100644 --- a/core/src/main/java/org/elasticsearch/action/index/TransportIndexAction.java +++ b/core/src/main/java/org/elasticsearch/action/index/TransportIndexAction.java @@ -222,7 +222,7 @@ public class TransportIndexAction extends TransportReplicationAction(new IndexResponse(shardId.getIndex(), request.type(), request.id(), request.version(), created), operation.getTranslogLocation()); + return new WriteResult<>(new IndexResponse(shardId, request.type(), request.id(), request.version(), created), operation.getTranslogLocation()); } } diff --git a/core/src/main/java/org/elasticsearch/action/support/replication/TransportBroadcastReplicationAction.java b/core/src/main/java/org/elasticsearch/action/support/replication/TransportBroadcastReplicationAction.java index ddd4d42f7a6..33a9d349e80 100644 --- a/core/src/main/java/org/elasticsearch/action/support/replication/TransportBroadcastReplicationAction.java +++ b/core/src/main/java/org/elasticsearch/action/support/replication/TransportBroadcastReplicationAction.java @@ -22,9 +22,8 @@ package org.elasticsearch.action.support.replication; import com.carrotsearch.hppc.cursors.IntObjectCursor; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.ActionWriteResponse; +import org.elasticsearch.action.ReplicationResponse; import org.elasticsearch.action.ShardOperationFailedException; -import org.elasticsearch.action.UnavailableShardsException; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.DefaultShardOperationFailedException; import org.elasticsearch.action.support.HandledTransportAction; @@ -53,7 +52,7 @@ import java.util.function.Supplier; * Base class for requests that should be executed on all shards of an index or several indices. * This action sends shard requests to all primary shards of the indices and they are then replicated like write requests */ -public abstract class TransportBroadcastReplicationAction extends HandledTransportAction { +public abstract class TransportBroadcastReplicationAction extends HandledTransportAction { private final TransportReplicationAction replicatedBroadcastShardAction; private final ClusterService clusterService; @@ -91,15 +90,15 @@ public abstract class TransportBroadcastReplicationAction shardFailures = null; for (int i = 0; i < shardsResponses.size(); i++) { - ActionWriteResponse shardResponse = shardsResponses.get(i); + ReplicationResponse shardResponse = shardsResponses.get(i); if (shardResponse == null) { // non active shard, ignore } else { @@ -152,7 +151,7 @@ public abstract class TransportBroadcastReplicationAction(); } - for (ActionWriteResponse.ShardInfo.Failure failure : shardResponse.getShardInfo().getFailures()) { + for (ReplicationResponse.ShardInfo.Failure failure : shardResponse.getShardInfo().getFailures()) { shardFailures.add(new DefaultShardOperationFailedException(new BroadcastShardOperationFailedException(new ShardId(failure.index(), failure.shardId()), failure.getCause()))); } } diff --git a/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java b/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java index 2f9fd6d483c..26c439c0a3d 100644 --- a/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java +++ b/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java @@ -22,7 +22,7 @@ package org.elasticsearch.action.support.replication; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.ActionWriteResponse; +import org.elasticsearch.action.ReplicationResponse; import org.elasticsearch.action.UnavailableShardsException; import org.elasticsearch.action.WriteConsistencyLevel; import org.elasticsearch.action.support.ActionFilters; @@ -78,7 +78,7 @@ import java.util.function.Supplier; * primary node to validate request before primary operation followed by sampling state again for resolving * nodes with replica copies to perform replication. */ -public abstract class TransportReplicationAction extends TransportAction { +public abstract class TransportReplicationAction extends TransportAction { public static final String SHARD_FAILURE_TIMEOUT = "action.support.replication.shard.failure_timeout"; @@ -214,7 +214,7 @@ public abstract class TransportReplicationAction { + protected static class WriteResult { public final T response; public final Translog.Location location; @@ -225,10 +225,10 @@ public abstract class TransportReplicationAction T response() { + public T response() { // this sets total, pending and failed to 0 and this is ok, because we will embed this into the replica // request and not use it - response.setShardInfo(new ActionWriteResponse.ShardInfo()); + response.setShardInfo(new ReplicationResponse.ShardInfo()); return (T) response; } @@ -908,20 +908,20 @@ public abstract class TransportReplicationAction entry : shardReplicaFailures.entrySet()) { RestStatus restStatus = ExceptionsHelper.status(entry.getValue()); - failuresArray[slot++] = new ActionWriteResponse.ShardInfo.Failure( + failuresArray[slot++] = new ReplicationResponse.ShardInfo.Failure( shardId.getIndex(), shardId.getId(), entry.getKey(), entry.getValue(), restStatus, false ); } } else { - failuresArray = ActionWriteResponse.EMPTY; + failuresArray = ReplicationResponse.EMPTY; } - finalResponse.setShardInfo(new ActionWriteResponse.ShardInfo( + finalResponse.setShardInfo(new ReplicationResponse.ShardInfo( totalShards, success.get(), failuresArray diff --git a/core/src/main/java/org/elasticsearch/action/update/TransportUpdateAction.java b/core/src/main/java/org/elasticsearch/action/update/TransportUpdateAction.java index b2d24fef714..e5edc1af96b 100644 --- a/core/src/main/java/org/elasticsearch/action/update/TransportUpdateAction.java +++ b/core/src/main/java/org/elasticsearch/action/update/TransportUpdateAction.java @@ -175,7 +175,7 @@ public class TransportUpdateAction extends TransportInstanceSingleOperationActio indexAction.execute(upsertRequest, new ActionListener() { @Override public void onResponse(IndexResponse response) { - UpdateResponse update = new UpdateResponse(response.getShardInfo(), response.getIndex(), response.getType(), response.getId(), response.getVersion(), response.isCreated()); + UpdateResponse update = new UpdateResponse(response.getShardInfo(), response.getShardId(), response.getType(), response.getId(), response.getVersion(), response.isCreated()); if (request.fields() != null && request.fields().length > 0) { Tuple> sourceAndContent = XContentHelper.convertToMap(upsertSourceBytes, true); update.setGetResult(updateHelper.extractGetResult(request, request.concreteIndex(), response.getVersion(), sourceAndContent.v2(), sourceAndContent.v1(), upsertSourceBytes)); @@ -212,7 +212,7 @@ public class TransportUpdateAction extends TransportInstanceSingleOperationActio indexAction.execute(indexRequest, new ActionListener() { @Override public void onResponse(IndexResponse response) { - UpdateResponse update = new UpdateResponse(response.getShardInfo(), response.getIndex(), response.getType(), response.getId(), response.getVersion(), response.isCreated()); + UpdateResponse update = new UpdateResponse(response.getShardInfo(), response.getShardId(), response.getType(), response.getId(), response.getVersion(), response.isCreated()); update.setGetResult(updateHelper.extractGetResult(request, request.concreteIndex(), response.getVersion(), result.updatedSourceAsMap(), result.updateSourceContentType(), indexSourceBytes)); listener.onResponse(update); } @@ -240,7 +240,7 @@ public class TransportUpdateAction extends TransportInstanceSingleOperationActio deleteAction.execute(deleteRequest, new ActionListener() { @Override public void onResponse(DeleteResponse response) { - UpdateResponse update = new UpdateResponse(response.getShardInfo(), response.getIndex(), response.getType(), response.getId(), response.getVersion(), false); + UpdateResponse update = new UpdateResponse(response.getShardInfo(), response.getShardId(), response.getType(), response.getId(), response.getVersion(), false); update.setGetResult(updateHelper.extractGetResult(request, request.concreteIndex(), response.getVersion(), result.updatedSourceAsMap(), result.updateSourceContentType(), null)); listener.onResponse(update); } diff --git a/core/src/main/java/org/elasticsearch/action/update/UpdateHelper.java b/core/src/main/java/org/elasticsearch/action/update/UpdateHelper.java index 4bdcd43023f..9f8b2a2e7be 100644 --- a/core/src/main/java/org/elasticsearch/action/update/UpdateHelper.java +++ b/core/src/main/java/org/elasticsearch/action/update/UpdateHelper.java @@ -83,9 +83,10 @@ public class UpdateHelper extends AbstractComponent { @SuppressWarnings("unchecked") protected Result prepare(UpdateRequest request, final GetResult getResult) { long getDateNS = System.nanoTime(); + final ShardId shardId = new ShardId(getResult.getIndex(), request.shardId()); if (!getResult.isExists()) { if (request.upsertRequest() == null && !request.docAsUpsert()) { - throw new DocumentMissingException(new ShardId(request.index(), request.shardId()), request.type(), request.id()); + throw new DocumentMissingException(shardId, request.type(), request.id()); } IndexRequest indexRequest = request.docAsUpsert() ? request.doc() : request.upsertRequest(); TimeValue ttl = indexRequest.ttl(); @@ -113,7 +114,7 @@ public class UpdateHelper extends AbstractComponent { logger.warn("Used upsert operation [{}] for script [{}], doing nothing...", scriptOpChoice, request.script.getScript()); } - UpdateResponse update = new UpdateResponse(getResult.getIndex(), getResult.getType(), getResult.getId(), + UpdateResponse update = new UpdateResponse(shardId, getResult.getType(), getResult.getId(), getResult.getVersion(), false); update.setGetResult(getResult); return new Result(update, Operation.NONE, upsertDoc, XContentType.JSON); @@ -145,7 +146,7 @@ public class UpdateHelper extends AbstractComponent { if (getResult.internalSourceRef() == null) { // no source, we can't do nothing, through a failure... - throw new DocumentSourceMissingException(new ShardId(request.index(), request.shardId()), request.type(), request.id()); + throw new DocumentSourceMissingException(shardId, request.type(), request.id()); } Tuple> sourceAndContent = XContentHelper.convertToMap(getResult.internalSourceRef(), true); @@ -231,12 +232,12 @@ public class UpdateHelper extends AbstractComponent { .consistencyLevel(request.consistencyLevel()); return new Result(deleteRequest, Operation.DELETE, updatedSourceAsMap, updateSourceContentType); } else if ("none".equals(operation)) { - UpdateResponse update = new UpdateResponse(getResult.getIndex(), getResult.getType(), getResult.getId(), getResult.getVersion(), false); + UpdateResponse update = new UpdateResponse(shardId, getResult.getType(), getResult.getId(), getResult.getVersion(), false); update.setGetResult(extractGetResult(request, request.index(), getResult.getVersion(), updatedSourceAsMap, updateSourceContentType, getResult.internalSourceRef())); return new Result(update, Operation.NONE, updatedSourceAsMap, updateSourceContentType); } else { logger.warn("Used update operation [{}] for script [{}], doing nothing...", operation, request.script.getScript()); - UpdateResponse update = new UpdateResponse(getResult.getIndex(), getResult.getType(), getResult.getId(), getResult.getVersion(), false); + UpdateResponse update = new UpdateResponse(shardId, getResult.getType(), getResult.getId(), getResult.getVersion(), false); return new Result(update, Operation.NONE, updatedSourceAsMap, updateSourceContentType); } } diff --git a/core/src/main/java/org/elasticsearch/action/update/UpdateResponse.java b/core/src/main/java/org/elasticsearch/action/update/UpdateResponse.java index af6438097c8..2f3146b0644 100644 --- a/core/src/main/java/org/elasticsearch/action/update/UpdateResponse.java +++ b/core/src/main/java/org/elasticsearch/action/update/UpdateResponse.java @@ -19,21 +19,21 @@ package org.elasticsearch.action.update; -import org.elasticsearch.action.ActionWriteResponse; +import org.elasticsearch.action.DocWriteResponse; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentBuilderString; import org.elasticsearch.index.get.GetResult; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.rest.RestStatus; import java.io.IOException; /** */ -public class UpdateResponse extends ActionWriteResponse { +public class UpdateResponse extends DocWriteResponse { - private String index; - private String id; - private String type; - private long version; private boolean created; private GetResult getResult; @@ -44,47 +44,16 @@ public class UpdateResponse extends ActionWriteResponse { * Constructor to be used when a update didn't translate in a write. * For example: update script with operation set to none */ - public UpdateResponse(String index, String type, String id, long version, boolean created) { - this(new ShardInfo(0, 0), index, type, id, version, created); + public UpdateResponse(ShardId shardId, String type, String id, long version, boolean created) { + this(new ShardInfo(0, 0), shardId, type, id, version, created); } - public UpdateResponse(ShardInfo shardInfo, String index, String type, String id, long version, boolean created) { + public UpdateResponse(ShardInfo shardInfo, ShardId shardId, String type, String id, long version, boolean created) { + super(shardId, type, id, version); setShardInfo(shardInfo); - this.index = index; - this.id = id; - this.type = type; - this.version = version; this.created = created; } - /** - * The index the document was indexed into. - */ - public String getIndex() { - return this.index; - } - - /** - * The type of the document indexed. - */ - public String getType() { - return this.type; - } - - /** - * The id of the document indexed. - */ - public String getId() { - return this.id; - } - - /** - * Returns the current version of the doc indexed. - */ - public long getVersion() { - return this.version; - } - public void setGetResult(GetResult getResult) { this.getResult = getResult; } @@ -101,13 +70,17 @@ public class UpdateResponse extends ActionWriteResponse { } + @Override + public RestStatus status() { + if (created) { + return RestStatus.CREATED; + } + return super.status(); + } + @Override public void readFrom(StreamInput in) throws IOException { super.readFrom(in); - index = in.readString(); - type = in.readString(); - id = in.readString(); - version = in.readLong(); created = in.readBoolean(); if (in.readBoolean()) { getResult = GetResult.readGetResult(in); @@ -117,10 +90,6 @@ public class UpdateResponse extends ActionWriteResponse { @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); - out.writeString(index); - out.writeString(type); - out.writeString(id); - out.writeLong(version); out.writeBoolean(created); if (getResult == null) { out.writeBoolean(false); @@ -129,4 +98,34 @@ public class UpdateResponse extends ActionWriteResponse { getResult.writeTo(out); } } + + + static final class Fields { + static final XContentBuilderString GET = new XContentBuilderString("get"); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + super.toXContent(builder, params); + if (getGetResult() != null) { + builder.startObject(Fields.GET); + getGetResult().toXContentEmbedded(builder, params); + builder.endObject(); + } + return builder; + } + + @Override + public String toString() { + StringBuilder builder = new StringBuilder(); + builder.append("UpdateResponse["); + builder.append("index=").append(getIndex()); + builder.append(",type=").append(getType()); + builder.append(",id=").append(getId()); + builder.append(",version=").append(getVersion()); + builder.append(",created=").append(created); + builder.append(",shards=").append(getShardInfo()); + return builder.append("]").toString(); + } + } diff --git a/core/src/main/java/org/elasticsearch/rest/action/bulk/RestBulkAction.java b/core/src/main/java/org/elasticsearch/rest/action/bulk/RestBulkAction.java index 90184352714..536b73ba2b5 100644 --- a/core/src/main/java/org/elasticsearch/rest/action/bulk/RestBulkAction.java +++ b/core/src/main/java/org/elasticsearch/rest/action/bulk/RestBulkAction.java @@ -19,16 +19,11 @@ package org.elasticsearch.rest.action.bulk; -import org.elasticsearch.ElasticsearchException; -import org.elasticsearch.action.ActionWriteResponse; import org.elasticsearch.action.WriteConsistencyLevel; import org.elasticsearch.action.bulk.BulkItemResponse; import org.elasticsearch.action.bulk.BulkRequest; import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.action.bulk.BulkShardRequest; -import org.elasticsearch.action.delete.DeleteResponse; -import org.elasticsearch.action.index.IndexResponse; -import org.elasticsearch.action.update.UpdateResponse; import org.elasticsearch.client.Client; import org.elasticsearch.client.Requests; import org.elasticsearch.common.Strings; @@ -96,52 +91,7 @@ public class RestBulkAction extends BaseRestHandler { builder.startArray(Fields.ITEMS); for (BulkItemResponse itemResponse : response) { builder.startObject(); - builder.startObject(itemResponse.getOpType()); - builder.field(Fields._INDEX, itemResponse.getIndex()); - builder.field(Fields._TYPE, itemResponse.getType()); - builder.field(Fields._ID, itemResponse.getId()); - long version = itemResponse.getVersion(); - if (version != -1) { - builder.field(Fields._VERSION, itemResponse.getVersion()); - } - if (itemResponse.isFailed()) { - builder.field(Fields.STATUS, itemResponse.getFailure().getStatus().getStatus()); - builder.startObject(Fields.ERROR); - ElasticsearchException.toXContent(builder, request, itemResponse.getFailure().getCause()); - builder.endObject(); - } else { - ActionWriteResponse.ShardInfo shardInfo = itemResponse.getResponse().getShardInfo(); - shardInfo.toXContent(builder, request); - if (itemResponse.getResponse() instanceof DeleteResponse) { - DeleteResponse deleteResponse = itemResponse.getResponse(); - if (deleteResponse.isFound()) { - builder.field(Fields.STATUS, shardInfo.status().getStatus()); - } else { - builder.field(Fields.STATUS, RestStatus.NOT_FOUND.getStatus()); - } - builder.field(Fields.FOUND, deleteResponse.isFound()); - } else if (itemResponse.getResponse() instanceof IndexResponse) { - IndexResponse indexResponse = itemResponse.getResponse(); - if (indexResponse.isCreated()) { - builder.field(Fields.STATUS, RestStatus.CREATED.getStatus()); - } else { - builder.field(Fields.STATUS, shardInfo.status().getStatus()); - } - } else if (itemResponse.getResponse() instanceof UpdateResponse) { - UpdateResponse updateResponse = itemResponse.getResponse(); - if (updateResponse.isCreated()) { - builder.field(Fields.STATUS, RestStatus.CREATED.getStatus()); - } else { - builder.field(Fields.STATUS, shardInfo.status().getStatus()); - } - if (updateResponse.getGetResult() != null) { - builder.startObject(Fields.GET); - updateResponse.getGetResult().toXContentEmbedded(builder, request); - builder.endObject(); - } - } - } - builder.endObject(); + itemResponse.toXContent(builder, request); builder.endObject(); } builder.endArray(); @@ -155,15 +105,7 @@ public class RestBulkAction extends BaseRestHandler { static final class Fields { static final XContentBuilderString ITEMS = new XContentBuilderString("items"); static final XContentBuilderString ERRORS = new XContentBuilderString("errors"); - static final XContentBuilderString _INDEX = new XContentBuilderString("_index"); - static final XContentBuilderString _TYPE = new XContentBuilderString("_type"); - static final XContentBuilderString _ID = new XContentBuilderString("_id"); - static final XContentBuilderString STATUS = new XContentBuilderString("status"); - static final XContentBuilderString ERROR = new XContentBuilderString("error"); static final XContentBuilderString TOOK = new XContentBuilderString("took"); - static final XContentBuilderString _VERSION = new XContentBuilderString("_version"); - static final XContentBuilderString FOUND = new XContentBuilderString("found"); - static final XContentBuilderString GET = new XContentBuilderString("get"); } } diff --git a/core/src/main/java/org/elasticsearch/rest/action/delete/RestDeleteAction.java b/core/src/main/java/org/elasticsearch/rest/action/delete/RestDeleteAction.java index 209ab686ce5..e583ed36274 100644 --- a/core/src/main/java/org/elasticsearch/rest/action/delete/RestDeleteAction.java +++ b/core/src/main/java/org/elasticsearch/rest/action/delete/RestDeleteAction.java @@ -19,7 +19,6 @@ package org.elasticsearch.rest.action.delete; -import org.elasticsearch.action.ActionWriteResponse; import org.elasticsearch.action.WriteConsistencyLevel; import org.elasticsearch.action.delete.DeleteRequest; import org.elasticsearch.action.delete.DeleteResponse; @@ -27,14 +26,13 @@ import org.elasticsearch.client.Client; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.XContentBuilder; -import org.elasticsearch.common.xcontent.XContentBuilderString; import org.elasticsearch.index.VersionType; import org.elasticsearch.rest.*; import org.elasticsearch.rest.action.support.RestActions; import org.elasticsearch.rest.action.support.RestBuilderListener; +import org.elasticsearch.rest.action.support.RestStatusToXContentListener; import static org.elasticsearch.rest.RestRequest.Method.DELETE; -import static org.elasticsearch.rest.RestStatus.NOT_FOUND; /** * @@ -62,31 +60,6 @@ public class RestDeleteAction extends BaseRestHandler { deleteRequest.consistencyLevel(WriteConsistencyLevel.fromString(consistencyLevel)); } - client.delete(deleteRequest, new RestBuilderListener(channel) { - @Override - public RestResponse buildResponse(DeleteResponse result, XContentBuilder builder) throws Exception { - ActionWriteResponse.ShardInfo shardInfo = result.getShardInfo(); - builder.startObject().field(Fields.FOUND, result.isFound()) - .field(Fields._INDEX, result.getIndex()) - .field(Fields._TYPE, result.getType()) - .field(Fields._ID, result.getId()) - .field(Fields._VERSION, result.getVersion()) - .value(shardInfo) - .endObject(); - RestStatus status = shardInfo.status(); - if (!result.isFound()) { - status = NOT_FOUND; - } - return new BytesRestResponse(status, builder); - } - }); - } - - static final class Fields { - static final XContentBuilderString FOUND = new XContentBuilderString("found"); - static final XContentBuilderString _INDEX = new XContentBuilderString("_index"); - static final XContentBuilderString _TYPE = new XContentBuilderString("_type"); - static final XContentBuilderString _ID = new XContentBuilderString("_id"); - static final XContentBuilderString _VERSION = new XContentBuilderString("_version"); + client.delete(deleteRequest, new RestStatusToXContentListener<>(channel)); } } diff --git a/core/src/main/java/org/elasticsearch/rest/action/index/RestIndexAction.java b/core/src/main/java/org/elasticsearch/rest/action/index/RestIndexAction.java index c7fc29155cc..310ce0a1248 100644 --- a/core/src/main/java/org/elasticsearch/rest/action/index/RestIndexAction.java +++ b/core/src/main/java/org/elasticsearch/rest/action/index/RestIndexAction.java @@ -19,7 +19,6 @@ package org.elasticsearch.rest.action.index; -import org.elasticsearch.action.ActionWriteResponse; import org.elasticsearch.action.WriteConsistencyLevel; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.index.IndexResponse; @@ -27,11 +26,11 @@ import org.elasticsearch.client.Client; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.XContentBuilder; -import org.elasticsearch.common.xcontent.XContentBuilderString; import org.elasticsearch.index.VersionType; import org.elasticsearch.rest.*; import org.elasticsearch.rest.action.support.RestActions; import org.elasticsearch.rest.action.support.RestBuilderListener; +import org.elasticsearch.rest.action.support.RestStatusToXContentListener; import java.io.IOException; @@ -99,33 +98,6 @@ public class RestIndexAction extends BaseRestHandler { if (consistencyLevel != null) { indexRequest.consistencyLevel(WriteConsistencyLevel.fromString(consistencyLevel)); } - client.index(indexRequest, new RestBuilderListener(channel) { - @Override - public RestResponse buildResponse(IndexResponse response, XContentBuilder builder) throws Exception { - builder.startObject(); - ActionWriteResponse.ShardInfo shardInfo = response.getShardInfo(); - builder.field(Fields._INDEX, response.getIndex()) - .field(Fields._TYPE, response.getType()) - .field(Fields._ID, response.getId()) - .field(Fields._VERSION, response.getVersion()); - shardInfo.toXContent(builder, request); - builder.field(Fields.CREATED, response.isCreated()); - builder.endObject(); - RestStatus status = shardInfo.status(); - if (response.isCreated()) { - status = CREATED; - } - return new BytesRestResponse(status, builder); - } - }); + client.index(indexRequest, new RestStatusToXContentListener<>(channel)); } - - static final class Fields { - static final XContentBuilderString _INDEX = new XContentBuilderString("_index"); - static final XContentBuilderString _TYPE = new XContentBuilderString("_type"); - static final XContentBuilderString _ID = new XContentBuilderString("_id"); - static final XContentBuilderString _VERSION = new XContentBuilderString("_version"); - static final XContentBuilderString CREATED = new XContentBuilderString("created"); - } - } diff --git a/core/src/main/java/org/elasticsearch/rest/action/update/RestUpdateAction.java b/core/src/main/java/org/elasticsearch/rest/action/update/RestUpdateAction.java index 76e96ab7e7f..f59c329fbc3 100644 --- a/core/src/main/java/org/elasticsearch/rest/action/update/RestUpdateAction.java +++ b/core/src/main/java/org/elasticsearch/rest/action/update/RestUpdateAction.java @@ -19,7 +19,6 @@ package org.elasticsearch.rest.action.update; -import org.elasticsearch.action.ActionWriteResponse; import org.elasticsearch.action.WriteConsistencyLevel; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.update.UpdateRequest; @@ -29,7 +28,6 @@ import org.elasticsearch.common.Strings; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.XContentBuilder; -import org.elasticsearch.common.xcontent.XContentBuilderString; import org.elasticsearch.index.VersionType; import org.elasticsearch.rest.BaseRestHandler; import org.elasticsearch.rest.BytesRestResponse; @@ -40,6 +38,7 @@ import org.elasticsearch.rest.RestResponse; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.rest.action.support.RestActions; import org.elasticsearch.rest.action.support.RestBuilderListener; +import org.elasticsearch.rest.action.support.RestStatusToXContentListener; import org.elasticsearch.script.Script; import org.elasticsearch.script.ScriptParameterParser; import org.elasticsearch.script.ScriptParameterParser.ScriptParameterValue; @@ -48,7 +47,6 @@ import java.util.HashMap; import java.util.Map; import static org.elasticsearch.rest.RestRequest.Method.POST; -import static org.elasticsearch.rest.RestStatus.CREATED; /** */ @@ -123,38 +121,6 @@ public class RestUpdateAction extends BaseRestHandler { } } - client.update(updateRequest, new RestBuilderListener(channel) { - @Override - public RestResponse buildResponse(UpdateResponse response, XContentBuilder builder) throws Exception { - builder.startObject(); - ActionWriteResponse.ShardInfo shardInfo = response.getShardInfo(); - builder.field(Fields._INDEX, response.getIndex()) - .field(Fields._TYPE, response.getType()) - .field(Fields._ID, response.getId()) - .field(Fields._VERSION, response.getVersion()); - - shardInfo.toXContent(builder, request); - if (response.getGetResult() != null) { - builder.startObject(Fields.GET); - response.getGetResult().toXContentEmbedded(builder, request); - builder.endObject(); - } - - builder.endObject(); - RestStatus status = shardInfo.status(); - if (response.isCreated()) { - status = CREATED; - } - return new BytesRestResponse(status, builder); - } - }); - } - - static final class Fields { - static final XContentBuilderString _INDEX = new XContentBuilderString("_index"); - static final XContentBuilderString _TYPE = new XContentBuilderString("_type"); - static final XContentBuilderString _ID = new XContentBuilderString("_id"); - static final XContentBuilderString _VERSION = new XContentBuilderString("_version"); - static final XContentBuilderString GET = new XContentBuilderString("get"); + client.update(updateRequest, new RestStatusToXContentListener<>(channel)); } } diff --git a/core/src/test/java/org/elasticsearch/action/support/replication/BroadcastReplicationTests.java b/core/src/test/java/org/elasticsearch/action/support/replication/BroadcastReplicationTests.java index d31a024187c..4d17155f611 100644 --- a/core/src/test/java/org/elasticsearch/action/support/replication/BroadcastReplicationTests.java +++ b/core/src/test/java/org/elasticsearch/action/support/replication/BroadcastReplicationTests.java @@ -20,7 +20,7 @@ package org.elasticsearch.action.support.replication; import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.ActionWriteResponse; +import org.elasticsearch.action.ReplicationResponse; import org.elasticsearch.action.NoShardAvailableActionException; import org.elasticsearch.action.UnavailableShardsException; import org.elasticsearch.action.admin.indices.flush.FlushRequest; @@ -101,7 +101,7 @@ public class BroadcastReplicationTests extends ESTestCase { randomBoolean() ? ShardRoutingState.INITIALIZING : ShardRoutingState.UNASSIGNED, ShardRoutingState.UNASSIGNED)); logger.debug("--> using initial state:\n{}", clusterService.state().prettyPrint()); Future response = (broadcastReplicationAction.execute(new BroadcastRequest().indices(index))); - for (Tuple> shardRequests : broadcastReplicationAction.capturedShardRequests) { + for (Tuple> shardRequests : broadcastReplicationAction.capturedShardRequests) { if (randomBoolean()) { shardRequests.v2().onFailure(new NoShardAvailableActionException(shardRequests.v1())); } else { @@ -120,10 +120,10 @@ public class BroadcastReplicationTests extends ESTestCase { ShardRoutingState.STARTED)); logger.debug("--> using initial state:\n{}", clusterService.state().prettyPrint()); Future response = (broadcastReplicationAction.execute(new BroadcastRequest().indices(index))); - for (Tuple> shardRequests : broadcastReplicationAction.capturedShardRequests) { - ActionWriteResponse actionWriteResponse = new ActionWriteResponse(); - actionWriteResponse.setShardInfo(new ActionWriteResponse.ShardInfo(1, 1, new ActionWriteResponse.ShardInfo.Failure[0])); - shardRequests.v2().onResponse(actionWriteResponse); + for (Tuple> shardRequests : broadcastReplicationAction.capturedShardRequests) { + ReplicationResponse replicationResponse = new ReplicationResponse(); + replicationResponse.setShardInfo(new ReplicationResponse.ShardInfo(1, 1, new ReplicationResponse.ShardInfo.Failure[0])); + shardRequests.v2().onResponse(replicationResponse); } logger.info("total shards: {}, ", response.get().getTotalShards()); assertBroadcastResponse(1, 1, 0, response.get(), null); @@ -137,20 +137,20 @@ public class BroadcastReplicationTests extends ESTestCase { Future response = (broadcastReplicationAction.execute(new BroadcastRequest().indices(index))); int succeeded = 0; int failed = 0; - for (Tuple> shardRequests : broadcastReplicationAction.capturedShardRequests) { + for (Tuple> shardRequests : broadcastReplicationAction.capturedShardRequests) { if (randomBoolean()) { - ActionWriteResponse.ShardInfo.Failure[] failures = new ActionWriteResponse.ShardInfo.Failure[0]; + ReplicationResponse.ShardInfo.Failure[] failures = new ReplicationResponse.ShardInfo.Failure[0]; int shardsSucceeded = randomInt(1) + 1; succeeded += shardsSucceeded; - ActionWriteResponse actionWriteResponse = new ActionWriteResponse(); + ReplicationResponse replicationResponse = new ReplicationResponse(); if (shardsSucceeded == 1 && randomBoolean()) { //sometimes add failure (no failure means shard unavailable) - failures = new ActionWriteResponse.ShardInfo.Failure[1]; - failures[0] = new ActionWriteResponse.ShardInfo.Failure(index, shardRequests.v1().id(), null, new Exception("pretend shard failed"), RestStatus.GATEWAY_TIMEOUT, false); + failures = new ReplicationResponse.ShardInfo.Failure[1]; + failures[0] = new ReplicationResponse.ShardInfo.Failure(index, shardRequests.v1().id(), null, new Exception("pretend shard failed"), RestStatus.GATEWAY_TIMEOUT, false); failed++; } - actionWriteResponse.setShardInfo(new ActionWriteResponse.ShardInfo(2, shardsSucceeded, failures)); - shardRequests.v2().onResponse(actionWriteResponse); + replicationResponse.setShardInfo(new ReplicationResponse.ShardInfo(2, shardsSucceeded, failures)); + shardRequests.v2().onResponse(replicationResponse); } else { // sometimes fail failed += 2; @@ -179,16 +179,16 @@ public class BroadcastReplicationTests extends ESTestCase { assertThat(shards.get(0), equalTo(shardId)); } - private class TestBroadcastReplicationAction extends TransportBroadcastReplicationAction { - protected final Set>> capturedShardRequests = ConcurrentCollections.newConcurrentSet(); + private class TestBroadcastReplicationAction extends TransportBroadcastReplicationAction { + protected final Set>> capturedShardRequests = ConcurrentCollections.newConcurrentSet(); public TestBroadcastReplicationAction(Settings settings, ThreadPool threadPool, ClusterService clusterService, TransportService transportService, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, TransportReplicationAction replicatedBroadcastShardAction) { super("test-broadcast-replication-action", BroadcastRequest::new, settings, threadPool, clusterService, transportService, actionFilters, indexNameExpressionResolver, replicatedBroadcastShardAction); } @Override - protected ActionWriteResponse newShardResponse() { - return new ActionWriteResponse(); + protected ReplicationResponse newShardResponse() { + return new ReplicationResponse(); } @Override @@ -202,7 +202,7 @@ public class BroadcastReplicationTests extends ESTestCase { } @Override - protected void shardExecute(BroadcastRequest request, ShardId shardId, ActionListener shardActionListener) { + protected void shardExecute(BroadcastRequest request, ShardId shardId, ActionListener shardActionListener) { capturedShardRequests.add(new Tuple<>(shardId, shardActionListener)); } } diff --git a/core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java b/core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java index f1e270b40fc..5834b2662ad 100644 --- a/core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java +++ b/core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java @@ -20,7 +20,7 @@ package org.elasticsearch.action.support.replication; import org.apache.lucene.index.CorruptIndexException; import org.elasticsearch.ElasticsearchException; -import org.elasticsearch.action.ActionWriteResponse; +import org.elasticsearch.action.ReplicationResponse; import org.elasticsearch.action.UnavailableShardsException; import org.elasticsearch.action.WriteConsistencyLevel; import org.elasticsearch.action.support.ActionFilter; @@ -521,7 +521,7 @@ public class TransportReplicationActionTests extends ESTestCase { } assertThat(listener.isDone(), equalTo(true)); Response response = listener.get(); - final ActionWriteResponse.ShardInfo shardInfo = response.getShardInfo(); + final ReplicationResponse.ShardInfo shardInfo = response.getShardInfo(); assertThat(shardInfo.getFailed(), equalTo(criticalFailures)); assertThat(shardInfo.getFailures(), arrayWithSize(criticalFailures)); assertThat(shardInfo.getSuccessful(), equalTo(successful)); @@ -703,7 +703,7 @@ public class TransportReplicationActionTests extends ESTestCase { } } - static class Response extends ActionWriteResponse { + static class Response extends ReplicationResponse { } class Action extends TransportReplicationAction { diff --git a/core/src/test/java/org/elasticsearch/document/ShardInfoIT.java b/core/src/test/java/org/elasticsearch/document/ShardInfoIT.java index d4907d82128..4f28cf19d7b 100644 --- a/core/src/test/java/org/elasticsearch/document/ShardInfoIT.java +++ b/core/src/test/java/org/elasticsearch/document/ShardInfoIT.java @@ -19,7 +19,7 @@ package org.elasticsearch.document; -import org.elasticsearch.action.ActionWriteResponse; +import org.elasticsearch.action.ReplicationResponse; import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; import org.elasticsearch.action.admin.indices.recovery.RecoveryResponse; import org.elasticsearch.action.bulk.BulkItemResponse; @@ -117,11 +117,11 @@ public class ShardInfoIT extends ESIntegTestCase { } } - private void assertShardInfo(ActionWriteResponse response) { + private void assertShardInfo(ReplicationResponse response) { assertShardInfo(response, numCopies, numNodes); } - private void assertShardInfo(ActionWriteResponse response, int expectedTotal, int expectedSuccessful) { + private void assertShardInfo(ReplicationResponse response, int expectedTotal, int expectedSuccessful) { assertThat(response.getShardInfo().getTotal(), greaterThanOrEqualTo(expectedTotal)); assertThat(response.getShardInfo().getSuccessful(), greaterThanOrEqualTo(expectedSuccessful)); } diff --git a/plugins/delete-by-query/src/test/java/org/elasticsearch/action/deletebyquery/TransportDeleteByQueryActionTests.java b/plugins/delete-by-query/src/test/java/org/elasticsearch/action/deletebyquery/TransportDeleteByQueryActionTests.java index 2b708341c7f..c44608c4e4b 100644 --- a/plugins/delete-by-query/src/test/java/org/elasticsearch/action/deletebyquery/TransportDeleteByQueryActionTests.java +++ b/plugins/delete-by-query/src/test/java/org/elasticsearch/action/deletebyquery/TransportDeleteByQueryActionTests.java @@ -32,6 +32,7 @@ import org.elasticsearch.common.text.StringText; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.CollectionUtils; import org.elasticsearch.common.util.concurrent.CountDown; +import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.SearchShardTarget; import org.elasticsearch.search.internal.InternalSearchHit; @@ -225,7 +226,7 @@ public class TransportDeleteByQueryActionTests extends ESSingleNodeTestCase { } else { deleted++; } - items[i] = new BulkItemResponse(i, "delete", new DeleteResponse("test", "type", String.valueOf(i), 1, delete)); + items[i] = new BulkItemResponse(i, "delete", new DeleteResponse(new ShardId("test", 0), "type", String.valueOf(i), 1, delete)); } else { items[i] = new BulkItemResponse(i, "delete", new BulkItemResponse.Failure("test", "type", String.valueOf(i), new Throwable("item failed"))); failed++; @@ -281,7 +282,7 @@ public class TransportDeleteByQueryActionTests extends ESSingleNodeTestCase { deleted[0] = deleted[0] + 1; deleted[index] = deleted[index] + 1; } - items[i] = new BulkItemResponse(i, "delete", new DeleteResponse("test-" + index, "type", String.valueOf(i), 1, delete)); + items[i] = new BulkItemResponse(i, "delete", new DeleteResponse(new ShardId("test-" + index, 0), "type", String.valueOf(i), 1, delete)); } else { items[i] = new BulkItemResponse(i, "delete", new BulkItemResponse.Failure("test-" + index, "type", String.valueOf(i), new Throwable("item failed"))); failed[0] = failed[0] + 1;