From 9ddd675a02431a72daf48da991ce7eda726f1c79 Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Thu, 2 May 2013 00:12:53 +0200 Subject: [PATCH] Added support for the update operation in the bulk api. Update requests can now be put in the bulk api. All update request options are supported. Example usage: ``` curl -XPOST 'localhost:9200/_bulk' --date-binary @bulk.json ``` Contents of bulk.json that contains two update request items: ``` { "update" : {"_id" : "1", "_type" : "type1", "_index" : "index1", "_retry_on_conflict" : 3} } { "doc" : {"field" : "value"} } { "update" : { "_id" : "0", "_type" : "type1", "_index" : "index1", "_retry_on_conflict" : 3} } { "script" : "counter += param1", "lang" : "js", "params" : {"param1" : 1}, "upsert" : {"counter" : 1}} ``` The `doc`, `upsert` and all script related options are part of the payload. The `retry_on_conflict` option is part of the header. Closes #2982 --- .../action/bulk/BulkItemRequest.java | 5 + .../action/bulk/BulkItemResponse.java | 15 + .../action/bulk/BulkRequest.java | 43 +++ .../action/bulk/BulkRequestBuilder.java | 20 + .../action/bulk/BulkShardRequest.java | 7 +- .../action/bulk/TransportBulkAction.java | 27 ++ .../action/bulk/TransportShardBulkAction.java | 357 +++++++++++++++--- .../action/update/TransportUpdateAction.java | 355 +++++------------ .../action/update/UpdateHelper.java | 254 +++++++++++++ .../action/update/UpdateRequest.java | 6 +- .../action/update/UpdateResponse.java | 2 +- .../elasticsearch/indices/IndicesModule.java | 2 + .../test/integration/document/BulkTests.java | 310 +++++++++++++++ .../test/integration/update/UpdateTests.java | 63 ++++ .../unit/action/bulk/BulkRequestTests.java | 21 ++ .../test/unit/action/bulk/simple-bulk4.json | 7 + 16 files changed, 1181 insertions(+), 313 deletions(-) create mode 100644 src/main/java/org/elasticsearch/action/update/UpdateHelper.java create mode 100644 src/test/java/org/elasticsearch/test/integration/document/BulkTests.java create mode 100644 src/test/java/org/elasticsearch/test/unit/action/bulk/simple-bulk4.json diff --git a/src/main/java/org/elasticsearch/action/bulk/BulkItemRequest.java b/src/main/java/org/elasticsearch/action/bulk/BulkItemRequest.java index 24233054c34..55cadf658f7 100644 --- a/src/main/java/org/elasticsearch/action/bulk/BulkItemRequest.java +++ b/src/main/java/org/elasticsearch/action/bulk/BulkItemRequest.java @@ -22,6 +22,7 @@ package org.elasticsearch.action.bulk; import org.elasticsearch.action.ActionRequest; import org.elasticsearch.action.delete.DeleteRequest; import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Streamable; @@ -68,6 +69,8 @@ public class BulkItemRequest implements Streamable { request = new IndexRequest(); } else if (type == 1) { request = new DeleteRequest(); + } else if (type == 2) { + request = new UpdateRequest(); } request.readFrom(in); } @@ -79,6 +82,8 @@ public class BulkItemRequest implements Streamable { out.writeByte((byte) 0); } else if (request instanceof DeleteRequest) { out.writeByte((byte) 1); + } else if (request instanceof UpdateRequest) { + out.writeByte((byte) 2); } request.writeTo(out); } diff --git a/src/main/java/org/elasticsearch/action/bulk/BulkItemResponse.java b/src/main/java/org/elasticsearch/action/bulk/BulkItemResponse.java index b24d1c30b7c..09c3c659987 100644 --- a/src/main/java/org/elasticsearch/action/bulk/BulkItemResponse.java +++ b/src/main/java/org/elasticsearch/action/bulk/BulkItemResponse.java @@ -22,6 +22,7 @@ package org.elasticsearch.action.bulk; import org.elasticsearch.action.ActionResponse; import org.elasticsearch.action.delete.DeleteResponse; import org.elasticsearch.action.index.IndexResponse; +import org.elasticsearch.action.update.UpdateResponse; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Streamable; @@ -130,6 +131,8 @@ public class BulkItemResponse implements Streamable { return ((IndexResponse) response).getIndex(); } else if (response instanceof DeleteResponse) { return ((DeleteResponse) response).getIndex(); + } else if (response instanceof UpdateResponse) { + return ((UpdateResponse) response).getIndex(); } return null; } @@ -146,6 +149,9 @@ public class BulkItemResponse implements Streamable { } else if (response instanceof DeleteResponse) { return ((DeleteResponse) response).getType(); } + else if (response instanceof UpdateResponse) { + return ((UpdateResponse) response).getType(); + } return null; } @@ -160,6 +166,8 @@ public class BulkItemResponse implements Streamable { return ((IndexResponse) response).getId(); } else if (response instanceof DeleteResponse) { return ((DeleteResponse) response).getId(); + } else if (response instanceof UpdateResponse) { + return ((UpdateResponse) response).getId(); } return null; } @@ -175,6 +183,8 @@ public class BulkItemResponse implements Streamable { return ((IndexResponse) response).getVersion(); } else if (response instanceof DeleteResponse) { return ((DeleteResponse) response).getVersion(); + } else if (response instanceof UpdateResponse) { + return ((UpdateResponse) response).getVersion(); } return -1; } @@ -229,6 +239,9 @@ public class BulkItemResponse implements Streamable { } else if (type == 1) { response = new DeleteResponse(); response.readFrom(in); + } else if (type == 3) { // make 3 instead of 2, because 2 is already in use for 'no responses' + response = new UpdateResponse(); + response.readFrom(in); } if (in.readBoolean()) { @@ -247,6 +260,8 @@ public class BulkItemResponse implements Streamable { out.writeByte((byte) 0); } else if (response instanceof DeleteResponse) { out.writeByte((byte) 1); + } else if (response instanceof UpdateResponse) { + out.writeByte((byte) 3); // make 3 instead of 2, because 2 is already in use for 'no responses' } response.writeTo(out); } diff --git a/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java b/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java index 1293a699d58..e58b2dcc032 100644 --- a/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java +++ b/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java @@ -27,6 +27,7 @@ import org.elasticsearch.action.WriteConsistencyLevel; import org.elasticsearch.action.delete.DeleteRequest; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.support.replication.ReplicationType; +import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesReference; @@ -82,6 +83,8 @@ public class BulkRequest extends ActionRequest { add((IndexRequest) request, payload); } else if (request instanceof DeleteRequest) { add((DeleteRequest) request, payload); + } else if (request instanceof UpdateRequest) { + add((UpdateRequest) request, payload); } else { throw new ElasticSearchIllegalArgumentException("No support for request [" + request + "]"); } @@ -125,6 +128,33 @@ public class BulkRequest extends ActionRequest { return this; } + /** + * Adds an {@link UpdateRequest} to the list of actions to execute. + */ + public BulkRequest add(UpdateRequest request) { + request.beforeLocalFork(); + return internalAdd(request, null); + } + + public BulkRequest add(UpdateRequest request, @Nullable Object payload) { + request.beforeLocalFork(); + return internalAdd(request, payload); + } + + BulkRequest internalAdd(UpdateRequest request, @Nullable Object payload) { + requests.add(request); + addPayload(payload); + if (request.doc() != null) { + sizeInBytes += request.doc().source().length(); + } + if (request.upsertRequest() != null) { + sizeInBytes += request.upsertRequest().source().length(); + } + if (request.script() != null) { + sizeInBytes += request.script().length() * 2; + } + return this; + } /** * Adds an {@link DeleteRequest} to the list of actions to execute. */ @@ -245,6 +275,7 @@ public class BulkRequest extends ActionRequest { long version = 0; VersionType versionType = VersionType.INTERNAL; String percolate = null; + int retryOnConflict = 0; // at this stage, next token can either be END_OBJECT (and use default index and type, with auto generated id) // or START_OBJECT which will have another set of parameters @@ -280,6 +311,8 @@ public class BulkRequest extends ActionRequest { versionType = VersionType.fromString(parser.text()); } else if ("percolate".equals(currentFieldName) || "_percolate".equals(currentFieldName)) { percolate = parser.textOrNull(); + } else if ("_retry_on_conflict".equals(currentFieldName) || "_retryOnConflict".equals(currentFieldName)) { + retryOnConflict = parser.intValue(); } } } @@ -310,6 +343,10 @@ public class BulkRequest extends ActionRequest { .create(true) .source(data.slice(from, nextMarker - from), contentUnsafe) .percolate(percolate), payload); + } else if ("update".equals(action)) { + internalAdd(new UpdateRequest(index, type, id).routing(routing).parent(parent).retryOnConflict(retryOnConflict) + .source(data.slice(from, nextMarker - from)) + .percolate(percolate), payload); } // move pointers from = nextMarker + 1; @@ -403,6 +440,10 @@ public class BulkRequest extends ActionRequest { DeleteRequest request = new DeleteRequest(); request.readFrom(in); requests.add(request); + } else if (type == 2) { + UpdateRequest request = new UpdateRequest(); + request.readFrom(in); + requests.add(request); } } refresh = in.readBoolean(); @@ -419,6 +460,8 @@ public class BulkRequest extends ActionRequest { out.writeByte((byte) 0); } else if (request instanceof DeleteRequest) { out.writeByte((byte) 1); + } else if (request instanceof UpdateRequest) { + out.writeByte((byte) 2); } request.writeTo(out); } diff --git a/src/main/java/org/elasticsearch/action/bulk/BulkRequestBuilder.java b/src/main/java/org/elasticsearch/action/bulk/BulkRequestBuilder.java index 53b4b4def10..f1407c35894 100644 --- a/src/main/java/org/elasticsearch/action/bulk/BulkRequestBuilder.java +++ b/src/main/java/org/elasticsearch/action/bulk/BulkRequestBuilder.java @@ -27,6 +27,8 @@ import org.elasticsearch.action.delete.DeleteRequestBuilder; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.index.IndexRequestBuilder; import org.elasticsearch.action.support.replication.ReplicationType; +import org.elasticsearch.action.update.UpdateRequest; +import org.elasticsearch.action.update.UpdateRequestBuilder; import org.elasticsearch.client.Client; import org.elasticsearch.client.internal.InternalClient; import org.elasticsearch.common.Nullable; @@ -75,6 +77,24 @@ public class BulkRequestBuilder extends ActionRequestBuilder list = requestsByShard.get(shardId); + if (list == null) { + list = Lists.newArrayList(); + requestsByShard.put(shardId, list); + } + list.add(new BulkItemRequest(i, request)); } } @@ -243,6 +266,10 @@ public class TransportBulkAction extends TransportAction { private final MappingUpdatedAction mappingUpdatedAction; + private final UpdateHelper updateHelper; + private final boolean allowIdGeneration; @Inject public TransportShardBulkAction(Settings settings, TransportService transportService, ClusterService clusterService, IndicesService indicesService, ThreadPool threadPool, ShardStateAction shardStateAction, - MappingUpdatedAction mappingUpdatedAction) { + MappingUpdatedAction mappingUpdatedAction, UpdateHelper updateHelper) { super(settings, transportService, clusterService, indicesService, threadPool, shardStateAction); this.mappingUpdatedAction = mappingUpdatedAction; + this.updateHelper = updateHelper; + this.allowIdGeneration = settings.getAsBoolean("action.allow_id_generation", true); } @Override @@ -127,72 +142,39 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation protected PrimaryResponse shardOperationOnPrimary(ClusterState clusterState, PrimaryOperationRequest shardRequest) { final BulkShardRequest request = shardRequest.request; IndexShard indexShard = indicesService.indexServiceSafe(shardRequest.request.index()).shardSafe(shardRequest.shardId); - Engine.IndexingOperation[] ops = null; - Set> mappingsToUpdate = null; BulkItemResponse[] responses = new BulkItemResponse[request.items().length]; - long[] versions = new long[request.items().length]; + long[] preVersions = new long[request.items().length]; for (int i = 0; i < request.items().length; i++) { BulkItemRequest item = request.items()[i]; if (item.request() instanceof IndexRequest) { IndexRequest indexRequest = (IndexRequest) item.request(); try { - - // validate, if routing is required, that we got routing - MappingMetaData mappingMd = clusterState.metaData().index(request.index()).mappingOrDefault(indexRequest.type()); - if (mappingMd != null && mappingMd.routing().required()) { - if (indexRequest.routing() == null) { - throw new RoutingMissingException(indexRequest.index(), indexRequest.type(), indexRequest.id()); - } - } - - SourceToParse sourceToParse = SourceToParse.source(indexRequest.source()).type(indexRequest.type()).id(indexRequest.id()) - .routing(indexRequest.routing()).parent(indexRequest.parent()).timestamp(indexRequest.timestamp()).ttl(indexRequest.ttl()); - - long version; - Engine.IndexingOperation op; - if (indexRequest.opType() == IndexRequest.OpType.INDEX) { - Engine.Index index = indexShard.prepareIndex(sourceToParse).version(indexRequest.version()).versionType(indexRequest.versionType()).origin(Engine.Operation.Origin.PRIMARY); - indexShard.index(index); - version = index.version(); - op = index; - } else { - Engine.Create create = indexShard.prepareCreate(sourceToParse).version(indexRequest.version()).versionType(indexRequest.versionType()).origin(Engine.Operation.Origin.PRIMARY); - indexShard.create(create); - version = create.version(); - op = create; - } - versions[i] = indexRequest.version(); - // update the version on request so it will happen on the replicas - indexRequest.version(version); - - // update mapping on master if needed, we won't update changes to the same type, since once its changed, it won't have mappers added - if (op.parsedDoc().mappingsModified()) { + WriteResult result = shardIndexOperation(request, indexRequest, clusterState, indexShard, true); + // add the response + IndexResponse indexResponse = result.response(); + responses[i] = new BulkItemResponse(item.id(), indexRequest.opType().lowercase(), indexResponse); + preVersions[i] = result.preVersion; + if (result.mappingToUpdate != null) { if (mappingsToUpdate == null) { mappingsToUpdate = Sets.newHashSet(); } - mappingsToUpdate.add(Tuple.tuple(indexRequest.index(), indexRequest.type())); + mappingsToUpdate.add(result.mappingToUpdate); } - - // if we are going to percolate, then we need to keep this op for the postPrimary operation - if (Strings.hasLength(indexRequest.percolate())) { + if (result.op != null) { if (ops == null) { ops = new Engine.IndexingOperation[request.items().length]; } - ops[i] = op; + ops[i] = result.op; } - - // add the response - responses[i] = new BulkItemResponse(item.id(), indexRequest.opType().lowercase(), - new IndexResponse(indexRequest.index(), indexRequest.type(), indexRequest.id(), version)); } catch (Exception e) { // rethrow the failure if we are going to retry on primary and let parent failure to handle it if (retryPrimaryException(e)) { // restore updated versions... for (int j = 0; j < i; j++) { - applyVersion(request.items()[j], versions[j]); + applyVersion(request.items()[j], preVersions[j]); } throw (ElasticSearchException) e; } @@ -209,20 +191,15 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation } else if (item.request() instanceof DeleteRequest) { DeleteRequest deleteRequest = (DeleteRequest) item.request(); try { - Engine.Delete delete = indexShard.prepareDelete(deleteRequest.type(), deleteRequest.id(), deleteRequest.version()).versionType(deleteRequest.versionType()).origin(Engine.Operation.Origin.PRIMARY); - indexShard.delete(delete); - // update the request with teh version so it will go to the replicas - deleteRequest.version(delete.version()); - // add the response - responses[i] = new BulkItemResponse(item.id(), "delete", - new DeleteResponse(deleteRequest.index(), deleteRequest.type(), deleteRequest.id(), delete.version(), delete.notFound())); + DeleteResponse deleteResponse = shardDeleteOperation(deleteRequest, indexShard).response(); + responses[i] = new BulkItemResponse(item.id(), "delete", deleteResponse); } catch (Exception e) { // rethrow the failure if we are going to retry on primary and let parent failure to handle it if (retryPrimaryException(e)) { // restore updated versions... for (int j = 0; j < i; j++) { - applyVersion(request.items()[j], versions[j]); + applyVersion(request.items()[j], preVersions[j]); } throw (ElasticSearchException) e; } @@ -236,6 +213,109 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation // nullify the request so it won't execute on the replicas request.items()[i] = null; } + } else if (item.request() instanceof UpdateRequest) { + UpdateRequest updateRequest = (UpdateRequest) item.request(); + int retryCount = 0; + do { + UpdateResult updateResult; + try { + updateResult = shardUpdateOperation(clusterState, request, updateRequest, indexShard); + } catch (Throwable t) { + updateResult = new UpdateResult(null, null, false, t, null); + } + if (updateResult.success()) { + switch (updateResult.result.operation()) { + case UPSERT: + case INDEX: + WriteResult result = updateResult.writeResult; + IndexRequest indexRequest = updateResult.request(); + BytesReference indexSourceAsBytes = indexRequest.source(); + // add the response + IndexResponse indexResponse = result.response(); + UpdateResponse updateResponse = new UpdateResponse(indexResponse.getIndex(), indexResponse.getType(), indexResponse.getId(), indexResponse.getVersion()); + updateResponse.setMatches(indexResponse.getMatches()); + if (updateRequest.fields() != null && updateRequest.fields().length > 0) { + Tuple> sourceAndContent = XContentHelper.convertToMap(indexSourceAsBytes, true); + updateResponse.setGetResult(updateHelper.extractGetResult(updateRequest, indexResponse.getVersion(), sourceAndContent.v2(), sourceAndContent.v1(), indexSourceAsBytes)); + } + responses[i] = new BulkItemResponse(item.id(), "update", updateResponse); + preVersions[i] = result.preVersion; + if (result.mappingToUpdate != null) { + if (mappingsToUpdate == null) { + mappingsToUpdate = Sets.newHashSet(); + } + mappingsToUpdate.add(result.mappingToUpdate); + } + if (result.op != null) { + if (ops == null) { + ops = new Engine.IndexingOperation[request.items().length]; + } + ops[i] = result.op; + } + // Replace the update request to the translated index request to execute on the replica. + request.items()[i] = new BulkItemRequest(request.items()[i].id(), indexRequest); + break; + case DELETE: + DeleteResponse response = updateResult.writeResult.response(); + DeleteRequest deleteRequest = updateResult.request(); + updateResponse = new UpdateResponse(response.getIndex(), response.getType(), response.getId(), response.getVersion()); + updateResponse.setGetResult(updateHelper.extractGetResult(updateRequest, response.getVersion(), updateResult.result.updatedSourceAsMap(), updateResult.result.updateSourceContentType(), null)); + responses[i] = new BulkItemResponse(item.id(), "update", updateResponse); + // Replace the update request to the translated delete request to execute on the replica. + request.items()[i] = new BulkItemRequest(request.items()[i].id(), deleteRequest); + break; + case NONE: + responses[i] = new BulkItemResponse(item.id(), "update", updateResult.noopResult); + request.items()[i] = null; // No need to go to the replica + break; + } + // NOTE: Breaking out of the retry_on_conflict loop! + break; + } else if (updateResult.failure()) { + Throwable t = updateResult.error; + if (!updateResult.retry) { + // rethrow the failure if we are going to retry on primary and let parent failure to handle it + if (retryPrimaryException(t)) { + // restore updated versions... + for (int j = 0; j < i; j++) { + applyVersion(request.items()[j], preVersions[j]); + } + throw (ElasticSearchException) t; + } + if (updateResult.result == null) { + responses[i] = new BulkItemResponse(item.id(), "update", new BulkItemResponse.Failure(updateRequest.index(), updateRequest.type(), updateRequest.id(), ExceptionsHelper.detailedMessage(t))); + } else { + switch (updateResult.result.operation()) { + case UPSERT: + case INDEX: + IndexRequest indexRequest = updateResult.request(); + if (t instanceof ElasticSearchException && ((ElasticSearchException) t).status() == RestStatus.CONFLICT) { + logger.trace("[{}][{}] failed to execute bulk item (index) {}", t, shardRequest.request.index(), shardRequest.shardId, indexRequest); + } else { + logger.debug("[{}][{}] failed to execute bulk item (index) {}", t, shardRequest.request.index(), shardRequest.shardId, indexRequest); + } + responses[i] = new BulkItemResponse(item.id(), indexRequest.opType().lowercase(), + new BulkItemResponse.Failure(indexRequest.index(), indexRequest.type(), indexRequest.id(), ExceptionsHelper.detailedMessage(t))); + break; + case DELETE: + DeleteRequest deleteRequest = updateResult.request(); + if (t instanceof ElasticSearchException && ((ElasticSearchException) t).status() == RestStatus.CONFLICT) { + logger.trace("[{}][{}] failed to execute bulk item (delete) {}", t, shardRequest.request.index(), shardRequest.shardId, deleteRequest); + } else { + logger.debug("[{}][{}] failed to execute bulk item (delete) {}", t, shardRequest.request.index(), shardRequest.shardId, deleteRequest); + } + responses[i] = new BulkItemResponse(item.id(), "delete", + new BulkItemResponse.Failure(deleteRequest.index(), deleteRequest.type(), deleteRequest.id(), ExceptionsHelper.detailedMessage(t))); + break; + } + } + // nullify the request so it won't execute on the replicas + request.items()[i] = null; + // NOTE: Breaking out of the retry_on_conflict loop! + break; + } + } + } while (++retryCount < updateRequest.retryOnConflict()); } } @@ -256,6 +336,177 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation return new PrimaryResponse(shardRequest.request, response, ops); } + static class WriteResult { + + final Object response; + final long preVersion; + final Tuple mappingToUpdate; + final Engine.IndexingOperation op; + + WriteResult(Object response, long preVersion, Tuple mappingToUpdate, Engine.IndexingOperation op) { + this.response = response; + this.preVersion = preVersion; + this.mappingToUpdate = mappingToUpdate; + this.op = op; + } + + @SuppressWarnings("unchecked") + T response() { + return (T) response; + } + + } + + private WriteResult shardIndexOperation(BulkShardRequest request, IndexRequest indexRequest, ClusterState clusterState, + IndexShard indexShard, boolean processed) { + + // validate, if routing is required, that we got routing + MappingMetaData mappingMd = clusterState.metaData().index(request.index()).mappingOrDefault(indexRequest.type()); + if (mappingMd != null && mappingMd.routing().required()) { + if (indexRequest.routing() == null) { + throw new RoutingMissingException(indexRequest.index(), indexRequest.type(), indexRequest.id()); + } + } + + if (!processed) { + indexRequest.process(clusterState.metaData(), indexRequest.index(), mappingMd, allowIdGeneration); + } + + SourceToParse sourceToParse = SourceToParse.source(indexRequest.source()).type(indexRequest.type()).id(indexRequest.id()) + .routing(indexRequest.routing()).parent(indexRequest.parent()).timestamp(indexRequest.timestamp()).ttl(indexRequest.ttl()); + + long version; + Engine.IndexingOperation op; + if (indexRequest.opType() == IndexRequest.OpType.INDEX) { + Engine.Index index = indexShard.prepareIndex(sourceToParse).version(indexRequest.version()).versionType(indexRequest.versionType()).origin(Engine.Operation.Origin.PRIMARY); + indexShard.index(index); + version = index.version(); + op = index; + } else { + Engine.Create create = indexShard.prepareCreate(sourceToParse).version(indexRequest.version()).versionType(indexRequest.versionType()).origin(Engine.Operation.Origin.PRIMARY); + indexShard.create(create); + version = create.version(); + op = create; + } + long preVersion = indexRequest.version(); + // update the version on request so it will happen on the replicas + indexRequest.version(version); + + // update mapping on master if needed, we won't update changes to the same type, since once its changed, it won't have mappers added + Tuple mappingsToUpdate = null; + if (op.parsedDoc().mappingsModified()) { + mappingsToUpdate = Tuple.tuple(indexRequest.index(), indexRequest.type()); + } + + // if we are going to percolate, then we need to keep this op for the postPrimary operation + if (!Strings.hasLength(indexRequest.percolate())) { + op = null; + } + + IndexResponse indexResponse = new IndexResponse(indexRequest.index(), indexRequest.type(), indexRequest.id(), version); + return new WriteResult(indexResponse, preVersion, mappingsToUpdate, op); + } + + private WriteResult shardDeleteOperation(DeleteRequest deleteRequest, IndexShard indexShard) { + Engine.Delete delete = indexShard.prepareDelete(deleteRequest.type(), deleteRequest.id(), deleteRequest.version()).versionType(deleteRequest.versionType()).origin(Engine.Operation.Origin.PRIMARY); + indexShard.delete(delete); + // update the request with the version so it will go to the replicas + deleteRequest.version(delete.version()); + DeleteResponse deleteResponse = new DeleteResponse(deleteRequest.index(), deleteRequest.type(), deleteRequest.id(), delete.version(), delete.notFound()); + return new WriteResult(deleteResponse, deleteRequest.version(), null, null); + } + + static class UpdateResult { + + final UpdateHelper.Result result; + final ActionRequest actionRequest; + final boolean retry; + final Throwable error; + final WriteResult writeResult; + final UpdateResponse noopResult; + + UpdateResult(UpdateHelper.Result result, ActionRequest actionRequest, boolean retry, Throwable error, WriteResult writeResult) { + this.result = result; + this.actionRequest = actionRequest; + this.retry = retry; + this.error = error; + this.writeResult = writeResult; + this.noopResult = null; + } + + UpdateResult(UpdateHelper.Result result, ActionRequest actionRequest, WriteResult writeResult) { + this.result = result; + this.actionRequest = actionRequest; + this.writeResult = writeResult; + this.retry = false; + this.error = null; + this.noopResult = null; + } + + public UpdateResult(UpdateHelper.Result result, UpdateResponse updateResponse) { + this.result = result; + this.noopResult = updateResponse; + this.actionRequest = null; + this.writeResult = null; + this.retry = false; + this.error = null; + } + + + boolean failure() { + return error != null; + } + + boolean success() { + return noopResult != null || writeResult != null; + } + + @SuppressWarnings("unchecked") + T request() { + return (T) actionRequest; + } + + + } + + private UpdateResult shardUpdateOperation(ClusterState clusterState, BulkShardRequest bulkShardRequest, UpdateRequest updateRequest, IndexShard indexShard) { + UpdateHelper.Result translate = updateHelper.prepare(updateRequest, indexShard); + switch (translate.operation()) { + case UPSERT: + case INDEX: + IndexRequest indexRequest = translate.action(); + try { + WriteResult result = shardIndexOperation(bulkShardRequest, indexRequest, clusterState, indexShard, false); + return new UpdateResult(translate, indexRequest, result); + } catch (Throwable t) { + t = ExceptionsHelper.unwrapCause(t); + boolean retry = false; + if (t instanceof VersionConflictEngineException || (t instanceof DocumentAlreadyExistsException && translate.operation() == UpdateHelper.Operation.UPSERT)) { + retry = true; + } + return new UpdateResult(translate, indexRequest, retry, t, null); + } + case DELETE: + DeleteRequest deleteRequest = translate.action(); + try { + WriteResult result = shardDeleteOperation(deleteRequest, indexShard); + return new UpdateResult(translate, deleteRequest, result); + } catch (Throwable t) { + t = ExceptionsHelper.unwrapCause(t); + boolean retry = false; + if (t instanceof VersionConflictEngineException) { + retry = true; + } + return new UpdateResult(translate, deleteRequest, retry, t, null); + } + case NONE: + UpdateResponse updateResponse = translate.action(); + return new UpdateResult(translate, updateResponse); + default: + throw new ElasticSearchIllegalStateException("Illegal update operation " + translate.operation()); + } + } + @Override protected void postPrimaryOperation(BulkShardRequest request, PrimaryResponse response) { IndexService indexService = indicesService.indexServiceSafe(request.index()); diff --git a/src/main/java/org/elasticsearch/action/update/TransportUpdateAction.java b/src/main/java/org/elasticsearch/action/update/TransportUpdateAction.java index 99e9b23868f..b1f8dd245a3 100644 --- a/src/main/java/org/elasticsearch/action/update/TransportUpdateAction.java +++ b/src/main/java/org/elasticsearch/action/update/TransportUpdateAction.java @@ -21,7 +21,7 @@ package org.elasticsearch.action.update; import com.google.common.collect.ImmutableList; import org.elasticsearch.ElasticSearchException; -import org.elasticsearch.ElasticSearchIllegalArgumentException; +import org.elasticsearch.ElasticSearchIllegalStateException; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; @@ -35,7 +35,6 @@ import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.action.index.TransportIndexAction; import org.elasticsearch.action.support.AutoCreateIndex; import org.elasticsearch.action.support.single.instance.TransportInstanceSingleOperationAction; -import org.elasticsearch.client.Requests; import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.block.ClusterBlockException; @@ -44,67 +43,40 @@ import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.routing.PlainShardIterator; import org.elasticsearch.cluster.routing.ShardIterator; import org.elasticsearch.cluster.routing.ShardRouting; -import org.elasticsearch.common.Nullable; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.index.engine.DocumentAlreadyExistsException; -import org.elasticsearch.index.engine.DocumentMissingException; -import org.elasticsearch.index.engine.DocumentSourceMissingException; import org.elasticsearch.index.engine.VersionConflictEngineException; -import org.elasticsearch.index.get.GetField; -import org.elasticsearch.index.get.GetResult; -import org.elasticsearch.index.mapper.internal.ParentFieldMapper; -import org.elasticsearch.index.mapper.internal.RoutingFieldMapper; -import org.elasticsearch.index.mapper.internal.SourceFieldMapper; -import org.elasticsearch.index.mapper.internal.TTLFieldMapper; -import org.elasticsearch.index.service.IndexService; import org.elasticsearch.index.shard.IllegalIndexShardStateException; -import org.elasticsearch.index.shard.ShardId; -import org.elasticsearch.index.shard.service.IndexShard; import org.elasticsearch.indices.IndexAlreadyExistsException; -import org.elasticsearch.indices.IndicesService; -import org.elasticsearch.script.ExecutableScript; -import org.elasticsearch.script.ScriptService; -import org.elasticsearch.search.lookup.SourceLookup; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; -import java.util.ArrayList; -import java.util.HashMap; import java.util.Map; -import static com.google.common.collect.Maps.newHashMapWithExpectedSize; - /** */ public class TransportUpdateAction extends TransportInstanceSingleOperationAction { - private final IndicesService indicesService; - private final TransportDeleteAction deleteAction; - private final TransportIndexAction indexAction; - - private final ScriptService scriptService; - private final AutoCreateIndex autoCreateIndex; - private final TransportCreateIndexAction createIndexAction; + private final UpdateHelper updateHelper; @Inject public TransportUpdateAction(Settings settings, ThreadPool threadPool, ClusterService clusterService, TransportService transportService, - IndicesService indicesService, TransportIndexAction indexAction, TransportDeleteAction deleteAction, ScriptService scriptService, TransportCreateIndexAction createIndexAction) { + TransportIndexAction indexAction, TransportDeleteAction deleteAction, TransportCreateIndexAction createIndexAction, + UpdateHelper updateHelper) { super(settings, threadPool, clusterService, transportService); - this.indicesService = indicesService; this.indexAction = indexAction; this.deleteAction = deleteAction; - this.scriptService = scriptService; this.createIndexAction = createIndexAction; + this.updateHelper = updateHelper; this.autoCreateIndex = new AutoCreateIndex(settings); } @@ -212,240 +184,109 @@ public class TransportUpdateAction extends TransportInstanceSingleOperationActio } protected void shardOperation(final UpdateRequest request, final ActionListener listener, final int retryCount) throws ElasticSearchException { - IndexService indexService = indicesService.indexServiceSafe(request.index()); - IndexShard indexShard = indexService.shardSafe(request.shardId()); - - long getDate = System.currentTimeMillis(); - final GetResult getResult = indexShard.getService().get(request.type(), request.id(), - new String[]{SourceFieldMapper.NAME, RoutingFieldMapper.NAME, ParentFieldMapper.NAME, TTLFieldMapper.NAME}, true); - - // no doc, what to do, what to do... - if (!getResult.isExists()) { - if (request.upsertRequest() == null) { - listener.onFailure(new DocumentMissingException(new ShardId(request.index(), request.shardId()), request.type(), request.id())); - return; - } - final IndexRequest indexRequest = request.upsertRequest(); - indexRequest.index(request.index()).type(request.type()).id(request.id()) - // it has to be a "create!" - .create(true) - .routing(request.routing()) - .percolate(request.percolate()) - .refresh(request.refresh()) - .replicationType(request.replicationType()).consistencyLevel(request.consistencyLevel()); - indexRequest.operationThreaded(false); - // we fetch it from the index request so we don't generate the bytes twice, its already done in the index request - final BytesReference updateSourceBytes = indexRequest.source(); - indexAction.execute(indexRequest, new ActionListener() { - @Override - public void onResponse(IndexResponse response) { - UpdateResponse update = new UpdateResponse(response.getIndex(), response.getType(), response.getId(), response.getVersion()); - update.setMatches(response.getMatches()); - if (request.fields() != null && request.fields().length > 0) { - Tuple> sourceAndContent = XContentHelper.convertToMap(updateSourceBytes, true); - update.setGetResult(extractGetResult(request, response.getVersion(), sourceAndContent.v2(), sourceAndContent.v1(), updateSourceBytes)); - } else { - update.setGetResult(null); - } - listener.onResponse(update); - } - - @Override - public void onFailure(Throwable e) { - e = ExceptionsHelper.unwrapCause(e); - if (e instanceof VersionConflictEngineException || e instanceof DocumentAlreadyExistsException) { - if (retryCount < request.retryOnConflict()) { - threadPool.executor(executor()).execute(new Runnable() { - @Override - public void run() { - shardOperation(request, listener, retryCount + 1); - } - }); - return; + final UpdateHelper.Result result = updateHelper.prepare(request); + switch (result.operation()) { + case UPSERT: + IndexRequest upsertRequest = result.action(); + // we fetch it from the index request so we don't generate the bytes twice, its already done in the index request + final BytesReference upsertSourceBytes = upsertRequest.source(); + indexAction.execute(upsertRequest, new ActionListener() { + @Override + public void onResponse(IndexResponse response) { + UpdateResponse update = new UpdateResponse(response.getIndex(), response.getType(), response.getId(), response.getVersion()); + update.setMatches(response.getMatches()); + if (request.fields() != null && request.fields().length > 0) { + Tuple> sourceAndContent = XContentHelper.convertToMap(upsertSourceBytes, true); + update.setGetResult(updateHelper.extractGetResult(request, response.getVersion(), sourceAndContent.v2(), sourceAndContent.v1(), upsertSourceBytes)); + } else { + update.setGetResult(null); } + listener.onResponse(update); } - listener.onFailure(e); - } - }); - return; - } - if (getResult.internalSourceRef() == null) { - // no source, we can't do nothing, through a failure... - listener.onFailure(new DocumentSourceMissingException(new ShardId(request.index(), request.shardId()), request.type(), request.id())); - return; - } - - Tuple> sourceAndContent = XContentHelper.convertToMap(getResult.internalSourceRef(), true); - String operation = null; - String timestamp = null; - Long ttl = null; - Object fetchedTTL = null; - final Map updatedSourceAsMap; - final XContentType updateSourceContentType = sourceAndContent.v1(); - String routing = getResult.getFields().containsKey(RoutingFieldMapper.NAME) ? getResult.field(RoutingFieldMapper.NAME).getValue().toString() : null; - String parent = getResult.getFields().containsKey(ParentFieldMapper.NAME) ? getResult.field(ParentFieldMapper.NAME).getValue().toString() : null; - - if (request.script() == null && request.doc() != null) { - IndexRequest indexRequest = request.doc(); - updatedSourceAsMap = sourceAndContent.v2(); - if (indexRequest.ttl() > 0) { - ttl = indexRequest.ttl(); - } - timestamp = indexRequest.timestamp(); - if (indexRequest.routing() != null) { - routing = indexRequest.routing(); - } - if (indexRequest.parent() != null) { - parent = indexRequest.parent(); - } - XContentHelper.update(updatedSourceAsMap, indexRequest.sourceAsMap()); - } else { - Map ctx = new HashMap(2); - ctx.put("_source", sourceAndContent.v2()); - - try { - ExecutableScript script = scriptService.executable(request.scriptLang, request.script, request.scriptParams); - script.setNextVar("ctx", ctx); - script.run(); - // we need to unwrap the ctx... - ctx = (Map) script.unwrap(ctx); - } catch (Exception e) { - throw new ElasticSearchIllegalArgumentException("failed to execute script", e); - } - - operation = (String) ctx.get("op"); - timestamp = (String) ctx.get("_timestamp"); - fetchedTTL = ctx.get("_ttl"); - if (fetchedTTL != null) { - if (fetchedTTL instanceof Number) { - ttl = ((Number) fetchedTTL).longValue(); - } else { - ttl = TimeValue.parseTimeValue((String) fetchedTTL, null).millis(); - } - } - - updatedSourceAsMap = (Map) ctx.get("_source"); - } - - // apply script to update the source - // No TTL has been given in the update script so we keep previous TTL value if there is one - if (ttl == null) { - ttl = getResult.getFields().containsKey(TTLFieldMapper.NAME) ? (Long) getResult.field(TTLFieldMapper.NAME).getValue() : null; - if (ttl != null) { - ttl = ttl - (System.currentTimeMillis() - getDate); // It is an approximation of exact TTL value, could be improved - } - } - - // TODO: external version type, does it make sense here? does not seem like it... - - if (operation == null || "index".equals(operation)) { - final IndexRequest indexRequest = Requests.indexRequest(request.index()).type(request.type()).id(request.id()).routing(routing).parent(parent) - .source(updatedSourceAsMap, updateSourceContentType) - .version(getResult.getVersion()).replicationType(request.replicationType()).consistencyLevel(request.consistencyLevel()) - .timestamp(timestamp).ttl(ttl) - .percolate(request.percolate()) - .refresh(request.refresh()); - indexRequest.operationThreaded(false); - // we fetch it from the index request so we don't generate the bytes twice, its already done in the index request - final BytesReference updateSourceBytes = indexRequest.source(); - indexAction.execute(indexRequest, new ActionListener() { - @Override - public void onResponse(IndexResponse response) { - UpdateResponse update = new UpdateResponse(response.getIndex(), response.getType(), response.getId(), response.getVersion()); - update.setMatches(response.getMatches()); - update.setGetResult(extractGetResult(request, response.getVersion(), updatedSourceAsMap, updateSourceContentType, updateSourceBytes)); - listener.onResponse(update); - } - - @Override - public void onFailure(Throwable e) { - e = ExceptionsHelper.unwrapCause(e); - if (e instanceof VersionConflictEngineException) { - if (retryCount < request.retryOnConflict()) { - threadPool.executor(executor()).execute(new Runnable() { - @Override - public void run() { - shardOperation(request, listener, retryCount + 1); - } - }); - return; + @Override + public void onFailure(Throwable e) { + e = ExceptionsHelper.unwrapCause(e); + if (e instanceof VersionConflictEngineException || e instanceof DocumentAlreadyExistsException) { + if (retryCount < request.retryOnConflict()) { + threadPool.executor(executor()).execute(new Runnable() { + @Override + public void run() { + shardOperation(request, listener, retryCount + 1); + } + }); + return; + } } + listener.onFailure(e); + } + }); + break; + case INDEX: + IndexRequest indexRequest = result.action(); + // we fetch it from the index request so we don't generate the bytes twice, its already done in the index request + final BytesReference indexSourceBytes = indexRequest.source(); + indexAction.execute(indexRequest, new ActionListener() { + @Override + public void onResponse(IndexResponse response) { + UpdateResponse update = new UpdateResponse(response.getIndex(), response.getType(), response.getId(), response.getVersion()); + update.setMatches(response.getMatches()); + update.setGetResult(updateHelper.extractGetResult(request, response.getVersion(), result.updatedSourceAsMap(), result.updateSourceContentType(), indexSourceBytes)); + listener.onResponse(update); } - listener.onFailure(e); - } - }); - } else if ("delete".equals(operation)) { - DeleteRequest deleteRequest = Requests.deleteRequest(request.index()).type(request.type()).id(request.id()).routing(routing).parent(parent) - .version(getResult.getVersion()).replicationType(request.replicationType()).consistencyLevel(request.consistencyLevel()); - deleteRequest.operationThreaded(false); - deleteAction.execute(deleteRequest, new ActionListener() { - @Override - public void onResponse(DeleteResponse response) { - UpdateResponse update = new UpdateResponse(response.getIndex(), response.getType(), response.getId(), response.getVersion()); - update.setGetResult(extractGetResult(request, response.getVersion(), updatedSourceAsMap, updateSourceContentType, null)); - listener.onResponse(update); - } - @Override - public void onFailure(Throwable e) { - e = ExceptionsHelper.unwrapCause(e); - if (e instanceof VersionConflictEngineException) { - if (retryCount < request.retryOnConflict()) { - threadPool.executor(executor()).execute(new Runnable() { - @Override - public void run() { - shardOperation(request, listener, retryCount + 1); - } - }); - return; + @Override + public void onFailure(Throwable e) { + e = ExceptionsHelper.unwrapCause(e); + if (e instanceof VersionConflictEngineException) { + if (retryCount < request.retryOnConflict()) { + threadPool.executor(executor()).execute(new Runnable() { + @Override + public void run() { + shardOperation(request, listener, retryCount + 1); + } + }); + return; + } } + listener.onFailure(e); } - listener.onFailure(e); - } - }); - } else if ("none".equals(operation)) { - UpdateResponse update = new UpdateResponse(getResult.getIndex(), getResult.getType(), getResult.getId(), getResult.getVersion()); - update.setGetResult(extractGetResult(request, getResult.getVersion(), updatedSourceAsMap, updateSourceContentType, null)); - listener.onResponse(update); - } else { - logger.warn("Used update operation [{}] for script [{}], doing nothing...", operation, request.script); - listener.onResponse(new UpdateResponse(getResult.getIndex(), getResult.getType(), getResult.getId(), getResult.getVersion())); - } - } - - @Nullable - protected GetResult extractGetResult(final UpdateRequest request, long version, final Map source, XContentType sourceContentType, @Nullable final BytesReference sourceAsBytes) { - if (request.fields() == null || request.fields().length == 0) { - return null; - } - boolean sourceRequested = false; - Map fields = null; - if (request.fields() != null && request.fields().length > 0) { - SourceLookup sourceLookup = new SourceLookup(); - sourceLookup.setNextSource(source); - for (String field : request.fields()) { - if (field.equals("_source")) { - sourceRequested = true; - continue; - } - Object value = sourceLookup.extractValue(field); - if (value != null) { - if (fields == null) { - fields = newHashMapWithExpectedSize(2); + }); + break; + case DELETE: + DeleteRequest deleteRequest = result.action(); + deleteAction.execute(deleteRequest, new ActionListener() { + @Override + public void onResponse(DeleteResponse response) { + UpdateResponse update = new UpdateResponse(response.getIndex(), response.getType(), response.getId(), response.getVersion()); + update.setGetResult(updateHelper.extractGetResult(request, response.getVersion(), result.updatedSourceAsMap(), result.updateSourceContentType(), null)); + listener.onResponse(update); } - GetField getField = fields.get(field); - if (getField == null) { - getField = new GetField(field, new ArrayList(2)); - fields.put(field, getField); + + @Override + public void onFailure(Throwable e) { + e = ExceptionsHelper.unwrapCause(e); + if (e instanceof VersionConflictEngineException) { + if (retryCount < request.retryOnConflict()) { + threadPool.executor(executor()).execute(new Runnable() { + @Override + public void run() { + shardOperation(request, listener, retryCount + 1); + } + }); + return; + } + } + listener.onFailure(e); } - getField.getValues().add(value); - } - } + }); + break; + case NONE: + UpdateResponse update = result.action(); + listener.onResponse(update); + break; + default: + throw new ElasticSearchIllegalStateException("Illegal operation " + result.operation()); } - - // TODO when using delete/none, we can still return the source as bytes by generating it (using the sourceContentType) - - return new GetResult(request.index(), request.type(), request.id(), version, true, sourceRequested ? sourceAsBytes : null, fields); } } diff --git a/src/main/java/org/elasticsearch/action/update/UpdateHelper.java b/src/main/java/org/elasticsearch/action/update/UpdateHelper.java new file mode 100644 index 00000000000..5c6936e8cb4 --- /dev/null +++ b/src/main/java/org/elasticsearch/action/update/UpdateHelper.java @@ -0,0 +1,254 @@ +package org.elasticsearch.action.update; + +import org.elasticsearch.ElasticSearchIllegalArgumentException; +import org.elasticsearch.action.delete.DeleteRequest; +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.client.Requests; +import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.collect.Tuple; +import org.elasticsearch.common.component.AbstractComponent; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.io.stream.Streamable; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.xcontent.XContentHelper; +import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.index.engine.DocumentMissingException; +import org.elasticsearch.index.engine.DocumentSourceMissingException; +import org.elasticsearch.index.get.GetField; +import org.elasticsearch.index.get.GetResult; +import org.elasticsearch.index.mapper.internal.ParentFieldMapper; +import org.elasticsearch.index.mapper.internal.RoutingFieldMapper; +import org.elasticsearch.index.mapper.internal.SourceFieldMapper; +import org.elasticsearch.index.mapper.internal.TTLFieldMapper; +import org.elasticsearch.index.service.IndexService; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.index.shard.service.IndexShard; +import org.elasticsearch.indices.IndicesService; +import org.elasticsearch.script.ExecutableScript; +import org.elasticsearch.script.ScriptService; +import org.elasticsearch.search.lookup.SourceLookup; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Map; + +import static com.google.common.collect.Maps.newHashMapWithExpectedSize; + +/** + * Helper for translating an update request to an index, delete request or update response. + */ +public class UpdateHelper extends AbstractComponent { + + private final IndicesService indicesService; + private final ScriptService scriptService; + + @Inject + public UpdateHelper(Settings settings, IndicesService indicesService, ScriptService scriptService) { + super(settings); + this.indicesService = indicesService; + this.scriptService = scriptService; + } + + /** + * Prepares an update request by converting it into an index or delete request or an update response (no action). + */ + public Result prepare(UpdateRequest request) { + IndexService indexService = indicesService.indexServiceSafe(request.index()); + IndexShard indexShard = indexService.shardSafe(request.shardId()); + return prepare(request, indexShard); + } + + public Result prepare(UpdateRequest request, IndexShard indexShard) { + long getDate = System.currentTimeMillis(); + final GetResult getResult = indexShard.getService().get(request.type(), request.id(), + new String[]{SourceFieldMapper.NAME, RoutingFieldMapper.NAME, ParentFieldMapper.NAME, TTLFieldMapper.NAME}, true); + + if (!getResult.isExists()) { + if (request.upsertRequest() == null) { + throw new DocumentMissingException(new ShardId(request.index(), request.shardId()), request.type(), request.id()); + } + IndexRequest indexRequest = request.upsertRequest(); + indexRequest.index(request.index()).type(request.type()).id(request.id()) + // it has to be a "create!" + .create(true) + .routing(request.routing()) + .percolate(request.percolate()) + .refresh(request.refresh()) + .replicationType(request.replicationType()).consistencyLevel(request.consistencyLevel()); + indexRequest.operationThreaded(false); + return new Result(indexRequest, Operation.UPSERT, null, null); + } + + if (getResult.internalSourceRef() == null) { + // no source, we can't do nothing, through a failure... + throw new DocumentSourceMissingException(new ShardId(request.index(), request.shardId()), request.type(), request.id()); + } + + Tuple> sourceAndContent = XContentHelper.convertToMap(getResult.internalSourceRef(), true); + String operation = null; + String timestamp = null; + Long ttl = null; + Object fetchedTTL = null; + final Map updatedSourceAsMap; + final XContentType updateSourceContentType = sourceAndContent.v1(); + String routing = getResult.getFields().containsKey(RoutingFieldMapper.NAME) ? getResult.field(RoutingFieldMapper.NAME).getValue().toString() : null; + String parent = getResult.getFields().containsKey(ParentFieldMapper.NAME) ? getResult.field(ParentFieldMapper.NAME).getValue().toString() : null; + + if (request.script() == null && request.doc() != null) { + IndexRequest indexRequest = request.doc(); + updatedSourceAsMap = sourceAndContent.v2(); + if (indexRequest.ttl() > 0) { + ttl = indexRequest.ttl(); + } + timestamp = indexRequest.timestamp(); + if (indexRequest.routing() != null) { + routing = indexRequest.routing(); + } + if (indexRequest.parent() != null) { + parent = indexRequest.parent(); + } + XContentHelper.update(updatedSourceAsMap, indexRequest.sourceAsMap()); + } else { + Map ctx = new HashMap(2); + ctx.put("_source", sourceAndContent.v2()); + + try { + ExecutableScript script = scriptService.executable(request.scriptLang, request.script, request.scriptParams); + script.setNextVar("ctx", ctx); + script.run(); + // we need to unwrap the ctx... + ctx = (Map) script.unwrap(ctx); + } catch (Exception e) { + throw new ElasticSearchIllegalArgumentException("failed to execute script", e); + } + + operation = (String) ctx.get("op"); + timestamp = (String) ctx.get("_timestamp"); + + fetchedTTL = ctx.get("_ttl"); + if (fetchedTTL != null) { + if (fetchedTTL instanceof Number) { + ttl = ((Number) fetchedTTL).longValue(); + } else { + ttl = TimeValue.parseTimeValue((String) fetchedTTL, null).millis(); + } + } + + updatedSourceAsMap = (Map) ctx.get("_source"); + } + + // apply script to update the source + // No TTL has been given in the update script so we keep previous TTL value if there is one + if (ttl == null) { + ttl = getResult.getFields().containsKey(TTLFieldMapper.NAME) ? (Long) getResult.field(TTLFieldMapper.NAME).getValue() : null; + if (ttl != null) { + ttl = ttl - (System.currentTimeMillis() - getDate); // It is an approximation of exact TTL value, could be improved + } + } + + // TODO: external version type, does it make sense here? does not seem like it... + if (operation == null || "index".equals(operation)) { + final IndexRequest indexRequest = Requests.indexRequest(request.index()).type(request.type()).id(request.id()).routing(routing).parent(parent) + .source(updatedSourceAsMap, updateSourceContentType) + .version(getResult.getVersion()).replicationType(request.replicationType()).consistencyLevel(request.consistencyLevel()) + .timestamp(timestamp).ttl(ttl) + .percolate(request.percolate()) + .refresh(request.refresh()); + indexRequest.operationThreaded(false); + return new Result(indexRequest, Operation.INDEX, updatedSourceAsMap, updateSourceContentType); + } else if ("delete".equals(operation)) { + DeleteRequest deleteRequest = Requests.deleteRequest(request.index()).type(request.type()).id(request.id()).routing(routing).parent(parent) + .version(getResult.getVersion()).replicationType(request.replicationType()).consistencyLevel(request.consistencyLevel()); + deleteRequest.operationThreaded(false); + return new Result(deleteRequest, Operation.DELETE, updatedSourceAsMap, updateSourceContentType); + } else if ("none".equals(operation)) { + UpdateResponse update = new UpdateResponse(getResult.getIndex(), getResult.getType(), getResult.getId(), getResult.getVersion()); + update.setGetResult(extractGetResult(request, getResult.getVersion(), updatedSourceAsMap, updateSourceContentType, null)); + return new Result(update, Operation.NONE, updatedSourceAsMap, updateSourceContentType); + } else { + logger.warn("Used update operation [{}] for script [{}], doing nothing...", operation, request.script); + UpdateResponse update = new UpdateResponse(getResult.getIndex(), getResult.getType(), getResult.getId(), getResult.getVersion()); + return new Result(update, Operation.NONE, updatedSourceAsMap, updateSourceContentType); + } + } + + /** + * Extracts the fields from the updated document to be returned in a update response + */ + public GetResult extractGetResult(final UpdateRequest request, long version, final Map source, XContentType sourceContentType, @Nullable final BytesReference sourceAsBytes) { + if (request.fields() == null || request.fields().length == 0) { + return null; + } + boolean sourceRequested = false; + Map fields = null; + if (request.fields() != null && request.fields().length > 0) { + SourceLookup sourceLookup = new SourceLookup(); + sourceLookup.setNextSource(source); + for (String field : request.fields()) { + if (field.equals("_source")) { + sourceRequested = true; + continue; + } + Object value = sourceLookup.extractValue(field); + if (value != null) { + if (fields == null) { + fields = newHashMapWithExpectedSize(2); + } + GetField getField = fields.get(field); + if (getField == null) { + getField = new GetField(field, new ArrayList(2)); + fields.put(field, getField); + } + getField.getValues().add(value); + } + } + } + + // TODO when using delete/none, we can still return the source as bytes by generating it (using the sourceContentType) + return new GetResult(request.index(), request.type(), request.id(), version, true, sourceRequested ? sourceAsBytes : null, fields); + } + + public static class Result { + + private final Streamable action; + private final Operation operation; + private final Map updatedSourceAsMap; + private final XContentType updateSourceContentType; + + public Result(Streamable action, Operation operation, Map updatedSourceAsMap, XContentType updateSourceContentType) { + this.action = action; + this.operation = operation; + this.updatedSourceAsMap = updatedSourceAsMap; + this.updateSourceContentType = updateSourceContentType; + } + + @SuppressWarnings("unchecked") + public T action() { + return (T) action; + } + + public Operation operation() { + return operation; + } + + public Map updatedSourceAsMap() { + return updatedSourceAsMap; + } + + public XContentType updateSourceContentType() { + return updateSourceContentType; + } + } + + public static enum Operation { + + UPSERT, + INDEX, + DELETE, + NONE + + } + +} diff --git a/src/main/java/org/elasticsearch/action/update/UpdateRequest.java b/src/main/java/org/elasticsearch/action/update/UpdateRequest.java index a98455f5b56..f6b88f21c91 100644 --- a/src/main/java/org/elasticsearch/action/update/UpdateRequest.java +++ b/src/main/java/org/elasticsearch/action/update/UpdateRequest.java @@ -72,7 +72,7 @@ public class UpdateRequest extends InstanceShardOperationRequest @Nullable private IndexRequest doc; - UpdateRequest() { + public UpdateRequest() { } @@ -188,6 +188,10 @@ public class UpdateRequest extends InstanceShardOperationRequest return this; } + public String scriptLang() { + return scriptLang; + } + /** * Add a script parameter. */ diff --git a/src/main/java/org/elasticsearch/action/update/UpdateResponse.java b/src/main/java/org/elasticsearch/action/update/UpdateResponse.java index 1e9d589d6db..8b5dc99723b 100644 --- a/src/main/java/org/elasticsearch/action/update/UpdateResponse.java +++ b/src/main/java/org/elasticsearch/action/update/UpdateResponse.java @@ -86,7 +86,7 @@ public class UpdateResponse extends ActionResponse { return this.matches; } - void setGetResult(GetResult getResult) { + public void setGetResult(GetResult getResult) { this.getResult = getResult; } diff --git a/src/main/java/org/elasticsearch/indices/IndicesModule.java b/src/main/java/org/elasticsearch/indices/IndicesModule.java index 296208d486c..c7714df2f3e 100644 --- a/src/main/java/org/elasticsearch/indices/IndicesModule.java +++ b/src/main/java/org/elasticsearch/indices/IndicesModule.java @@ -20,6 +20,7 @@ package org.elasticsearch.indices; import com.google.common.collect.ImmutableList; +import org.elasticsearch.action.update.UpdateHelper; import org.elasticsearch.common.inject.AbstractModule; import org.elasticsearch.common.inject.Module; import org.elasticsearch.common.inject.SpawnModules; @@ -75,5 +76,6 @@ public class IndicesModule extends AbstractModule implements SpawnModules { bind(TransportNodesListShardStoreMetaData.class).asEagerSingleton(); bind(IndicesTTLService.class).asEagerSingleton(); bind(IndicesWarmer.class).to(InternalIndicesWarmer.class).asEagerSingleton(); + bind(UpdateHelper.class).asEagerSingleton(); } } diff --git a/src/test/java/org/elasticsearch/test/integration/document/BulkTests.java b/src/test/java/org/elasticsearch/test/integration/document/BulkTests.java new file mode 100644 index 00000000000..9f9db4e3013 --- /dev/null +++ b/src/test/java/org/elasticsearch/test/integration/document/BulkTests.java @@ -0,0 +1,310 @@ +package org.elasticsearch.test.integration.document; + +import org.elasticsearch.action.bulk.BulkRequestBuilder; +import org.elasticsearch.action.bulk.BulkResponse; +import org.elasticsearch.action.get.GetResponse; +import org.elasticsearch.action.update.UpdateRequestBuilder; +import org.elasticsearch.action.update.UpdateResponse; +import org.elasticsearch.client.Client; +import org.elasticsearch.common.Priority; +import org.elasticsearch.common.settings.ImmutableSettings; +import org.elasticsearch.test.integration.AbstractNodesTests; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.*; + +/** + */ +public class BulkTests extends AbstractNodesTests { + + private Client client; + + @BeforeClass + public void createNodes() throws Exception { + startNode("node1"); + startNode("node2"); + client = getClient(); + } + + @AfterClass + public void closeNodes() { + client.close(); + closeAllNodes(); + } + + protected Client getClient() { + return client("node1"); + } + + @Test + public void testBulkUpdate_simple() throws Exception { + client.admin().indices().prepareDelete().execute().actionGet(); + + client.admin().indices().prepareCreate("test") + .setSettings( + ImmutableSettings.settingsBuilder() + .put("index.number_of_shards", 2) + .put("index.number_of_replicas", 0) + ).execute().actionGet(); + client.admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForGreenStatus().execute().actionGet(); + + BulkResponse bulkResponse = client.prepareBulk() + .add(client.prepareIndex().setIndex("test").setType("type1").setId("1").setSource("field", 1)) + .add(client.prepareIndex().setIndex("test").setType("type1").setId("2").setSource("field", 2).setCreate(true)) + .add(client.prepareIndex().setIndex("test").setType("type1").setId("3").setSource("field", 3)) + .add(client.prepareIndex().setIndex("test").setType("type1").setId("4").setSource("field", 4)) + .add(client.prepareIndex().setIndex("test").setType("type1").setId("5").setSource("field", 5)) + .execute().actionGet(); + + assertThat(bulkResponse.hasFailures(), equalTo(false)); + assertThat(bulkResponse.getItems().length, equalTo(5)); + + bulkResponse = client.prepareBulk() + .add(client.prepareUpdate().setIndex("test").setType("type1").setId("1").setScript("ctx._source.field += 1")) + .add(client.prepareUpdate().setIndex("test").setType("type1").setId("2").setScript("ctx._source.field += 1").setRetryOnConflict(3)) + .add(client.prepareUpdate().setIndex("test").setType("type1").setId("3").setDoc(jsonBuilder().startObject().field("field1", "test").endObject())) + .execute().actionGet(); + + assertThat(bulkResponse.hasFailures(), equalTo(false)); + assertThat(bulkResponse.getItems().length, equalTo(3)); + assertThat(((UpdateResponse) bulkResponse.getItems()[0].getResponse()).getId(), equalTo("1")); + assertThat(((UpdateResponse) bulkResponse.getItems()[0].getResponse()).getVersion(), equalTo(2l)); + assertThat(((UpdateResponse) bulkResponse.getItems()[1].getResponse()).getId(), equalTo("2")); + assertThat(((UpdateResponse) bulkResponse.getItems()[1].getResponse()).getVersion(), equalTo(2l)); + assertThat(((UpdateResponse) bulkResponse.getItems()[2].getResponse()).getId(), equalTo("3")); + assertThat(((UpdateResponse) bulkResponse.getItems()[2].getResponse()).getVersion(), equalTo(2l)); + + GetResponse getResponse = client.prepareGet().setIndex("test").setType("type1").setId("1").setFields("field").execute().actionGet(); + assertThat(getResponse.isExists(), equalTo(true)); + assertThat(getResponse.getVersion(), equalTo(2l)); + assertThat(((Long) getResponse.getField("field").getValue()), equalTo(2l)); + + getResponse = client.prepareGet().setIndex("test").setType("type1").setId("2").setFields("field").execute().actionGet(); + assertThat(getResponse.isExists(), equalTo(true)); + assertThat(getResponse.getVersion(), equalTo(2l)); + assertThat(((Long) getResponse.getField("field").getValue()), equalTo(3l)); + + getResponse = client.prepareGet().setIndex("test").setType("type1").setId("3").setFields("field1").execute().actionGet(); + assertThat(getResponse.isExists(), equalTo(true)); + assertThat(getResponse.getVersion(), equalTo(2l)); + assertThat(getResponse.getField("field1").getValue().toString(), equalTo("test")); + + bulkResponse = client.prepareBulk() + .add(client.prepareUpdate().setIndex("test").setType("type1").setId("6").setScript("ctx._source.field += 1") + .setUpsertRequest(jsonBuilder().startObject().field("field", 0).endObject())) + .add(client.prepareUpdate().setIndex("test").setType("type1").setId("7").setScript("ctx._source.field += 1")) + .add(client.prepareUpdate().setIndex("test").setType("type1").setId("2").setScript("ctx._source.field += 1")) + .execute().actionGet(); + + assertThat(bulkResponse.hasFailures(), equalTo(true)); + assertThat(bulkResponse.getItems().length, equalTo(3)); + assertThat(((UpdateResponse) bulkResponse.getItems()[0].getResponse()).getId(), equalTo("6")); + assertThat(((UpdateResponse) bulkResponse.getItems()[0].getResponse()).getVersion(), equalTo(1l)); + assertThat(bulkResponse.getItems()[1].getResponse(), nullValue()); + assertThat(bulkResponse.getItems()[1].getFailure().getId(), equalTo("7")); + assertThat(bulkResponse.getItems()[1].getFailure().getMessage(), containsString("DocumentMissingException")); + assertThat(((UpdateResponse) bulkResponse.getItems()[2].getResponse()).getId(), equalTo("2")); + assertThat(((UpdateResponse) bulkResponse.getItems()[2].getResponse()).getVersion(), equalTo(3l)); + + getResponse = client.prepareGet().setIndex("test").setType("type1").setId("6").setFields("field").execute().actionGet(); + assertThat(getResponse.isExists(), equalTo(true)); + assertThat(getResponse.getVersion(), equalTo(1l)); + assertThat(((Long) getResponse.getField("field").getValue()), equalTo(0l)); + + getResponse = client.prepareGet().setIndex("test").setType("type1").setId("7").setFields("field").execute().actionGet(); + assertThat(getResponse.isExists(), equalTo(false)); + + getResponse = client.prepareGet().setIndex("test").setType("type1").setId("2").setFields("field").execute().actionGet(); + assertThat(getResponse.isExists(), equalTo(true)); + assertThat(getResponse.getVersion(), equalTo(3l)); + assertThat(((Long) getResponse.getField("field").getValue()), equalTo(4l)); + } + + @Test + public void testBulkUpdate_malformedScripts() throws Exception { + client.admin().indices().prepareDelete().execute().actionGet(); + + client.admin().indices().prepareCreate("test") + .setSettings( + ImmutableSettings.settingsBuilder() + .put("index.number_of_shards", 2) + .put("index.number_of_replicas", 0) + ).execute().actionGet(); + client.admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForGreenStatus().execute().actionGet(); + + BulkResponse bulkResponse = client.prepareBulk() + .add(client.prepareIndex().setIndex("test").setType("type1").setId("1").setSource("field", 1)) + .add(client.prepareIndex().setIndex("test").setType("type1").setId("2").setSource("field", 1)) + .add(client.prepareIndex().setIndex("test").setType("type1").setId("3").setSource("field", 1)) + .execute().actionGet(); + + assertThat(bulkResponse.hasFailures(), equalTo(false)); + assertThat(bulkResponse.getItems().length, equalTo(3)); + + bulkResponse = client.prepareBulk() + .add(client.prepareUpdate().setIndex("test").setType("type1").setId("1").setScript("ctx._source.field += a").setFields("field")) + .add(client.prepareUpdate().setIndex("test").setType("type1").setId("2").setScript("ctx._source.field += 1").setFields("field")) + .add(client.prepareUpdate().setIndex("test").setType("type1").setId("3").setScript("ctx._source.field += a").setFields("field")) + .execute().actionGet(); + + assertThat(bulkResponse.hasFailures(), equalTo(true)); + assertThat(bulkResponse.getItems().length, equalTo(3)); + assertThat(bulkResponse.getItems()[0].getFailure().getId(), equalTo("1")); + assertThat(bulkResponse.getItems()[0].getFailure().getMessage(), containsString("failed to execute script")); + assertThat(bulkResponse.getItems()[0].getResponse(), nullValue()); + + assertThat(((UpdateResponse) bulkResponse.getItems()[1].getResponse()).getId(), equalTo("2")); + assertThat(((UpdateResponse) bulkResponse.getItems()[1].getResponse()).getVersion(), equalTo(2l)); + assertThat(((Integer)((UpdateResponse) bulkResponse.getItems()[1].getResponse()).getGetResult().field("field").getValue()), equalTo(2)); + assertThat(bulkResponse.getItems()[1].getFailure(), nullValue()); + + assertThat(bulkResponse.getItems()[2].getFailure().getId(), equalTo("3")); + assertThat(bulkResponse.getItems()[2].getFailure().getMessage(), containsString("failed to execute script")); + assertThat(bulkResponse.getItems()[2].getResponse(), nullValue()); + } + + @Test + public void testBulkUpdate_largerVolume() throws Exception { + client.admin().indices().prepareDelete().execute().actionGet(); + + client.admin().indices().prepareCreate("test") + .setSettings( + ImmutableSettings.settingsBuilder() + .put("index.number_of_shards", 2) + .put("index.number_of_replicas", 1) + ).execute().actionGet(); + client.admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForGreenStatus().execute().actionGet(); + + int numDocs = 2000; + BulkRequestBuilder builder = client.prepareBulk(); + for (int i = 0; i < numDocs; i++) { + builder.add( + client.prepareUpdate() + .setIndex("test").setType("type1").setId(Integer.toString(i)) + .setScript("ctx._source.counter += 1").setFields("counter") + .setUpsertRequest(jsonBuilder().startObject().field("counter", 1).endObject()) + ); + } + + BulkResponse response = builder.execute().actionGet(); + assertThat(response.hasFailures(), equalTo(false)); + assertThat(response.getItems().length, equalTo(numDocs)); + for (int i = 0; i < numDocs; i++) { + assertThat(response.getItems()[i].getId(), equalTo(Integer.toString(i))); + assertThat(response.getItems()[i].getVersion(), equalTo(1l)); + assertThat(response.getItems()[i].getIndex(), equalTo("test")); + assertThat(response.getItems()[i].getType(), equalTo("type1")); + assertThat(response.getItems()[i].getOpType(), equalTo("update")); + assertThat(((UpdateResponse) response.getItems()[i].getResponse()).getId(), equalTo(Integer.toString(i))); + assertThat(((UpdateResponse) response.getItems()[i].getResponse()).getVersion(), equalTo(1l)); + assertThat(((Integer)((UpdateResponse) response.getItems()[i].getResponse()).getGetResult().field("counter").getValue()), equalTo(1)); + + for (int j = 0; j < 5; j++) { + GetResponse getResponse = client.prepareGet("test", "type1", Integer.toString(i)).setFields("counter").execute().actionGet(); + assertThat(getResponse.isExists(), equalTo(true)); + assertThat(getResponse.getVersion(), equalTo(1l)); + assertThat((Long) getResponse.getField("counter").getValue(), equalTo(1l)); + } + } + + builder = client.prepareBulk(); + for (int i = 0; i < numDocs; i++) { + UpdateRequestBuilder updateBuilder = client.prepareUpdate() + .setIndex("test").setType("type1").setId(Integer.toString(i)).setFields("counter"); + if (i % 2 == 0) { + updateBuilder.setScript("ctx._source.counter += 1"); + } else { + updateBuilder.setDoc(jsonBuilder().startObject().field("counter", 2).endObject()); + } + if (i % 3 == 0) { + updateBuilder.setRetryOnConflict(3); + } + + builder.add(updateBuilder); + } + + response = builder.execute().actionGet(); + assertThat(response.hasFailures(), equalTo(false)); + assertThat(response.getItems().length, equalTo(numDocs)); + for (int i = 0; i < numDocs; i++) { + assertThat(response.getItems()[i].getId(), equalTo(Integer.toString(i))); + assertThat(response.getItems()[i].getVersion(), equalTo(2l)); + assertThat(response.getItems()[i].getIndex(), equalTo("test")); + assertThat(response.getItems()[i].getType(), equalTo("type1")); + assertThat(response.getItems()[i].getOpType(), equalTo("update")); + assertThat(((UpdateResponse) response.getItems()[i].getResponse()).getId(), equalTo(Integer.toString(i))); + assertThat(((UpdateResponse) response.getItems()[i].getResponse()).getVersion(), equalTo(2l)); + assertThat(((Integer)((UpdateResponse) response.getItems()[i].getResponse()).getGetResult().field("counter").getValue()), equalTo(2)); + } + + builder = client.prepareBulk(); + int maxDocs = numDocs / 2 + numDocs; + for (int i = (numDocs / 2); i < maxDocs; i++) { + builder.add( + client.prepareUpdate() + .setIndex("test").setType("type1").setId(Integer.toString(i)).setScript("ctx._source.counter += 1") + ); + } + response = builder.execute().actionGet(); + assertThat(response.hasFailures(), equalTo(true)); + assertThat(response.getItems().length, equalTo(numDocs)); + for (int i = 0; i < numDocs; i++) { + int id = i + (numDocs / 2); + if (i >= (numDocs / 2)) { + assertThat(response.getItems()[i].getFailure().getId(), equalTo(Integer.toString(id))); + assertThat(response.getItems()[i].getFailure().getMessage(), containsString("DocumentMissingException")); + } else { + assertThat(response.getItems()[i].getId(), equalTo(Integer.toString(id))); + assertThat(response.getItems()[i].getVersion(), equalTo(3l)); + assertThat(response.getItems()[i].getIndex(), equalTo("test")); + assertThat(response.getItems()[i].getType(), equalTo("type1")); + assertThat(response.getItems()[i].getOpType(), equalTo("update")); + } + } + + builder = client.prepareBulk(); + for (int i = 0; i < numDocs; i++) { + builder.add( + client.prepareUpdate() + .setIndex("test").setType("type1").setId(Integer.toString(i)).setScript("ctx.op = \"none\"") + ); + } + response = builder.execute().actionGet(); + assertThat(response.hasFailures(), equalTo(false)); + assertThat(response.getItems().length, equalTo(numDocs)); + for (int i = 0; i < numDocs; i++) { + assertThat(response.getItems()[i].getItemId(), equalTo(i)); + assertThat(response.getItems()[i].getId(), equalTo(Integer.toString(i))); + assertThat(response.getItems()[i].getIndex(), equalTo("test")); + assertThat(response.getItems()[i].getType(), equalTo("type1")); + assertThat(response.getItems()[i].getOpType(), equalTo("update")); + } + + builder = client.prepareBulk(); + for (int i = 0; i < numDocs; i++) { + builder.add( + client.prepareUpdate() + .setIndex("test").setType("type1").setId(Integer.toString(i)).setScript("ctx.op = \"delete\"") + ); + } + response = builder.execute().actionGet(); + assertThat(response.hasFailures(), equalTo(false)); + assertThat(response.getItems().length, equalTo(numDocs)); + for (int i = 0; i < numDocs; i++) { + assertThat(response.getItems()[i].getItemId(), equalTo(i)); + assertThat(response.getItems()[i].getId(), equalTo(Integer.toString(i))); + assertThat(response.getItems()[i].getIndex(), equalTo("test")); + assertThat(response.getItems()[i].getType(), equalTo("type1")); + assertThat(response.getItems()[i].getOpType(), equalTo("update")); + for (int j = 0; j < 5; j++) { + GetResponse getResponse = client.prepareGet("test", "type1", Integer.toString(i)).setFields("counter").execute().actionGet(); + assertThat(getResponse.isExists(), equalTo(false)); + } + } + } + +} diff --git a/src/test/java/org/elasticsearch/test/integration/update/UpdateTests.java b/src/test/java/org/elasticsearch/test/integration/update/UpdateTests.java index 3acc0eb6795..0442e50e93a 100644 --- a/src/test/java/org/elasticsearch/test/integration/update/UpdateTests.java +++ b/src/test/java/org/elasticsearch/test/integration/update/UpdateTests.java @@ -23,6 +23,7 @@ import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; import org.elasticsearch.action.admin.cluster.health.ClusterHealthStatus; import org.elasticsearch.action.get.GetResponse; import org.elasticsearch.action.update.UpdateRequest; +import org.elasticsearch.action.update.UpdateRequestBuilder; import org.elasticsearch.action.update.UpdateResponse; import org.elasticsearch.client.Client; import org.elasticsearch.common.Priority; @@ -38,6 +39,7 @@ import org.testng.annotations.Test; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.CountDownLatch; import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; import static org.elasticsearch.index.query.QueryBuilders.termQuery; @@ -45,6 +47,7 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.*; public class UpdateTests extends AbstractNodesTests { + private Client client; @BeforeClass @@ -378,4 +381,64 @@ public class UpdateTests extends AbstractNodesTests { assertThat(map2.containsKey("commonkey"), equalTo(true)); } } + + @Test + public void testConcurrentUpdateWithRetryOnConflict() throws Exception { + concurrentUpdateWithRetryOnConflict(false); + } + + @Test + public void testConcurrentUpdateWithRetryOnConflict_bulk() throws Exception { + concurrentUpdateWithRetryOnConflict(true); + } + + private void concurrentUpdateWithRetryOnConflict(final boolean useBulkApi) throws Exception { + createIndex(); + ClusterHealthResponse clusterHealth = client.admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForGreenStatus().execute().actionGet(); + assertThat(clusterHealth.isTimedOut(), equalTo(false)); + assertThat(clusterHealth.getStatus(), equalTo(ClusterHealthStatus.GREEN)); + + int numberOfThreads = 5; + final CountDownLatch latch = new CountDownLatch(numberOfThreads); + final int numberOfUpdatesPerThread = 10000; + for (int i = 0; i < numberOfThreads; i++) { + Runnable r = new Runnable() { + + @Override + public void run() { + try { + for (int i = 0; i < numberOfUpdatesPerThread; i++) { + if (useBulkApi) { + UpdateRequestBuilder updateRequestBuilder = client.prepareUpdate("test", "type1", Integer.toString(i)) + .setScript("ctx._source.field += 1") + .setRetryOnConflict(Integer.MAX_VALUE) + .setUpsertRequest(jsonBuilder().startObject().field("field", 1).endObject()); + client.prepareBulk().add(updateRequestBuilder).execute().actionGet(); + } else { + client.prepareUpdate("test", "type1", Integer.toString(i)).setScript("ctx._source.field += 1") + .setRetryOnConflict(Integer.MAX_VALUE) + .setUpsertRequest(jsonBuilder().startObject().field("field", 1).endObject()) + .execute().actionGet(); + } + } + } catch (Exception e) { + e.printStackTrace(); + } finally { + latch.countDown(); + } + } + + }; + new Thread(r).start(); + } + latch.await(); + + for (int i = 0; i < numberOfUpdatesPerThread; i++) { + GetResponse response = client.prepareGet("test", "type1", Integer.toString(i)).execute().actionGet(); + assertThat(response.getId(), equalTo(Integer.toString(i))); + assertThat(response.getVersion(), equalTo((long) numberOfThreads)); + assertThat((Integer) response.getSource().get("field"), equalTo(numberOfThreads)); + } + } + } diff --git a/src/test/java/org/elasticsearch/test/unit/action/bulk/BulkRequestTests.java b/src/test/java/org/elasticsearch/test/unit/action/bulk/BulkRequestTests.java index 1353bd8240c..e3ddb36adc7 100644 --- a/src/test/java/org/elasticsearch/test/unit/action/bulk/BulkRequestTests.java +++ b/src/test/java/org/elasticsearch/test/unit/action/bulk/BulkRequestTests.java @@ -22,6 +22,7 @@ package org.elasticsearch.test.unit.action.bulk; import org.elasticsearch.action.bulk.BulkRequest; import org.elasticsearch.action.delete.DeleteRequest; import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.common.bytes.BytesArray; import org.testng.annotations.Test; @@ -58,4 +59,24 @@ public class BulkRequestTests { bulkRequest.add(bulkAction.getBytes(), 0, bulkAction.length(), true, null, null); assertThat(bulkRequest.numberOfActions(), equalTo(3)); } + + @Test + public void testSimpleBulk4() throws Exception { + String bulkAction = copyToStringFromClasspath("/org/elasticsearch/test/unit/action/bulk/simple-bulk4.json"); + BulkRequest bulkRequest = new BulkRequest(); + bulkRequest.add(bulkAction.getBytes(), 0, bulkAction.length(), true, null, null); + assertThat(bulkRequest.numberOfActions(), equalTo(4)); + assertThat(((UpdateRequest) bulkRequest.requests().get(0)).id(), equalTo("1")); + assertThat(((UpdateRequest) bulkRequest.requests().get(0)).retryOnConflict(), equalTo(2)); + assertThat(((UpdateRequest) bulkRequest.requests().get(0)).doc().source().toUtf8(), equalTo("{\"field\":\"value\"}")); + assertThat(((UpdateRequest) bulkRequest.requests().get(1)).id(), equalTo("0")); + assertThat(((UpdateRequest) bulkRequest.requests().get(1)).type(), equalTo("type1")); + assertThat(((UpdateRequest) bulkRequest.requests().get(1)).index(), equalTo("index1")); + assertThat(((UpdateRequest) bulkRequest.requests().get(1)).script(), equalTo("counter += param1")); + assertThat(((UpdateRequest) bulkRequest.requests().get(1)).scriptLang(), equalTo("js")); + assertThat(((UpdateRequest) bulkRequest.requests().get(1)).scriptParams().size(), equalTo(1)); + assertThat(((Integer) ((UpdateRequest) bulkRequest.requests().get(1)).scriptParams().get("param1")), equalTo(1)); + assertThat(((UpdateRequest) bulkRequest.requests().get(1)).upsertRequest().source().toUtf8(), equalTo("{\"counter\":1}")); + } + } diff --git a/src/test/java/org/elasticsearch/test/unit/action/bulk/simple-bulk4.json b/src/test/java/org/elasticsearch/test/unit/action/bulk/simple-bulk4.json new file mode 100644 index 00000000000..8b916b8feed --- /dev/null +++ b/src/test/java/org/elasticsearch/test/unit/action/bulk/simple-bulk4.json @@ -0,0 +1,7 @@ +{ "update" : {"_id" : "1", "_retry_on_conflict" : 2} } +{ "doc" : {"field" : "value"} } +{ "update" : { "_id" : "0", "_type" : "type1", "_index" : "index1" } } +{ "script" : "counter += param1", "lang" : "js", "params" : {"param1" : 1}, "upsert" : {"counter" : 1}} +{ "delete" : { "_id" : "2" } } +{ "create" : { "_id" : "3" } } +{ "field1" : "value3" }