Add _operation field to index, update, delete responses

Performing the bulk request shown in  now results in the following:
```
{"_index":"test","_type":"test","_id":"1","_version":1,"_operation":"create","forced_refresh":false,"_shards":{"total":2,"successful":1,"failed":0},"status":201}
{"_index":"test","_type":"test","_id":"1","_version":1,"_operation":"noop","forced_refresh":false,"_shards":{"total":2,"successful":1,"failed":0},"status":200}
```
This commit is contained in:
Alexander Lin 2016-07-26 08:16:19 -07:00 committed by Nik Everett
parent b208a7dbae
commit 8f2882a442
15 changed files with 224 additions and 100 deletions
core/src/main/java/org/elasticsearch/action
docs/reference/docs
modules/reindex/src/test/java/org/elasticsearch/index/reindex
rest-api-spec/src/main/resources/rest-api-spec/test

@ -25,6 +25,7 @@ import org.elasticsearch.action.support.replication.ReplicationResponse;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.StatusToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.index.IndexSettings;
@ -32,29 +33,83 @@ import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.rest.RestStatus;
import java.io.IOException;
import java.util.Locale;
/**
* A base class for the response of a write operation that involves a single doc
*/
public abstract class DocWriteResponse extends ReplicationResponse implements WriteResponse, StatusToXContent {
public enum Operation implements Writeable {
CREATE(0),
INDEX(1),
DELETE(2),
NOOP(3);
private final byte op;
private final String lowercase;
Operation(int op) {
this.op = (byte) op;
this.lowercase = this.toString().toLowerCase(Locale.ENGLISH);
}
public byte getOp() {
return op;
}
public String getLowercase() {
return lowercase;
}
public static Operation readFrom(StreamInput in) throws IOException{
Byte opcode = in.readByte();
switch(opcode){
case 0:
return CREATE;
case 1:
return INDEX;
case 2:
return DELETE;
case 3:
return NOOP;
default:
throw new IllegalArgumentException("Unknown operation code: " + opcode);
}
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeByte(op);
}
}
private ShardId shardId;
private String id;
private String type;
private long version;
private boolean forcedRefresh;
protected Operation operation;
public DocWriteResponse(ShardId shardId, String type, String id, long version) {
public DocWriteResponse(ShardId shardId, String type, String id, long version, Operation operation) {
this.shardId = shardId;
this.type = type;
this.id = id;
this.version = version;
this.operation = operation;
}
// needed for deserialization
protected DocWriteResponse() {
}
/**
* The change that occurred to the document.
*/
public Operation getOperation() {
return operation;
}
/**
* The index the document was changed in.
*/
@ -143,6 +198,7 @@ public abstract class DocWriteResponse extends ReplicationResponse implements Wr
id = in.readString();
version = in.readZLong();
forcedRefresh = in.readBoolean();
operation = Operation.readFrom(in);
}
@Override
@ -153,22 +209,17 @@ public abstract class DocWriteResponse extends ReplicationResponse implements Wr
out.writeString(id);
out.writeZLong(version);
out.writeBoolean(forcedRefresh);
}
static final class Fields {
static final String _INDEX = "_index";
static final String _TYPE = "_type";
static final String _ID = "_id";
static final String _VERSION = "_version";
operation.writeTo(out);
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
ReplicationResponse.ShardInfo shardInfo = getShardInfo();
builder.field(Fields._INDEX, shardId.getIndexName())
.field(Fields._TYPE, type)
.field(Fields._ID, id)
.field(Fields._VERSION, version)
builder.field("_index", shardId.getIndexName())
.field("_type", type)
.field("_id", id)
.field("_version", version)
.field("_operation", getOperation().getLowercase())
.field("forced_refresh", forcedRefresh);
shardInfo.toXContent(builder, params);
return builder;

@ -248,7 +248,7 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
BytesReference indexSourceAsBytes = indexRequest.source();
// add the response
IndexResponse indexResponse = result.getResponse();
UpdateResponse updateResponse = new UpdateResponse(indexResponse.getShardInfo(), indexResponse.getShardId(), indexResponse.getType(), indexResponse.getId(), indexResponse.getVersion(), indexResponse.isCreated());
UpdateResponse updateResponse = new UpdateResponse(indexResponse.getShardInfo(), indexResponse.getShardId(), indexResponse.getType(), indexResponse.getId(), indexResponse.getVersion(), indexResponse.getOperation());
if (updateRequest.fields() != null && updateRequest.fields().length > 0) {
Tuple<XContentType, Map<String, Object>> sourceAndContent = XContentHelper.convertToMap(indexSourceAsBytes, true);
updateResponse.setGetResult(updateHelper.extractGetResult(updateRequest, request.index(), indexResponse.getVersion(), sourceAndContent.v2(), sourceAndContent.v1(), indexSourceAsBytes));
@ -261,7 +261,7 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
WriteResult<DeleteResponse> writeResult = updateResult.writeResult;
DeleteResponse response = writeResult.getResponse();
DeleteRequest deleteRequest = updateResult.request();
updateResponse = new UpdateResponse(response.getShardInfo(), response.getShardId(), response.getType(), response.getId(), response.getVersion(), false);
updateResponse = new UpdateResponse(response.getShardInfo(), response.getShardId(), response.getType(), response.getId(), response.getVersion(), response.getOperation());
updateResponse.setGetResult(updateHelper.extractGetResult(updateRequest, request.index(), response.getVersion(), updateResult.result.updatedSourceAsMap(), updateResult.result.updateSourceContentType(), null));
// Replace the update request to the translated delete request to execute on the replica.
item = request.items()[requestIndex] = new BulkItemRequest(request.items()[requestIndex].id(), deleteRequest);

@ -20,8 +20,6 @@
package org.elasticsearch.action.delete;
import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.rest.RestStatus;
@ -36,52 +34,29 @@ import java.io.IOException;
*/
public class DeleteResponse extends DocWriteResponse {
private boolean found;
public DeleteResponse() {
}
public DeleteResponse(ShardId shardId, String type, String id, long version, boolean found) {
super(shardId, type, id, version);
this.found = found;
super(shardId, type, id, version, found ? Operation.DELETE : Operation.NOOP);
}
/**
* Returns <tt>true</tt> if a doc was found to delete.
*/
public boolean isFound() {
return found;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
found = in.readBoolean();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeBoolean(found);
return operation == Operation.DELETE;
}
@Override
public RestStatus status() {
if (found == false) {
return RestStatus.NOT_FOUND;
}
return super.status();
}
static final class Fields {
static final String FOUND = "found";
return isFound() ? super.status() : RestStatus.NOT_FOUND;
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.field(Fields.FOUND, isFound());
builder.field("found", isFound());
super.toXContent(builder, params);
return builder;
}
@ -94,7 +69,7 @@ public class DeleteResponse extends DocWriteResponse {
builder.append(",type=").append(getType());
builder.append(",id=").append(getId());
builder.append(",version=").append(getVersion());
builder.append(",found=").append(found);
builder.append(",operation=").append(getOperation().getLowercase());
builder.append(",shards=").append(getShardInfo());
return builder.append("]").toString();
}

@ -36,42 +36,24 @@ import java.io.IOException;
*/
public class IndexResponse extends DocWriteResponse {
private boolean created;
public IndexResponse() {
}
public IndexResponse(ShardId shardId, String type, String id, long version, boolean created) {
super(shardId, type, id, version);
this.created = created;
super(shardId, type, id, version, created ? Operation.CREATE : Operation.INDEX);
}
/**
* Returns true if the document was created, false if updated.
*/
public boolean isCreated() {
return this.created;
return this.operation == Operation.CREATE;
}
@Override
public RestStatus status() {
if (created) {
return RestStatus.CREATED;
}
return super.status();
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
created = in.readBoolean();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeBoolean(created);
return isCreated() ? RestStatus.CREATED : super.status();
}
@Override
@ -82,19 +64,15 @@ public class IndexResponse extends DocWriteResponse {
builder.append(",type=").append(getType());
builder.append(",id=").append(getId());
builder.append(",version=").append(getVersion());
builder.append(",created=").append(created);
builder.append(",operation=").append(getOperation().getLowercase());
builder.append(",shards=").append(getShardInfo());
return builder.append("]").toString();
}
static final class Fields {
static final String CREATED = "created";
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
super.toXContent(builder, params);
builder.field(Fields.CREATED, isCreated());
builder.field("created", isCreated());
return builder;
}
}

@ -22,6 +22,7 @@ package org.elasticsearch.action.update;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRunnable;
import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.RoutingMissingException;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
@ -185,7 +186,7 @@ public class TransportUpdateAction extends TransportInstanceSingleOperationActio
indexAction.execute(upsertRequest, new ActionListener<IndexResponse>() {
@Override
public void onResponse(IndexResponse response) {
UpdateResponse update = new UpdateResponse(response.getShardInfo(), response.getShardId(), response.getType(), response.getId(), response.getVersion(), response.isCreated());
UpdateResponse update = new UpdateResponse(response.getShardInfo(), response.getShardId(), response.getType(), response.getId(), response.getVersion(), response.getOperation());
if (request.fields() != null && request.fields().length > 0) {
Tuple<XContentType, Map<String, Object>> sourceAndContent = XContentHelper.convertToMap(upsertSourceBytes, true);
update.setGetResult(updateHelper.extractGetResult(request, request.concreteIndex(), response.getVersion(), sourceAndContent.v2(), sourceAndContent.v1(), upsertSourceBytes));
@ -223,7 +224,7 @@ public class TransportUpdateAction extends TransportInstanceSingleOperationActio
indexAction.execute(indexRequest, new ActionListener<IndexResponse>() {
@Override
public void onResponse(IndexResponse response) {
UpdateResponse update = new UpdateResponse(response.getShardInfo(), response.getShardId(), response.getType(), response.getId(), response.getVersion(), response.isCreated());
UpdateResponse update = new UpdateResponse(response.getShardInfo(), response.getShardId(), response.getType(), response.getId(), response.getVersion(), response.getOperation());
update.setGetResult(updateHelper.extractGetResult(request, request.concreteIndex(), response.getVersion(), result.updatedSourceAsMap(), result.updateSourceContentType(), indexSourceBytes));
update.setForcedRefresh(response.forcedRefresh());
listener.onResponse(update);
@ -252,7 +253,7 @@ public class TransportUpdateAction extends TransportInstanceSingleOperationActio
deleteAction.execute(deleteRequest, new ActionListener<DeleteResponse>() {
@Override
public void onResponse(DeleteResponse response) {
UpdateResponse update = new UpdateResponse(response.getShardInfo(), response.getShardId(), response.getType(), response.getId(), response.getVersion(), false);
UpdateResponse update = new UpdateResponse(response.getShardInfo(), response.getShardId(), response.getType(), response.getId(), response.getVersion(), response.getOperation());
update.setGetResult(updateHelper.extractGetResult(request, request.concreteIndex(), response.getVersion(), result.updatedSourceAsMap(), result.updateSourceContentType(), null));
update.setForcedRefresh(response.forcedRefresh());
listener.onResponse(update);

@ -116,7 +116,7 @@ public class UpdateHelper extends AbstractComponent {
request.script.getScript());
}
UpdateResponse update = new UpdateResponse(shardId, getResult.getType(), getResult.getId(),
getResult.getVersion(), false);
getResult.getVersion(), UpdateResponse.convert(Operation.NONE));
update.setGetResult(getResult);
return new Result(update, Operation.NONE, upsertDoc, XContentType.JSON);
}
@ -234,12 +234,12 @@ public class UpdateHelper extends AbstractComponent {
.setRefreshPolicy(request.getRefreshPolicy());
return new Result(deleteRequest, Operation.DELETE, updatedSourceAsMap, updateSourceContentType);
} else if ("none".equals(operation)) {
UpdateResponse update = new UpdateResponse(shardId, getResult.getType(), getResult.getId(), getResult.getVersion(), false);
UpdateResponse update = new UpdateResponse(shardId, getResult.getType(), getResult.getId(), getResult.getVersion(), UpdateResponse.convert(Operation.NONE));
update.setGetResult(extractGetResult(request, request.index(), getResult.getVersion(), updatedSourceAsMap, updateSourceContentType, getResult.internalSourceRef()));
return new Result(update, Operation.NONE, updatedSourceAsMap, updateSourceContentType);
} else {
logger.warn("Used update operation [{}] for script [{}], doing nothing...", operation, request.script.getScript());
UpdateResponse update = new UpdateResponse(shardId, getResult.getType(), getResult.getId(), getResult.getVersion(), false);
UpdateResponse update = new UpdateResponse(shardId, getResult.getType(), getResult.getId(), getResult.getVersion(), UpdateResponse.convert(Operation.NONE));
return new Result(update, Operation.NONE, updatedSourceAsMap, updateSourceContentType);
}
}

@ -29,11 +29,8 @@ import org.elasticsearch.rest.RestStatus;
import java.io.IOException;
/**
*/
public class UpdateResponse extends DocWriteResponse {
private boolean created;
private GetResult getResult;
public UpdateResponse() {
@ -43,14 +40,28 @@ public class UpdateResponse extends DocWriteResponse {
* Constructor to be used when a update didn't translate in a write.
* For example: update script with operation set to none
*/
public UpdateResponse(ShardId shardId, String type, String id, long version, boolean created) {
this(new ShardInfo(0, 0), shardId, type, id, version, created);
public UpdateResponse(ShardId shardId, String type, String id, long version, Operation operation) {
this(new ShardInfo(0, 0), shardId, type, id, version, operation);
}
public UpdateResponse(ShardInfo shardInfo, ShardId shardId, String type, String id, long version, boolean created) {
super(shardId, type, id, version);
public UpdateResponse(ShardInfo shardInfo, ShardId shardId, String type, String id,
long version, Operation operation) {
super(shardId, type, id, version, operation);
setShardInfo(shardInfo);
this.created = created;
}
public static Operation convert(UpdateHelper.Operation op) {
switch(op) {
case UPSERT:
return Operation.CREATE;
case INDEX:
return Operation.INDEX;
case DELETE:
return Operation.DELETE;
case NONE:
return Operation.NOOP;
}
throw new IllegalArgumentException();
}
public void setGetResult(GetResult getResult) {
@ -65,22 +76,17 @@ public class UpdateResponse extends DocWriteResponse {
* Returns true if document was created due to an UPSERT operation
*/
public boolean isCreated() {
return this.created;
return this.operation == Operation.CREATE;
}
@Override
public RestStatus status() {
if (created) {
return RestStatus.CREATED;
}
return super.status();
return isCreated() ? RestStatus.CREATED : super.status();
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
created = in.readBoolean();
if (in.readBoolean()) {
getResult = GetResult.readGetResult(in);
}
@ -89,7 +95,6 @@ public class UpdateResponse extends DocWriteResponse {
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeBoolean(created);
if (getResult == null) {
out.writeBoolean(false);
} else {
@ -122,7 +127,7 @@ public class UpdateResponse extends DocWriteResponse {
builder.append(",type=").append(getType());
builder.append(",id=").append(getId());
builder.append(",version=").append(getVersion());
builder.append(",created=").append(created);
builder.append(",operation=").append(getOperation().getLowercase());
builder.append(",shards=").append(getShardInfo());
return builder.append("]").toString();
}

@ -57,7 +57,7 @@ $ cat requests
{ "index" : { "_index" : "test", "_type" : "type1", "_id" : "1" } }
{ "field1" : "value1" }
$ curl -s -XPOST localhost:9200/_bulk --data-binary "@requests"; echo
{"took":7,"items":[{"create":{"_index":"test","_type":"type1","_id":"1","_version":1}}]}
{"took":7, "errors": false, "items":[{"index":{"_index":"test","_type":"type1","_id":"1","_version":1,"_operation":"create","forced_refresh":false}}]}
--------------------------------------------------
Because this format uses literal `\n`'s as delimiters, please be sure

@ -25,7 +25,8 @@ The result of the above delete operation is:
"_index" : "twitter",
"_type" : "tweet",
"_id" : "1",
"_version" : 2
"_version" : 2,
"_operation: delete"
}
--------------------------------------------------

@ -31,6 +31,7 @@ The result of the above index operation is:
"_id" : "1",
"_version" : 1,
"created" : true,
"_operation" : create,
"forced_refresh": false
}
--------------------------------------------------
@ -231,6 +232,7 @@ The result of the above index operation is:
"_id" : "6a8ca01c-7896-48e9-81cc-9f70661fcb32",
"_version" : 1,
"created" : true,
"_operation": "create",
"forced_refresh": false
}
--------------------------------------------------

@ -132,8 +132,20 @@ curl -XPOST 'localhost:9200/test/type1/1/_update' -d '{
}'
--------------------------------------------------
If `name` was `new_name` before the request was sent then document is still
reindexed.
If `name` was `new_name` before the request was sent then the entire update
request is ignored. The `operation` element in the response returns `noop` if
the request was ignored.
[source,js]
--------------------------------------------------
{
"_index": "test",
"_type": "type1",
"_id": "1",
"_version": 1,
"_operation": noop
}
--------------------------------------------------
[[upserts]]
[float]

@ -314,7 +314,7 @@ public class AsyncBulkByScrollActionTests extends ESTestCase {
};
ScrollableHitSource.Response response = new ScrollableHitSource.Response(false, emptyList(), 0, emptyList(), null);
simulateScrollResponse(new DummyAbstractAsyncBulkByScrollAction(), timeValueNanos(System.nanoTime()), 10, response);
ExecutionException e = expectThrows(ExecutionException.class, () -> listener.get());
ExecutionException e = expectThrows(ExecutionException.class, () -> listener.get());
assertThat(e.getMessage(), equalTo("EsRejectedExecutionException[test]"));
assertThat(client.scrollsCleared, contains(scrollId));
@ -773,7 +773,7 @@ public class AsyncBulkByScrollActionTests extends ESTestCase {
UpdateRequest update = (UpdateRequest) item;
opType = "update";
response = new UpdateResponse(shardId, update.type(), update.id(),
randomIntBetween(0, Integer.MAX_VALUE), true);
randomIntBetween(0, Integer.MAX_VALUE), DocWriteResponse.Operation.CREATE);
} else if (item instanceof DeleteRequest) {
DeleteRequest delete = (DeleteRequest) item;
opType = "delete";

@ -0,0 +1,26 @@
---
"Delete operation field":
- do:
index:
index: test_1
type: test
id: 1
body: { foo: bar }
- do:
delete:
index: test_1
type: test
id: 1
- match: { _operation: delete }
- do:
catch: missing
delete:
index: test_1
type: test
id: 1
- match: { _operation: noop }

@ -0,0 +1,21 @@
---
"Index operation field":
- do:
index:
index: test_index
type: test
id: 1
body: { foo: bar }
- match: { _operation: create }
- do:
index:
index: test_index
type: test
id: 1
body: { foo: bar }
op_type: index
- match: { _operation: index }

@ -0,0 +1,52 @@
---
"Update operation field":
- do:
update:
index: test_1
type: test
id: 1
body:
doc: { foo: bar }
doc_as_upsert: true
- match: { _version: 1 }
- match: { _operation: create }
- do:
update:
index: test_1
type: test
id: 1
body:
doc: { foo: bar }
doc_as_upsert: true
- match: { _version: 1 }
- match: { _operation: noop }
- do:
update:
index: test_1
type: test
id: 1
body:
doc: { foo: bar }
doc_as_upsert: true
detect_noop: false
- match: { _version: 2 }
- match: { _operation: index }
- do:
update:
index: test_1
type: test
id: 1
body:
doc: { foo: baz }
doc_as_upsert: true
detect_noop: true
- match: { _version: 3 }
- match: { _operation: index }