Introduce a common base response class to all single doc write ops

IndexResponse, DeleteResponse and UpdateResponse share some logic. This can be unified to a single DocWriteResponse base class. On top, some replication actions are now not about write operations anymore. This commit renames ActionWriteResponse to ReplicationResponse

Last some toXContent is moved from the Rest layer to the actual response classes, for more code re-sharing.

Closes #15334
This commit is contained in:
Boaz Leskes 2015-12-09 12:36:50 +01:00
parent fab44398d9
commit fafeb3abdd
26 changed files with 385 additions and 423 deletions

View File

@ -0,0 +1,130 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.StatusToXContent;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentBuilderString;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.rest.RestStatus;
import java.io.IOException;
/**
* A base class for the response of a write operation that involves a single doc
*/
public abstract class DocWriteResponse extends ReplicationResponse implements StatusToXContent {
private ShardId shardId;
private String id;
private String type;
private long version;
public DocWriteResponse(ShardId shardId, String type, String id, long version) {
this.shardId = shardId;
this.type = type;
this.id = id;
this.version = version;
}
// needed for deserialization
protected DocWriteResponse() {
}
/**
* The index the document was changed in.
*/
public String getIndex() {
return this.shardId.getIndex();
}
/**
* The exact shard the document was changed in.
*/
public ShardId getShardId() {
return this.shardId;
}
/**
* The type of the document changed.
*/
public String getType() {
return this.type;
}
/**
* The id of the document changed.
*/
public String getId() {
return this.id;
}
/**
* Returns the current version of the doc.
*/
public long getVersion() {
return this.version;
}
/** returns the rest status for this response (based on {@link ShardInfo#status()} */
public RestStatus status() {
return getShardInfo().status();
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
shardId = ShardId.readShardId(in);
type = in.readString();
id = in.readString();
version = in.readZLong();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
shardId.writeTo(out);
out.writeString(type);
out.writeString(id);
out.writeZLong(version);
}
static final class Fields {
static final XContentBuilderString _INDEX = new XContentBuilderString("_index");
static final XContentBuilderString _TYPE = new XContentBuilderString("_type");
static final XContentBuilderString _ID = new XContentBuilderString("_id");
static final XContentBuilderString _VERSION = new XContentBuilderString("_version");
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
ReplicationResponse.ShardInfo shardInfo = getShardInfo();
builder.field(Fields._INDEX, shardId.getIndex())
.field(Fields._TYPE, type)
.field(Fields._ID, id)
.field(Fields._VERSION, version);
shardInfo.toXContent(builder, params);
return builder;
}
}

View File

@ -21,7 +21,6 @@ package org.elasticsearch.action;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.bootstrap.Elasticsearch;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
@ -30,25 +29,23 @@ import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentBuilderString;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.rest.RestStatus;
import java.io.IOException;
import java.util.Collections;
/**
* Base class for write action responses.
*/
public class ActionWriteResponse extends ActionResponse {
public class ReplicationResponse extends ActionResponse {
public final static ActionWriteResponse.ShardInfo.Failure[] EMPTY = new ActionWriteResponse.ShardInfo.Failure[0];
public final static ReplicationResponse.ShardInfo.Failure[] EMPTY = new ReplicationResponse.ShardInfo.Failure[0];
private ShardInfo shardInfo;
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
shardInfo = ActionWriteResponse.ShardInfo.readShardInfo(in);
shardInfo = ReplicationResponse.ShardInfo.readShardInfo(in);
}
@Override

View File

@ -19,7 +19,7 @@
package org.elasticsearch.action.admin.indices.flush;
import org.elasticsearch.action.ActionWriteResponse;
import org.elasticsearch.action.ReplicationResponse;
import org.elasticsearch.action.ShardOperationFailedException;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.replication.TransportBroadcastReplicationAction;
@ -36,7 +36,7 @@ import java.util.List;
/**
* Flush Action.
*/
public class TransportFlushAction extends TransportBroadcastReplicationAction<FlushRequest, FlushResponse, ShardFlushRequest, ActionWriteResponse> {
public class TransportFlushAction extends TransportBroadcastReplicationAction<FlushRequest, FlushResponse, ShardFlushRequest, ReplicationResponse> {
@Inject
public TransportFlushAction(Settings settings, ThreadPool threadPool, ClusterService clusterService,
@ -47,8 +47,8 @@ public class TransportFlushAction extends TransportBroadcastReplicationAction<Fl
}
@Override
protected ActionWriteResponse newShardResponse() {
return new ActionWriteResponse();
protected ReplicationResponse newShardResponse() {
return new ReplicationResponse();
}
@Override

View File

@ -19,7 +19,7 @@
package org.elasticsearch.action.admin.indices.flush;
import org.elasticsearch.action.ActionWriteResponse;
import org.elasticsearch.action.ReplicationResponse;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.replication.TransportReplicationAction;
import org.elasticsearch.cluster.ClusterService;
@ -39,7 +39,7 @@ import org.elasticsearch.transport.TransportService;
/**
*
*/
public class TransportShardFlushAction extends TransportReplicationAction<ShardFlushRequest, ShardFlushRequest, ActionWriteResponse> {
public class TransportShardFlushAction extends TransportReplicationAction<ShardFlushRequest, ShardFlushRequest, ReplicationResponse> {
public static final String NAME = FlushAction.NAME + "[s]";
@ -53,16 +53,16 @@ public class TransportShardFlushAction extends TransportReplicationAction<ShardF
}
@Override
protected ActionWriteResponse newResponseInstance() {
return new ActionWriteResponse();
protected ReplicationResponse newResponseInstance() {
return new ReplicationResponse();
}
@Override
protected Tuple<ActionWriteResponse, ShardFlushRequest> shardOperationOnPrimary(MetaData metaData, ShardFlushRequest shardRequest) throws Throwable {
protected Tuple<ReplicationResponse, ShardFlushRequest> shardOperationOnPrimary(MetaData metaData, ShardFlushRequest shardRequest) throws Throwable {
IndexShard indexShard = indicesService.indexServiceSafe(shardRequest.shardId().getIndex()).getShard(shardRequest.shardId().id());
indexShard.flush(shardRequest.getRequest());
logger.trace("{} flush request executed on primary", indexShard.shardId());
return new Tuple<>(new ActionWriteResponse(), shardRequest);
return new Tuple<>(new ReplicationResponse(), shardRequest);
}
@Override

View File

@ -19,7 +19,7 @@
package org.elasticsearch.action.admin.indices.refresh;
import org.elasticsearch.action.ActionWriteResponse;
import org.elasticsearch.action.ReplicationResponse;
import org.elasticsearch.action.ShardOperationFailedException;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.replication.ReplicationRequest;
@ -37,7 +37,7 @@ import java.util.List;
/**
* Refresh action.
*/
public class TransportRefreshAction extends TransportBroadcastReplicationAction<RefreshRequest, RefreshResponse, ReplicationRequest, ActionWriteResponse> {
public class TransportRefreshAction extends TransportBroadcastReplicationAction<RefreshRequest, RefreshResponse, ReplicationRequest, ReplicationResponse> {
@Inject
public TransportRefreshAction(Settings settings, ThreadPool threadPool, ClusterService clusterService,
@ -48,8 +48,8 @@ public class TransportRefreshAction extends TransportBroadcastReplicationAction<
}
@Override
protected ActionWriteResponse newShardResponse() {
return new ActionWriteResponse();
protected ReplicationResponse newShardResponse() {
return new ReplicationResponse();
}
@Override

View File

@ -19,7 +19,7 @@
package org.elasticsearch.action.admin.indices.refresh;
import org.elasticsearch.action.ActionWriteResponse;
import org.elasticsearch.action.ReplicationResponse;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.replication.ReplicationRequest;
import org.elasticsearch.action.support.replication.TransportReplicationAction;
@ -41,7 +41,7 @@ import org.elasticsearch.transport.TransportService;
/**
*
*/
public class TransportShardRefreshAction extends TransportReplicationAction<ReplicationRequest, ReplicationRequest, ActionWriteResponse> {
public class TransportShardRefreshAction extends TransportReplicationAction<ReplicationRequest, ReplicationRequest, ReplicationResponse> {
public static final String NAME = RefreshAction.NAME + "[s]";
@ -55,16 +55,16 @@ public class TransportShardRefreshAction extends TransportReplicationAction<Repl
}
@Override
protected ActionWriteResponse newResponseInstance() {
return new ActionWriteResponse();
protected ReplicationResponse newResponseInstance() {
return new ReplicationResponse();
}
@Override
protected Tuple<ActionWriteResponse, ReplicationRequest> shardOperationOnPrimary(MetaData metaData, ReplicationRequest shardRequest) throws Throwable {
protected Tuple<ReplicationResponse, ReplicationRequest> shardOperationOnPrimary(MetaData metaData, ReplicationRequest shardRequest) throws Throwable {
IndexShard indexShard = indicesService.indexServiceSafe(shardRequest.shardId().getIndex()).getShard(shardRequest.shardId().id());
indexShard.refresh("api");
logger.trace("{} refresh request executed on primary", indexShard.shardId());
return new Tuple<>(new ActionWriteResponse(), shardRequest);
return new Tuple<>(new ReplicationResponse(), shardRequest);
}
@Override

View File

@ -19,14 +19,18 @@
package org.elasticsearch.action.bulk;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionWriteResponse;
import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.common.xcontent.StatusToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentBuilderString;
import org.elasticsearch.rest.RestStatus;
import java.io.IOException;
@ -35,7 +39,39 @@ import java.io.IOException;
* Represents a single item response for an action executed as part of the bulk API. Holds the index/type/id
* of the relevant action, and if it has failed or not (with the failure message incase it failed).
*/
public class BulkItemResponse implements Streamable {
public class BulkItemResponse implements Streamable, StatusToXContent {
@Override
public RestStatus status() {
return failure == null ? response.status() : failure.getStatus();
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject(opType);
if (failure == null) {
response.toXContent(builder, params);
builder.field(Fields.STATUS, response.status());
} else {
builder.field(Fields._INDEX, failure.getIndex());
builder.field(Fields._TYPE, failure.getType());
builder.field(Fields._ID, failure.getId());
builder.field(Fields.STATUS, failure.getStatus());
builder.startObject(Fields.ERROR);
ElasticsearchException.toXContent(builder, params, failure.getCause());
builder.endObject();
}
builder.endObject();
return builder;
}
static final class Fields {
static final XContentBuilderString _INDEX = new XContentBuilderString("_index");
static final XContentBuilderString _TYPE = new XContentBuilderString("_type");
static final XContentBuilderString _ID = new XContentBuilderString("_id");
static final XContentBuilderString STATUS = new XContentBuilderString("status");
static final XContentBuilderString ERROR = new XContentBuilderString("error");
}
/**
* Represents a failure.
@ -99,7 +135,7 @@ public class BulkItemResponse implements Streamable {
private String opType;
private ActionWriteResponse response;
private DocWriteResponse response;
private Failure failure;
@ -107,7 +143,7 @@ public class BulkItemResponse implements Streamable {
}
public BulkItemResponse(int id, String opType, ActionWriteResponse response) {
public BulkItemResponse(int id, String opType, DocWriteResponse response) {
this.id = id;
this.opType = opType;
this.response = response;
@ -140,14 +176,7 @@ public class BulkItemResponse implements Streamable {
if (failure != null) {
return failure.getIndex();
}
if (response instanceof IndexResponse) {
return ((IndexResponse) response).getIndex();
} else if (response instanceof DeleteResponse) {
return ((DeleteResponse) response).getIndex();
} else if (response instanceof UpdateResponse) {
return ((UpdateResponse) response).getIndex();
}
return null;
return response.getIndex();
}
/**
@ -157,14 +186,7 @@ public class BulkItemResponse implements Streamable {
if (failure != null) {
return failure.getType();
}
if (response instanceof IndexResponse) {
return ((IndexResponse) response).getType();
} else if (response instanceof DeleteResponse) {
return ((DeleteResponse) response).getType();
} else if (response instanceof UpdateResponse) {
return ((UpdateResponse) response).getType();
}
return null;
return response.getType();
}
/**
@ -174,14 +196,7 @@ public class BulkItemResponse implements Streamable {
if (failure != null) {
return failure.getId();
}
if (response instanceof IndexResponse) {
return ((IndexResponse) response).getId();
} else if (response instanceof DeleteResponse) {
return ((DeleteResponse) response).getId();
} else if (response instanceof UpdateResponse) {
return ((UpdateResponse) response).getId();
}
return null;
return response.getId();
}
/**
@ -191,21 +206,14 @@ public class BulkItemResponse implements Streamable {
if (failure != null) {
return -1;
}
if (response instanceof IndexResponse) {
return ((IndexResponse) response).getVersion();
} else if (response instanceof DeleteResponse) {
return ((DeleteResponse) response).getVersion();
} else if (response instanceof UpdateResponse) {
return ((UpdateResponse) response).getVersion();
}
return -1;
return response.getVersion();
}
/**
* The actual response ({@link IndexResponse} or {@link DeleteResponse}). <tt>null</tt> in
* case of failure.
*/
public <T extends ActionWriteResponse> T getResponse() {
public <T extends DocWriteResponse> T getResponse() {
return (T) response;
}

View File

@ -19,7 +19,7 @@
package org.elasticsearch.action.bulk;
import org.elasticsearch.action.ActionWriteResponse;
import org.elasticsearch.action.ReplicationResponse;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.index.shard.ShardId;
@ -29,7 +29,7 @@ import java.io.IOException;
/**
*
*/
public class BulkShardResponse extends ActionWriteResponse {
public class BulkShardResponse extends ReplicationResponse {
private ShardId shardId;
private BulkItemResponse[] responses;

View File

@ -204,7 +204,7 @@ public class TransportShardBulkAction extends TransportReplicationAction<BulkSha
BytesReference indexSourceAsBytes = indexRequest.source();
// add the response
IndexResponse indexResponse = result.response();
UpdateResponse updateResponse = new UpdateResponse(indexResponse.getShardInfo(), indexResponse.getIndex(), indexResponse.getType(), indexResponse.getId(), indexResponse.getVersion(), indexResponse.isCreated());
UpdateResponse updateResponse = new UpdateResponse(indexResponse.getShardInfo(), indexResponse.getShardId(), indexResponse.getType(), indexResponse.getId(), indexResponse.getVersion(), indexResponse.isCreated());
if (updateRequest.fields() != null && updateRequest.fields().length > 0) {
Tuple<XContentType, Map<String, Object>> sourceAndContent = XContentHelper.convertToMap(indexSourceAsBytes, true);
updateResponse.setGetResult(updateHelper.extractGetResult(updateRequest, request.index(), indexResponse.getVersion(), sourceAndContent.v2(), sourceAndContent.v1(), indexSourceAsBytes));
@ -216,7 +216,7 @@ public class TransportShardBulkAction extends TransportReplicationAction<BulkSha
WriteResult<DeleteResponse> writeResult = updateResult.writeResult;
DeleteResponse response = writeResult.response();
DeleteRequest deleteRequest = updateResult.request();
updateResponse = new UpdateResponse(response.getShardInfo(), response.getIndex(), response.getType(), response.getId(), response.getVersion(), false);
updateResponse = new UpdateResponse(response.getShardInfo(), response.getShardId(), response.getType(), response.getId(), response.getVersion(), false);
updateResponse.setGetResult(updateHelper.extractGetResult(updateRequest, request.index(), response.getVersion(), updateResult.result.updatedSourceAsMap(), updateResult.result.updateSourceContentType(), null));
// Replace the update request to the translated delete request to execute on the replica.
item = request.items()[requestIndex] = new BulkItemRequest(request.items()[requestIndex].id(), deleteRequest);

View File

@ -19,9 +19,13 @@
package org.elasticsearch.action.delete;
import org.elasticsearch.action.ActionWriteResponse;
import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentBuilderString;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.rest.RestStatus;
import java.io.IOException;
@ -31,53 +35,19 @@ import java.io.IOException;
* @see org.elasticsearch.action.delete.DeleteRequest
* @see org.elasticsearch.client.Client#delete(DeleteRequest)
*/
public class DeleteResponse extends ActionWriteResponse {
public class DeleteResponse extends DocWriteResponse {
private String index;
private String id;
private String type;
private long version;
private boolean found;
public DeleteResponse() {
}
public DeleteResponse(String index, String type, String id, long version, boolean found) {
this.index = index;
this.id = id;
this.type = type;
this.version = version;
public DeleteResponse(ShardId shardId, String type, String id, long version, boolean found) {
super(shardId, type, id, version);
this.found = found;
}
/**
* The index the document was deleted from.
*/
public String getIndex() {
return this.index;
}
/**
* The type of the document deleted.
*/
public String getType() {
return this.type;
}
/**
* The id of the document deleted.
*/
public String getId() {
return this.id;
}
/**
* The version of the delete operation.
*/
public long getVersion() {
return this.version;
}
/**
* Returns <tt>true</tt> if a doc was found to delete.
@ -89,20 +59,44 @@ public class DeleteResponse extends ActionWriteResponse {
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
index = in.readString();
type = in.readString();
id = in.readString();
version = in.readLong();
found = in.readBoolean();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(index);
out.writeString(type);
out.writeString(id);
out.writeLong(version);
out.writeBoolean(found);
}
@Override
public RestStatus status() {
if (found == false) {
return RestStatus.NOT_FOUND;
}
return super.status();
}
static final class Fields {
static final XContentBuilderString FOUND = new XContentBuilderString("found");
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.field(Fields.FOUND, isFound());
super.toXContent(builder, params);
return builder;
}
@Override
public String toString() {
StringBuilder builder = new StringBuilder();
builder.append("DeleteResponse[");
builder.append("index=").append(getIndex());
builder.append(",type=").append(getType());
builder.append(",id=").append(getId());
builder.append(",version=").append(getVersion());
builder.append(",found=").append(found);
builder.append(",shards=").append(getShardInfo());
return builder.append("]").toString();
}
}

View File

@ -140,7 +140,7 @@ public class TransportDeleteAction extends TransportReplicationAction<DeleteRequ
assert request.versionType().validateVersionForWrites(request.version());
return new WriteResult<>(
new DeleteResponse(indexShard.shardId().getIndex(), request.type(), request.id(), delete.version(), delete.found()),
new DeleteResponse(indexShard.shardId(), request.type(), request.id(), delete.version(), delete.found()),
delete.getTranslogLocation());
}

View File

@ -19,9 +19,13 @@
package org.elasticsearch.action.index;
import org.elasticsearch.action.ActionWriteResponse;
import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentBuilderString;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.rest.RestStatus;
import java.io.IOException;
@ -31,54 +35,19 @@ import java.io.IOException;
* @see org.elasticsearch.action.index.IndexRequest
* @see org.elasticsearch.client.Client#index(IndexRequest)
*/
public class IndexResponse extends ActionWriteResponse {
public class IndexResponse extends DocWriteResponse {
private String index;
private String id;
private String type;
private long version;
private boolean created;
public IndexResponse() {
}
public IndexResponse(String index, String type, String id, long version, boolean created) {
this.index = index;
this.id = id;
this.type = type;
this.version = version;
public IndexResponse(ShardId shardId, String type, String id, long version, boolean created) {
super(shardId, type, id, version);
this.created = created;
}
/**
* The index the document was indexed into.
*/
public String getIndex() {
return this.index;
}
/**
* The type of the document indexed.
*/
public String getType() {
return this.type;
}
/**
* The id of the document indexed.
*/
public String getId() {
return this.id;
}
/**
* Returns the current version of the doc indexed.
*/
public long getVersion() {
return this.version;
}
/**
* Returns true if the document was created, false if updated.
*/
@ -86,23 +55,23 @@ public class IndexResponse extends ActionWriteResponse {
return this.created;
}
@Override
public RestStatus status() {
if (created) {
return RestStatus.CREATED;
}
return super.status();
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
index = in.readString();
type = in.readString();
id = in.readString();
version = in.readLong();
created = in.readBoolean();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(index);
out.writeString(type);
out.writeString(id);
out.writeLong(version);
out.writeBoolean(created);
}
@ -110,12 +79,23 @@ public class IndexResponse extends ActionWriteResponse {
public String toString() {
StringBuilder builder = new StringBuilder();
builder.append("IndexResponse[");
builder.append("index=").append(index);
builder.append(",type=").append(type);
builder.append(",id=").append(id);
builder.append(",version=").append(version);
builder.append("index=").append(getIndex());
builder.append(",type=").append(getType());
builder.append(",id=").append(getId());
builder.append(",version=").append(getVersion());
builder.append(",created=").append(created);
builder.append(",shards=").append(getShardInfo());
return builder.append("]").toString();
}
static final class Fields {
static final XContentBuilderString CREATED = new XContentBuilderString("created");
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
super.toXContent(builder, params);
builder.field(Fields.CREATED, isCreated());
return builder;
}
}

View File

@ -222,7 +222,7 @@ public class TransportIndexAction extends TransportReplicationAction<IndexReques
assert request.versionType().validateVersionForWrites(request.version());
return new WriteResult<>(new IndexResponse(shardId.getIndex(), request.type(), request.id(), request.version(), created), operation.getTranslogLocation());
return new WriteResult<>(new IndexResponse(shardId, request.type(), request.id(), request.version(), created), operation.getTranslogLocation());
}
}

View File

@ -22,9 +22,8 @@ package org.elasticsearch.action.support.replication;
import com.carrotsearch.hppc.cursors.IntObjectCursor;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionWriteResponse;
import org.elasticsearch.action.ReplicationResponse;
import org.elasticsearch.action.ShardOperationFailedException;
import org.elasticsearch.action.UnavailableShardsException;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.DefaultShardOperationFailedException;
import org.elasticsearch.action.support.HandledTransportAction;
@ -53,7 +52,7 @@ import java.util.function.Supplier;
* Base class for requests that should be executed on all shards of an index or several indices.
* This action sends shard requests to all primary shards of the indices and they are then replicated like write requests
*/
public abstract class TransportBroadcastReplicationAction<Request extends BroadcastRequest, Response extends BroadcastResponse, ShardRequest extends ReplicationRequest, ShardResponse extends ActionWriteResponse> extends HandledTransportAction<Request, Response> {
public abstract class TransportBroadcastReplicationAction<Request extends BroadcastRequest, Response extends BroadcastResponse, ShardRequest extends ReplicationRequest, ShardResponse extends ReplicationResponse> extends HandledTransportAction<Request, Response> {
private final TransportReplicationAction replicatedBroadcastShardAction;
private final ClusterService clusterService;
@ -91,15 +90,15 @@ public abstract class TransportBroadcastReplicationAction<Request extends Broadc
logger.trace("{}: got failure from {}", actionName, shardId);
int totalNumCopies = clusterState.getMetaData().index(shardId.index().getName()).getNumberOfReplicas() + 1;
ShardResponse shardResponse = newShardResponse();
ActionWriteResponse.ShardInfo.Failure[] failures;
ReplicationResponse.ShardInfo.Failure[] failures;
if (TransportActions.isShardNotAvailableException(e)) {
failures = new ActionWriteResponse.ShardInfo.Failure[0];
failures = new ReplicationResponse.ShardInfo.Failure[0];
} else {
ActionWriteResponse.ShardInfo.Failure failure = new ActionWriteResponse.ShardInfo.Failure(shardId.index().name(), shardId.id(), null, e, ExceptionsHelper.status(e), true);
failures = new ActionWriteResponse.ShardInfo.Failure[totalNumCopies];
ReplicationResponse.ShardInfo.Failure failure = new ReplicationResponse.ShardInfo.Failure(shardId.index().name(), shardId.id(), null, e, ExceptionsHelper.status(e), true);
failures = new ReplicationResponse.ShardInfo.Failure[totalNumCopies];
Arrays.fill(failures, failure);
}
shardResponse.setShardInfo(new ActionWriteResponse.ShardInfo(totalNumCopies, 0, failures));
shardResponse.setShardInfo(new ReplicationResponse.ShardInfo(totalNumCopies, 0, failures));
shardsResponses.add(shardResponse);
if (responsesCountDown.countDown()) {
finishAndNotifyListener(listener, shardsResponses);
@ -142,7 +141,7 @@ public abstract class TransportBroadcastReplicationAction<Request extends Broadc
int totalNumCopies = 0;
List<ShardOperationFailedException> shardFailures = null;
for (int i = 0; i < shardsResponses.size(); i++) {
ActionWriteResponse shardResponse = shardsResponses.get(i);
ReplicationResponse shardResponse = shardsResponses.get(i);
if (shardResponse == null) {
// non active shard, ignore
} else {
@ -152,7 +151,7 @@ public abstract class TransportBroadcastReplicationAction<Request extends Broadc
if (shardFailures == null) {
shardFailures = new ArrayList<>();
}
for (ActionWriteResponse.ShardInfo.Failure failure : shardResponse.getShardInfo().getFailures()) {
for (ReplicationResponse.ShardInfo.Failure failure : shardResponse.getShardInfo().getFailures()) {
shardFailures.add(new DefaultShardOperationFailedException(new BroadcastShardOperationFailedException(new ShardId(failure.index(), failure.shardId()), failure.getCause())));
}
}

View File

@ -22,7 +22,7 @@ package org.elasticsearch.action.support.replication;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionWriteResponse;
import org.elasticsearch.action.ReplicationResponse;
import org.elasticsearch.action.UnavailableShardsException;
import org.elasticsearch.action.WriteConsistencyLevel;
import org.elasticsearch.action.support.ActionFilters;
@ -78,7 +78,7 @@ import java.util.function.Supplier;
* primary node to validate request before primary operation followed by sampling state again for resolving
* nodes with replica copies to perform replication.
*/
public abstract class TransportReplicationAction<Request extends ReplicationRequest, ReplicaRequest extends ReplicationRequest, Response extends ActionWriteResponse> extends TransportAction<Request, Response> {
public abstract class TransportReplicationAction<Request extends ReplicationRequest, ReplicaRequest extends ReplicationRequest, Response extends ReplicationResponse> extends TransportAction<Request, Response> {
public static final String SHARD_FAILURE_TIMEOUT = "action.support.replication.shard.failure_timeout";
@ -214,7 +214,7 @@ public abstract class TransportReplicationAction<Request extends ReplicationRequ
return false;
}
protected static class WriteResult<T extends ActionWriteResponse> {
protected static class WriteResult<T extends ReplicationResponse> {
public final T response;
public final Translog.Location location;
@ -225,10 +225,10 @@ public abstract class TransportReplicationAction<Request extends ReplicationRequ
}
@SuppressWarnings("unchecked")
public <T extends ActionWriteResponse> T response() {
public <T extends ReplicationResponse> T response() {
// this sets total, pending and failed to 0 and this is ok, because we will embed this into the replica
// request and not use it
response.setShardInfo(new ActionWriteResponse.ShardInfo());
response.setShardInfo(new ReplicationResponse.ShardInfo());
return (T) response;
}
@ -908,20 +908,20 @@ public abstract class TransportReplicationAction<Request extends ReplicationRequ
private void doFinish() {
if (finished.compareAndSet(false, true)) {
Releasables.close(indexShardReference);
final ActionWriteResponse.ShardInfo.Failure[] failuresArray;
final ReplicationResponse.ShardInfo.Failure[] failuresArray;
if (!shardReplicaFailures.isEmpty()) {
int slot = 0;
failuresArray = new ActionWriteResponse.ShardInfo.Failure[shardReplicaFailures.size()];
failuresArray = new ReplicationResponse.ShardInfo.Failure[shardReplicaFailures.size()];
for (Map.Entry<String, Throwable> entry : shardReplicaFailures.entrySet()) {
RestStatus restStatus = ExceptionsHelper.status(entry.getValue());
failuresArray[slot++] = new ActionWriteResponse.ShardInfo.Failure(
failuresArray[slot++] = new ReplicationResponse.ShardInfo.Failure(
shardId.getIndex(), shardId.getId(), entry.getKey(), entry.getValue(), restStatus, false
);
}
} else {
failuresArray = ActionWriteResponse.EMPTY;
failuresArray = ReplicationResponse.EMPTY;
}
finalResponse.setShardInfo(new ActionWriteResponse.ShardInfo(
finalResponse.setShardInfo(new ReplicationResponse.ShardInfo(
totalShards,
success.get(),
failuresArray

View File

@ -175,7 +175,7 @@ public class TransportUpdateAction extends TransportInstanceSingleOperationActio
indexAction.execute(upsertRequest, new ActionListener<IndexResponse>() {
@Override
public void onResponse(IndexResponse response) {
UpdateResponse update = new UpdateResponse(response.getShardInfo(), response.getIndex(), response.getType(), response.getId(), response.getVersion(), response.isCreated());
UpdateResponse update = new UpdateResponse(response.getShardInfo(), response.getShardId(), response.getType(), response.getId(), response.getVersion(), response.isCreated());
if (request.fields() != null && request.fields().length > 0) {
Tuple<XContentType, Map<String, Object>> sourceAndContent = XContentHelper.convertToMap(upsertSourceBytes, true);
update.setGetResult(updateHelper.extractGetResult(request, request.concreteIndex(), response.getVersion(), sourceAndContent.v2(), sourceAndContent.v1(), upsertSourceBytes));
@ -212,7 +212,7 @@ public class TransportUpdateAction extends TransportInstanceSingleOperationActio
indexAction.execute(indexRequest, new ActionListener<IndexResponse>() {
@Override
public void onResponse(IndexResponse response) {
UpdateResponse update = new UpdateResponse(response.getShardInfo(), response.getIndex(), response.getType(), response.getId(), response.getVersion(), response.isCreated());
UpdateResponse update = new UpdateResponse(response.getShardInfo(), response.getShardId(), response.getType(), response.getId(), response.getVersion(), response.isCreated());
update.setGetResult(updateHelper.extractGetResult(request, request.concreteIndex(), response.getVersion(), result.updatedSourceAsMap(), result.updateSourceContentType(), indexSourceBytes));
listener.onResponse(update);
}
@ -240,7 +240,7 @@ public class TransportUpdateAction extends TransportInstanceSingleOperationActio
deleteAction.execute(deleteRequest, new ActionListener<DeleteResponse>() {
@Override
public void onResponse(DeleteResponse response) {
UpdateResponse update = new UpdateResponse(response.getShardInfo(), response.getIndex(), response.getType(), response.getId(), response.getVersion(), false);
UpdateResponse update = new UpdateResponse(response.getShardInfo(), response.getShardId(), response.getType(), response.getId(), response.getVersion(), false);
update.setGetResult(updateHelper.extractGetResult(request, request.concreteIndex(), response.getVersion(), result.updatedSourceAsMap(), result.updateSourceContentType(), null));
listener.onResponse(update);
}

View File

@ -83,9 +83,10 @@ public class UpdateHelper extends AbstractComponent {
@SuppressWarnings("unchecked")
protected Result prepare(UpdateRequest request, final GetResult getResult) {
long getDateNS = System.nanoTime();
final ShardId shardId = new ShardId(getResult.getIndex(), request.shardId());
if (!getResult.isExists()) {
if (request.upsertRequest() == null && !request.docAsUpsert()) {
throw new DocumentMissingException(new ShardId(request.index(), request.shardId()), request.type(), request.id());
throw new DocumentMissingException(shardId, request.type(), request.id());
}
IndexRequest indexRequest = request.docAsUpsert() ? request.doc() : request.upsertRequest();
TimeValue ttl = indexRequest.ttl();
@ -113,7 +114,7 @@ public class UpdateHelper extends AbstractComponent {
logger.warn("Used upsert operation [{}] for script [{}], doing nothing...", scriptOpChoice,
request.script.getScript());
}
UpdateResponse update = new UpdateResponse(getResult.getIndex(), getResult.getType(), getResult.getId(),
UpdateResponse update = new UpdateResponse(shardId, getResult.getType(), getResult.getId(),
getResult.getVersion(), false);
update.setGetResult(getResult);
return new Result(update, Operation.NONE, upsertDoc, XContentType.JSON);
@ -145,7 +146,7 @@ public class UpdateHelper extends AbstractComponent {
if (getResult.internalSourceRef() == null) {
// no source, we can't do nothing, through a failure...
throw new DocumentSourceMissingException(new ShardId(request.index(), request.shardId()), request.type(), request.id());
throw new DocumentSourceMissingException(shardId, request.type(), request.id());
}
Tuple<XContentType, Map<String, Object>> sourceAndContent = XContentHelper.convertToMap(getResult.internalSourceRef(), true);
@ -231,12 +232,12 @@ public class UpdateHelper extends AbstractComponent {
.consistencyLevel(request.consistencyLevel());
return new Result(deleteRequest, Operation.DELETE, updatedSourceAsMap, updateSourceContentType);
} else if ("none".equals(operation)) {
UpdateResponse update = new UpdateResponse(getResult.getIndex(), getResult.getType(), getResult.getId(), getResult.getVersion(), false);
UpdateResponse update = new UpdateResponse(shardId, getResult.getType(), getResult.getId(), getResult.getVersion(), false);
update.setGetResult(extractGetResult(request, request.index(), getResult.getVersion(), updatedSourceAsMap, updateSourceContentType, getResult.internalSourceRef()));
return new Result(update, Operation.NONE, updatedSourceAsMap, updateSourceContentType);
} else {
logger.warn("Used update operation [{}] for script [{}], doing nothing...", operation, request.script.getScript());
UpdateResponse update = new UpdateResponse(getResult.getIndex(), getResult.getType(), getResult.getId(), getResult.getVersion(), false);
UpdateResponse update = new UpdateResponse(shardId, getResult.getType(), getResult.getId(), getResult.getVersion(), false);
return new Result(update, Operation.NONE, updatedSourceAsMap, updateSourceContentType);
}
}

View File

@ -19,21 +19,21 @@
package org.elasticsearch.action.update;
import org.elasticsearch.action.ActionWriteResponse;
import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentBuilderString;
import org.elasticsearch.index.get.GetResult;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.rest.RestStatus;
import java.io.IOException;
/**
*/
public class UpdateResponse extends ActionWriteResponse {
public class UpdateResponse extends DocWriteResponse {
private String index;
private String id;
private String type;
private long version;
private boolean created;
private GetResult getResult;
@ -44,47 +44,16 @@ public class UpdateResponse extends ActionWriteResponse {
* Constructor to be used when a update didn't translate in a write.
* For example: update script with operation set to none
*/
public UpdateResponse(String index, String type, String id, long version, boolean created) {
this(new ShardInfo(0, 0), index, type, id, version, created);
public UpdateResponse(ShardId shardId, String type, String id, long version, boolean created) {
this(new ShardInfo(0, 0), shardId, type, id, version, created);
}
public UpdateResponse(ShardInfo shardInfo, String index, String type, String id, long version, boolean created) {
public UpdateResponse(ShardInfo shardInfo, ShardId shardId, String type, String id, long version, boolean created) {
super(shardId, type, id, version);
setShardInfo(shardInfo);
this.index = index;
this.id = id;
this.type = type;
this.version = version;
this.created = created;
}
/**
* The index the document was indexed into.
*/
public String getIndex() {
return this.index;
}
/**
* The type of the document indexed.
*/
public String getType() {
return this.type;
}
/**
* The id of the document indexed.
*/
public String getId() {
return this.id;
}
/**
* Returns the current version of the doc indexed.
*/
public long getVersion() {
return this.version;
}
public void setGetResult(GetResult getResult) {
this.getResult = getResult;
}
@ -101,13 +70,17 @@ public class UpdateResponse extends ActionWriteResponse {
}
@Override
public RestStatus status() {
if (created) {
return RestStatus.CREATED;
}
return super.status();
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
index = in.readString();
type = in.readString();
id = in.readString();
version = in.readLong();
created = in.readBoolean();
if (in.readBoolean()) {
getResult = GetResult.readGetResult(in);
@ -117,10 +90,6 @@ public class UpdateResponse extends ActionWriteResponse {
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(index);
out.writeString(type);
out.writeString(id);
out.writeLong(version);
out.writeBoolean(created);
if (getResult == null) {
out.writeBoolean(false);
@ -129,4 +98,34 @@ public class UpdateResponse extends ActionWriteResponse {
getResult.writeTo(out);
}
}
static final class Fields {
static final XContentBuilderString GET = new XContentBuilderString("get");
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
super.toXContent(builder, params);
if (getGetResult() != null) {
builder.startObject(Fields.GET);
getGetResult().toXContentEmbedded(builder, params);
builder.endObject();
}
return builder;
}
@Override
public String toString() {
StringBuilder builder = new StringBuilder();
builder.append("UpdateResponse[");
builder.append("index=").append(getIndex());
builder.append(",type=").append(getType());
builder.append(",id=").append(getId());
builder.append(",version=").append(getVersion());
builder.append(",created=").append(created);
builder.append(",shards=").append(getShardInfo());
return builder.append("]").toString();
}
}

View File

@ -19,16 +19,11 @@
package org.elasticsearch.rest.action.bulk;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionWriteResponse;
import org.elasticsearch.action.WriteConsistencyLevel;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.bulk.BulkShardRequest;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.Requests;
import org.elasticsearch.common.Strings;
@ -96,52 +91,7 @@ public class RestBulkAction extends BaseRestHandler {
builder.startArray(Fields.ITEMS);
for (BulkItemResponse itemResponse : response) {
builder.startObject();
builder.startObject(itemResponse.getOpType());
builder.field(Fields._INDEX, itemResponse.getIndex());
builder.field(Fields._TYPE, itemResponse.getType());
builder.field(Fields._ID, itemResponse.getId());
long version = itemResponse.getVersion();
if (version != -1) {
builder.field(Fields._VERSION, itemResponse.getVersion());
}
if (itemResponse.isFailed()) {
builder.field(Fields.STATUS, itemResponse.getFailure().getStatus().getStatus());
builder.startObject(Fields.ERROR);
ElasticsearchException.toXContent(builder, request, itemResponse.getFailure().getCause());
builder.endObject();
} else {
ActionWriteResponse.ShardInfo shardInfo = itemResponse.getResponse().getShardInfo();
shardInfo.toXContent(builder, request);
if (itemResponse.getResponse() instanceof DeleteResponse) {
DeleteResponse deleteResponse = itemResponse.getResponse();
if (deleteResponse.isFound()) {
builder.field(Fields.STATUS, shardInfo.status().getStatus());
} else {
builder.field(Fields.STATUS, RestStatus.NOT_FOUND.getStatus());
}
builder.field(Fields.FOUND, deleteResponse.isFound());
} else if (itemResponse.getResponse() instanceof IndexResponse) {
IndexResponse indexResponse = itemResponse.getResponse();
if (indexResponse.isCreated()) {
builder.field(Fields.STATUS, RestStatus.CREATED.getStatus());
} else {
builder.field(Fields.STATUS, shardInfo.status().getStatus());
}
} else if (itemResponse.getResponse() instanceof UpdateResponse) {
UpdateResponse updateResponse = itemResponse.getResponse();
if (updateResponse.isCreated()) {
builder.field(Fields.STATUS, RestStatus.CREATED.getStatus());
} else {
builder.field(Fields.STATUS, shardInfo.status().getStatus());
}
if (updateResponse.getGetResult() != null) {
builder.startObject(Fields.GET);
updateResponse.getGetResult().toXContentEmbedded(builder, request);
builder.endObject();
}
}
}
builder.endObject();
itemResponse.toXContent(builder, request);
builder.endObject();
}
builder.endArray();
@ -155,15 +105,7 @@ public class RestBulkAction extends BaseRestHandler {
static final class Fields {
static final XContentBuilderString ITEMS = new XContentBuilderString("items");
static final XContentBuilderString ERRORS = new XContentBuilderString("errors");
static final XContentBuilderString _INDEX = new XContentBuilderString("_index");
static final XContentBuilderString _TYPE = new XContentBuilderString("_type");
static final XContentBuilderString _ID = new XContentBuilderString("_id");
static final XContentBuilderString STATUS = new XContentBuilderString("status");
static final XContentBuilderString ERROR = new XContentBuilderString("error");
static final XContentBuilderString TOOK = new XContentBuilderString("took");
static final XContentBuilderString _VERSION = new XContentBuilderString("_version");
static final XContentBuilderString FOUND = new XContentBuilderString("found");
static final XContentBuilderString GET = new XContentBuilderString("get");
}
}

View File

@ -19,7 +19,6 @@
package org.elasticsearch.rest.action.delete;
import org.elasticsearch.action.ActionWriteResponse;
import org.elasticsearch.action.WriteConsistencyLevel;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.delete.DeleteResponse;
@ -27,14 +26,13 @@ import org.elasticsearch.client.Client;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentBuilderString;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.rest.*;
import org.elasticsearch.rest.action.support.RestActions;
import org.elasticsearch.rest.action.support.RestBuilderListener;
import org.elasticsearch.rest.action.support.RestStatusToXContentListener;
import static org.elasticsearch.rest.RestRequest.Method.DELETE;
import static org.elasticsearch.rest.RestStatus.NOT_FOUND;
/**
*
@ -62,31 +60,6 @@ public class RestDeleteAction extends BaseRestHandler {
deleteRequest.consistencyLevel(WriteConsistencyLevel.fromString(consistencyLevel));
}
client.delete(deleteRequest, new RestBuilderListener<DeleteResponse>(channel) {
@Override
public RestResponse buildResponse(DeleteResponse result, XContentBuilder builder) throws Exception {
ActionWriteResponse.ShardInfo shardInfo = result.getShardInfo();
builder.startObject().field(Fields.FOUND, result.isFound())
.field(Fields._INDEX, result.getIndex())
.field(Fields._TYPE, result.getType())
.field(Fields._ID, result.getId())
.field(Fields._VERSION, result.getVersion())
.value(shardInfo)
.endObject();
RestStatus status = shardInfo.status();
if (!result.isFound()) {
status = NOT_FOUND;
}
return new BytesRestResponse(status, builder);
}
});
}
static final class Fields {
static final XContentBuilderString FOUND = new XContentBuilderString("found");
static final XContentBuilderString _INDEX = new XContentBuilderString("_index");
static final XContentBuilderString _TYPE = new XContentBuilderString("_type");
static final XContentBuilderString _ID = new XContentBuilderString("_id");
static final XContentBuilderString _VERSION = new XContentBuilderString("_version");
client.delete(deleteRequest, new RestStatusToXContentListener<>(channel));
}
}

View File

@ -19,7 +19,6 @@
package org.elasticsearch.rest.action.index;
import org.elasticsearch.action.ActionWriteResponse;
import org.elasticsearch.action.WriteConsistencyLevel;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
@ -27,11 +26,11 @@ import org.elasticsearch.client.Client;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentBuilderString;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.rest.*;
import org.elasticsearch.rest.action.support.RestActions;
import org.elasticsearch.rest.action.support.RestBuilderListener;
import org.elasticsearch.rest.action.support.RestStatusToXContentListener;
import java.io.IOException;
@ -99,33 +98,6 @@ public class RestIndexAction extends BaseRestHandler {
if (consistencyLevel != null) {
indexRequest.consistencyLevel(WriteConsistencyLevel.fromString(consistencyLevel));
}
client.index(indexRequest, new RestBuilderListener<IndexResponse>(channel) {
@Override
public RestResponse buildResponse(IndexResponse response, XContentBuilder builder) throws Exception {
builder.startObject();
ActionWriteResponse.ShardInfo shardInfo = response.getShardInfo();
builder.field(Fields._INDEX, response.getIndex())
.field(Fields._TYPE, response.getType())
.field(Fields._ID, response.getId())
.field(Fields._VERSION, response.getVersion());
shardInfo.toXContent(builder, request);
builder.field(Fields.CREATED, response.isCreated());
builder.endObject();
RestStatus status = shardInfo.status();
if (response.isCreated()) {
status = CREATED;
client.index(indexRequest, new RestStatusToXContentListener<>(channel));
}
return new BytesRestResponse(status, builder);
}
});
}
static final class Fields {
static final XContentBuilderString _INDEX = new XContentBuilderString("_index");
static final XContentBuilderString _TYPE = new XContentBuilderString("_type");
static final XContentBuilderString _ID = new XContentBuilderString("_id");
static final XContentBuilderString _VERSION = new XContentBuilderString("_version");
static final XContentBuilderString CREATED = new XContentBuilderString("created");
}
}

View File

@ -19,7 +19,6 @@
package org.elasticsearch.rest.action.update;
import org.elasticsearch.action.ActionWriteResponse;
import org.elasticsearch.action.WriteConsistencyLevel;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.update.UpdateRequest;
@ -29,7 +28,6 @@ import org.elasticsearch.common.Strings;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentBuilderString;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.BytesRestResponse;
@ -40,6 +38,7 @@ import org.elasticsearch.rest.RestResponse;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.rest.action.support.RestActions;
import org.elasticsearch.rest.action.support.RestBuilderListener;
import org.elasticsearch.rest.action.support.RestStatusToXContentListener;
import org.elasticsearch.script.Script;
import org.elasticsearch.script.ScriptParameterParser;
import org.elasticsearch.script.ScriptParameterParser.ScriptParameterValue;
@ -48,7 +47,6 @@ import java.util.HashMap;
import java.util.Map;
import static org.elasticsearch.rest.RestRequest.Method.POST;
import static org.elasticsearch.rest.RestStatus.CREATED;
/**
*/
@ -123,38 +121,6 @@ public class RestUpdateAction extends BaseRestHandler {
}
}
client.update(updateRequest, new RestBuilderListener<UpdateResponse>(channel) {
@Override
public RestResponse buildResponse(UpdateResponse response, XContentBuilder builder) throws Exception {
builder.startObject();
ActionWriteResponse.ShardInfo shardInfo = response.getShardInfo();
builder.field(Fields._INDEX, response.getIndex())
.field(Fields._TYPE, response.getType())
.field(Fields._ID, response.getId())
.field(Fields._VERSION, response.getVersion());
shardInfo.toXContent(builder, request);
if (response.getGetResult() != null) {
builder.startObject(Fields.GET);
response.getGetResult().toXContentEmbedded(builder, request);
builder.endObject();
}
builder.endObject();
RestStatus status = shardInfo.status();
if (response.isCreated()) {
status = CREATED;
}
return new BytesRestResponse(status, builder);
}
});
}
static final class Fields {
static final XContentBuilderString _INDEX = new XContentBuilderString("_index");
static final XContentBuilderString _TYPE = new XContentBuilderString("_type");
static final XContentBuilderString _ID = new XContentBuilderString("_id");
static final XContentBuilderString _VERSION = new XContentBuilderString("_version");
static final XContentBuilderString GET = new XContentBuilderString("get");
client.update(updateRequest, new RestStatusToXContentListener<>(channel));
}
}

View File

@ -20,7 +20,7 @@ package org.elasticsearch.action.support.replication;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionWriteResponse;
import org.elasticsearch.action.ReplicationResponse;
import org.elasticsearch.action.NoShardAvailableActionException;
import org.elasticsearch.action.UnavailableShardsException;
import org.elasticsearch.action.admin.indices.flush.FlushRequest;
@ -101,7 +101,7 @@ public class BroadcastReplicationTests extends ESTestCase {
randomBoolean() ? ShardRoutingState.INITIALIZING : ShardRoutingState.UNASSIGNED, ShardRoutingState.UNASSIGNED));
logger.debug("--> using initial state:\n{}", clusterService.state().prettyPrint());
Future<BroadcastResponse> response = (broadcastReplicationAction.execute(new BroadcastRequest().indices(index)));
for (Tuple<ShardId, ActionListener<ActionWriteResponse>> shardRequests : broadcastReplicationAction.capturedShardRequests) {
for (Tuple<ShardId, ActionListener<ReplicationResponse>> shardRequests : broadcastReplicationAction.capturedShardRequests) {
if (randomBoolean()) {
shardRequests.v2().onFailure(new NoShardAvailableActionException(shardRequests.v1()));
} else {
@ -120,10 +120,10 @@ public class BroadcastReplicationTests extends ESTestCase {
ShardRoutingState.STARTED));
logger.debug("--> using initial state:\n{}", clusterService.state().prettyPrint());
Future<BroadcastResponse> response = (broadcastReplicationAction.execute(new BroadcastRequest().indices(index)));
for (Tuple<ShardId, ActionListener<ActionWriteResponse>> shardRequests : broadcastReplicationAction.capturedShardRequests) {
ActionWriteResponse actionWriteResponse = new ActionWriteResponse();
actionWriteResponse.setShardInfo(new ActionWriteResponse.ShardInfo(1, 1, new ActionWriteResponse.ShardInfo.Failure[0]));
shardRequests.v2().onResponse(actionWriteResponse);
for (Tuple<ShardId, ActionListener<ReplicationResponse>> shardRequests : broadcastReplicationAction.capturedShardRequests) {
ReplicationResponse replicationResponse = new ReplicationResponse();
replicationResponse.setShardInfo(new ReplicationResponse.ShardInfo(1, 1, new ReplicationResponse.ShardInfo.Failure[0]));
shardRequests.v2().onResponse(replicationResponse);
}
logger.info("total shards: {}, ", response.get().getTotalShards());
assertBroadcastResponse(1, 1, 0, response.get(), null);
@ -137,20 +137,20 @@ public class BroadcastReplicationTests extends ESTestCase {
Future<BroadcastResponse> response = (broadcastReplicationAction.execute(new BroadcastRequest().indices(index)));
int succeeded = 0;
int failed = 0;
for (Tuple<ShardId, ActionListener<ActionWriteResponse>> shardRequests : broadcastReplicationAction.capturedShardRequests) {
for (Tuple<ShardId, ActionListener<ReplicationResponse>> shardRequests : broadcastReplicationAction.capturedShardRequests) {
if (randomBoolean()) {
ActionWriteResponse.ShardInfo.Failure[] failures = new ActionWriteResponse.ShardInfo.Failure[0];
ReplicationResponse.ShardInfo.Failure[] failures = new ReplicationResponse.ShardInfo.Failure[0];
int shardsSucceeded = randomInt(1) + 1;
succeeded += shardsSucceeded;
ActionWriteResponse actionWriteResponse = new ActionWriteResponse();
ReplicationResponse replicationResponse = new ReplicationResponse();
if (shardsSucceeded == 1 && randomBoolean()) {
//sometimes add failure (no failure means shard unavailable)
failures = new ActionWriteResponse.ShardInfo.Failure[1];
failures[0] = new ActionWriteResponse.ShardInfo.Failure(index, shardRequests.v1().id(), null, new Exception("pretend shard failed"), RestStatus.GATEWAY_TIMEOUT, false);
failures = new ReplicationResponse.ShardInfo.Failure[1];
failures[0] = new ReplicationResponse.ShardInfo.Failure(index, shardRequests.v1().id(), null, new Exception("pretend shard failed"), RestStatus.GATEWAY_TIMEOUT, false);
failed++;
}
actionWriteResponse.setShardInfo(new ActionWriteResponse.ShardInfo(2, shardsSucceeded, failures));
shardRequests.v2().onResponse(actionWriteResponse);
replicationResponse.setShardInfo(new ReplicationResponse.ShardInfo(2, shardsSucceeded, failures));
shardRequests.v2().onResponse(replicationResponse);
} else {
// sometimes fail
failed += 2;
@ -179,16 +179,16 @@ public class BroadcastReplicationTests extends ESTestCase {
assertThat(shards.get(0), equalTo(shardId));
}
private class TestBroadcastReplicationAction extends TransportBroadcastReplicationAction<BroadcastRequest, BroadcastResponse, ReplicationRequest, ActionWriteResponse> {
protected final Set<Tuple<ShardId, ActionListener<ActionWriteResponse>>> capturedShardRequests = ConcurrentCollections.newConcurrentSet();
private class TestBroadcastReplicationAction extends TransportBroadcastReplicationAction<BroadcastRequest, BroadcastResponse, ReplicationRequest, ReplicationResponse> {
protected final Set<Tuple<ShardId, ActionListener<ReplicationResponse>>> capturedShardRequests = ConcurrentCollections.newConcurrentSet();
public TestBroadcastReplicationAction(Settings settings, ThreadPool threadPool, ClusterService clusterService, TransportService transportService, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, TransportReplicationAction replicatedBroadcastShardAction) {
super("test-broadcast-replication-action", BroadcastRequest::new, settings, threadPool, clusterService, transportService, actionFilters, indexNameExpressionResolver, replicatedBroadcastShardAction);
}
@Override
protected ActionWriteResponse newShardResponse() {
return new ActionWriteResponse();
protected ReplicationResponse newShardResponse() {
return new ReplicationResponse();
}
@Override
@ -202,7 +202,7 @@ public class BroadcastReplicationTests extends ESTestCase {
}
@Override
protected void shardExecute(BroadcastRequest request, ShardId shardId, ActionListener<ActionWriteResponse> shardActionListener) {
protected void shardExecute(BroadcastRequest request, ShardId shardId, ActionListener<ReplicationResponse> shardActionListener) {
capturedShardRequests.add(new Tuple<>(shardId, shardActionListener));
}
}

View File

@ -20,7 +20,7 @@ package org.elasticsearch.action.support.replication;
import org.apache.lucene.index.CorruptIndexException;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionWriteResponse;
import org.elasticsearch.action.ReplicationResponse;
import org.elasticsearch.action.UnavailableShardsException;
import org.elasticsearch.action.WriteConsistencyLevel;
import org.elasticsearch.action.support.ActionFilter;
@ -521,7 +521,7 @@ public class TransportReplicationActionTests extends ESTestCase {
}
assertThat(listener.isDone(), equalTo(true));
Response response = listener.get();
final ActionWriteResponse.ShardInfo shardInfo = response.getShardInfo();
final ReplicationResponse.ShardInfo shardInfo = response.getShardInfo();
assertThat(shardInfo.getFailed(), equalTo(criticalFailures));
assertThat(shardInfo.getFailures(), arrayWithSize(criticalFailures));
assertThat(shardInfo.getSuccessful(), equalTo(successful));
@ -703,7 +703,7 @@ public class TransportReplicationActionTests extends ESTestCase {
}
}
static class Response extends ActionWriteResponse {
static class Response extends ReplicationResponse {
}
class Action extends TransportReplicationAction<Request, Request, Response> {

View File

@ -19,7 +19,7 @@
package org.elasticsearch.document;
import org.elasticsearch.action.ActionWriteResponse;
import org.elasticsearch.action.ReplicationResponse;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.admin.indices.recovery.RecoveryResponse;
import org.elasticsearch.action.bulk.BulkItemResponse;
@ -117,11 +117,11 @@ public class ShardInfoIT extends ESIntegTestCase {
}
}
private void assertShardInfo(ActionWriteResponse response) {
private void assertShardInfo(ReplicationResponse response) {
assertShardInfo(response, numCopies, numNodes);
}
private void assertShardInfo(ActionWriteResponse response, int expectedTotal, int expectedSuccessful) {
private void assertShardInfo(ReplicationResponse response, int expectedTotal, int expectedSuccessful) {
assertThat(response.getShardInfo().getTotal(), greaterThanOrEqualTo(expectedTotal));
assertThat(response.getShardInfo().getSuccessful(), greaterThanOrEqualTo(expectedSuccessful));
}

View File

@ -32,6 +32,7 @@ import org.elasticsearch.common.text.StringText;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.CollectionUtils;
import org.elasticsearch.common.util.concurrent.CountDown;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.search.internal.InternalSearchHit;
@ -225,7 +226,7 @@ public class TransportDeleteByQueryActionTests extends ESSingleNodeTestCase {
} else {
deleted++;
}
items[i] = new BulkItemResponse(i, "delete", new DeleteResponse("test", "type", String.valueOf(i), 1, delete));
items[i] = new BulkItemResponse(i, "delete", new DeleteResponse(new ShardId("test", 0), "type", String.valueOf(i), 1, delete));
} else {
items[i] = new BulkItemResponse(i, "delete", new BulkItemResponse.Failure("test", "type", String.valueOf(i), new Throwable("item failed")));
failed++;
@ -281,7 +282,7 @@ public class TransportDeleteByQueryActionTests extends ESSingleNodeTestCase {
deleted[0] = deleted[0] + 1;
deleted[index] = deleted[index] + 1;
}
items[i] = new BulkItemResponse(i, "delete", new DeleteResponse("test-" + index, "type", String.valueOf(i), 1, delete));
items[i] = new BulkItemResponse(i, "delete", new DeleteResponse(new ShardId("test-" + index, 0), "type", String.valueOf(i), 1, delete));
} else {
items[i] = new BulkItemResponse(i, "delete", new BulkItemResponse.Failure("test-" + index, "type", String.valueOf(i), new Throwable("item failed")));
failed[0] = failed[0] + 1;