Added support for the update operation in the bulk api.

Update requests can now be put in the bulk api. All update request options are supported.

Example usage:
```
curl -XPOST 'localhost:9200/_bulk' --date-binary @bulk.json
```

Contents of bulk.json that contains two update request items:
```
{ "update" : {"_id" : "1", "_type" : "type1", "_index" : "index1", "_retry_on_conflict" : 3} }
{ "doc" : {"field" : "value"} }
{ "update" : { "_id" : "0", "_type" : "type1", "_index" : "index1", "_retry_on_conflict" : 3} }
{ "script" : "counter += param1", "lang" : "js", "params" : {"param1" : 1}, "upsert" : {"counter" : 1}}
```
The `doc`, `upsert` and all script related options are part of the payload. The `retry_on_conflict` option is part of the header.

Closes #2982
This commit is contained in:
Martijn van Groningen 2013-05-02 00:12:53 +02:00
parent c5e177dc56
commit 9ddd675a02
16 changed files with 1181 additions and 313 deletions

View File

@ -22,6 +22,7 @@ package org.elasticsearch.action.bulk;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
@ -68,6 +69,8 @@ public class BulkItemRequest implements Streamable {
request = new IndexRequest();
} else if (type == 1) {
request = new DeleteRequest();
} else if (type == 2) {
request = new UpdateRequest();
}
request.readFrom(in);
}
@ -79,6 +82,8 @@ public class BulkItemRequest implements Streamable {
out.writeByte((byte) 0);
} else if (request instanceof DeleteRequest) {
out.writeByte((byte) 1);
} else if (request instanceof UpdateRequest) {
out.writeByte((byte) 2);
}
request.writeTo(out);
}

View File

@ -22,6 +22,7 @@ package org.elasticsearch.action.bulk;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
@ -130,6 +131,8 @@ public class BulkItemResponse implements Streamable {
return ((IndexResponse) response).getIndex();
} else if (response instanceof DeleteResponse) {
return ((DeleteResponse) response).getIndex();
} else if (response instanceof UpdateResponse) {
return ((UpdateResponse) response).getIndex();
}
return null;
}
@ -146,6 +149,9 @@ public class BulkItemResponse implements Streamable {
} else if (response instanceof DeleteResponse) {
return ((DeleteResponse) response).getType();
}
else if (response instanceof UpdateResponse) {
return ((UpdateResponse) response).getType();
}
return null;
}
@ -160,6 +166,8 @@ public class BulkItemResponse implements Streamable {
return ((IndexResponse) response).getId();
} else if (response instanceof DeleteResponse) {
return ((DeleteResponse) response).getId();
} else if (response instanceof UpdateResponse) {
return ((UpdateResponse) response).getId();
}
return null;
}
@ -175,6 +183,8 @@ public class BulkItemResponse implements Streamable {
return ((IndexResponse) response).getVersion();
} else if (response instanceof DeleteResponse) {
return ((DeleteResponse) response).getVersion();
} else if (response instanceof UpdateResponse) {
return ((UpdateResponse) response).getVersion();
}
return -1;
}
@ -229,6 +239,9 @@ public class BulkItemResponse implements Streamable {
} else if (type == 1) {
response = new DeleteResponse();
response.readFrom(in);
} else if (type == 3) { // make 3 instead of 2, because 2 is already in use for 'no responses'
response = new UpdateResponse();
response.readFrom(in);
}
if (in.readBoolean()) {
@ -247,6 +260,8 @@ public class BulkItemResponse implements Streamable {
out.writeByte((byte) 0);
} else if (response instanceof DeleteResponse) {
out.writeByte((byte) 1);
} else if (response instanceof UpdateResponse) {
out.writeByte((byte) 3); // make 3 instead of 2, because 2 is already in use for 'no responses'
}
response.writeTo(out);
}

View File

@ -27,6 +27,7 @@ import org.elasticsearch.action.WriteConsistencyLevel;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.support.replication.ReplicationType;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
@ -82,6 +83,8 @@ public class BulkRequest extends ActionRequest<BulkRequest> {
add((IndexRequest) request, payload);
} else if (request instanceof DeleteRequest) {
add((DeleteRequest) request, payload);
} else if (request instanceof UpdateRequest) {
add((UpdateRequest) request, payload);
} else {
throw new ElasticSearchIllegalArgumentException("No support for request [" + request + "]");
}
@ -125,6 +128,33 @@ public class BulkRequest extends ActionRequest<BulkRequest> {
return this;
}
/**
* Adds an {@link UpdateRequest} to the list of actions to execute.
*/
public BulkRequest add(UpdateRequest request) {
request.beforeLocalFork();
return internalAdd(request, null);
}
public BulkRequest add(UpdateRequest request, @Nullable Object payload) {
request.beforeLocalFork();
return internalAdd(request, payload);
}
BulkRequest internalAdd(UpdateRequest request, @Nullable Object payload) {
requests.add(request);
addPayload(payload);
if (request.doc() != null) {
sizeInBytes += request.doc().source().length();
}
if (request.upsertRequest() != null) {
sizeInBytes += request.upsertRequest().source().length();
}
if (request.script() != null) {
sizeInBytes += request.script().length() * 2;
}
return this;
}
/**
* Adds an {@link DeleteRequest} to the list of actions to execute.
*/
@ -245,6 +275,7 @@ public class BulkRequest extends ActionRequest<BulkRequest> {
long version = 0;
VersionType versionType = VersionType.INTERNAL;
String percolate = null;
int retryOnConflict = 0;
// at this stage, next token can either be END_OBJECT (and use default index and type, with auto generated id)
// or START_OBJECT which will have another set of parameters
@ -280,6 +311,8 @@ public class BulkRequest extends ActionRequest<BulkRequest> {
versionType = VersionType.fromString(parser.text());
} else if ("percolate".equals(currentFieldName) || "_percolate".equals(currentFieldName)) {
percolate = parser.textOrNull();
} else if ("_retry_on_conflict".equals(currentFieldName) || "_retryOnConflict".equals(currentFieldName)) {
retryOnConflict = parser.intValue();
}
}
}
@ -310,6 +343,10 @@ public class BulkRequest extends ActionRequest<BulkRequest> {
.create(true)
.source(data.slice(from, nextMarker - from), contentUnsafe)
.percolate(percolate), payload);
} else if ("update".equals(action)) {
internalAdd(new UpdateRequest(index, type, id).routing(routing).parent(parent).retryOnConflict(retryOnConflict)
.source(data.slice(from, nextMarker - from))
.percolate(percolate), payload);
}
// move pointers
from = nextMarker + 1;
@ -403,6 +440,10 @@ public class BulkRequest extends ActionRequest<BulkRequest> {
DeleteRequest request = new DeleteRequest();
request.readFrom(in);
requests.add(request);
} else if (type == 2) {
UpdateRequest request = new UpdateRequest();
request.readFrom(in);
requests.add(request);
}
}
refresh = in.readBoolean();
@ -419,6 +460,8 @@ public class BulkRequest extends ActionRequest<BulkRequest> {
out.writeByte((byte) 0);
} else if (request instanceof DeleteRequest) {
out.writeByte((byte) 1);
} else if (request instanceof UpdateRequest) {
out.writeByte((byte) 2);
}
request.writeTo(out);
}

View File

@ -27,6 +27,8 @@ import org.elasticsearch.action.delete.DeleteRequestBuilder;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.support.replication.ReplicationType;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateRequestBuilder;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.internal.InternalClient;
import org.elasticsearch.common.Nullable;
@ -75,6 +77,24 @@ public class BulkRequestBuilder extends ActionRequestBuilder<BulkRequest, BulkRe
return this;
}
/**
* Adds an {@link DeleteRequest} to the list of actions to execute.
*/
public BulkRequestBuilder add(UpdateRequest request) {
super.request.add(request);
return this;
}
/**
* Adds an {@link DeleteRequest} to the list of actions to execute.
*/
public BulkRequestBuilder add(UpdateRequestBuilder request) {
super.request.add(request.request());
return this;
}
/**
* Adds a framed data in binary format
*/

View File

@ -20,6 +20,7 @@
package org.elasticsearch.action.bulk;
import org.elasticsearch.action.support.replication.ShardReplicationOperationRequest;
import org.elasticsearch.action.support.single.instance.InstanceShardOperationRequest;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
@ -64,7 +65,11 @@ public class BulkShardRequest extends ShardReplicationOperationRequest<BulkShard
@Override
public void beforeLocalFork() {
for (BulkItemRequest item : items) {
((ShardReplicationOperationRequest) item.request()).beforeLocalFork();
if (item.request() instanceof InstanceShardOperationRequest) {
((InstanceShardOperationRequest) item.request()).beforeLocalFork();
} else {
((ShardReplicationOperationRequest) item.request()).beforeLocalFork();
}
}
}

View File

@ -32,6 +32,7 @@ import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.support.AutoCreateIndex;
import org.elasticsearch.action.support.TransportAction;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
@ -99,6 +100,11 @@ public class TransportBulkAction extends TransportAction<BulkRequest, BulkRespon
if (!indices.contains(deleteRequest.index())) {
indices.add(deleteRequest.index());
}
} else if (request instanceof UpdateRequest) {
UpdateRequest updateRequest = (UpdateRequest) request;
if (!indices.contains(updateRequest.index())) {
indices.add(updateRequest.index());
}
}
}
@ -160,6 +166,10 @@ public class TransportBulkAction extends TransportAction<BulkRequest, BulkRespon
DeleteRequest deleteRequest = (DeleteRequest) request;
deleteRequest.routing(clusterState.metaData().resolveIndexRouting(deleteRequest.routing(), deleteRequest.index()));
deleteRequest.index(clusterState.metaData().concreteIndex(deleteRequest.index()));
} else if (request instanceof UpdateRequest) {
UpdateRequest updateRequest = (UpdateRequest) request;
updateRequest.routing(clusterState.metaData().resolveIndexRouting(updateRequest.routing(), updateRequest.index()));
updateRequest.index(clusterState.metaData().concreteIndex(updateRequest.index()));
}
}
final BulkItemResponse[] responses = new BulkItemResponse[bulkRequest.requests.size()];
@ -201,6 +211,19 @@ public class TransportBulkAction extends TransportAction<BulkRequest, BulkRespon
}
list.add(new BulkItemRequest(i, request));
}
} else if (request instanceof UpdateRequest) {
UpdateRequest updateRequest = (UpdateRequest) request;
MappingMetaData mappingMd = clusterState.metaData().index(updateRequest.index()).mappingOrDefault(updateRequest.type());
if (mappingMd != null && mappingMd.routing().required() && updateRequest.routing() == null) {
continue; // What to do?
}
ShardId shardId = clusterService.operationRouting().indexShards(clusterState, updateRequest.index(), updateRequest.type(), updateRequest.id(), updateRequest.routing()).shardId();
List<BulkItemRequest> list = requestsByShard.get(shardId);
if (list == null) {
list = Lists.newArrayList();
requestsByShard.put(shardId, list);
}
list.add(new BulkItemRequest(i, request));
}
}
@ -243,6 +266,10 @@ public class TransportBulkAction extends TransportAction<BulkRequest, BulkRespon
DeleteRequest deleteRequest = (DeleteRequest) request.request();
responses[request.id()] = new BulkItemResponse(request.id(), "delete",
new BulkItemResponse.Failure(deleteRequest.index(), deleteRequest.type(), deleteRequest.id(), message));
} else if (request.request() instanceof UpdateRequest) {
UpdateRequest updateRequest = (UpdateRequest) request.request();
responses[request.id()] = new BulkItemResponse(request.id(), "update",
new BulkItemResponse.Failure(updateRequest.index(), updateRequest.type(), updateRequest.id(), message));
}
}
}

View File

@ -21,14 +21,19 @@ package org.elasticsearch.action.bulk;
import com.google.common.collect.Sets;
import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.ElasticSearchIllegalStateException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.RoutingMissingException;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.support.replication.TransportShardReplicationOperationAction;
import org.elasticsearch.action.update.UpdateHelper;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.action.index.MappingUpdatedAction;
@ -38,10 +43,15 @@ import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.cluster.routing.ShardIterator;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.engine.DocumentAlreadyExistsException;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.VersionConflictEngineException;
import org.elasticsearch.index.mapper.DocumentMapper;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.mapper.SourceToParse;
@ -56,6 +66,7 @@ import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportService;
import java.io.IOException;
import java.util.Map;
import java.util.Set;
/**
@ -64,13 +75,17 @@ import java.util.Set;
public class TransportShardBulkAction extends TransportShardReplicationOperationAction<BulkShardRequest, BulkShardRequest, BulkShardResponse> {
private final MappingUpdatedAction mappingUpdatedAction;
private final UpdateHelper updateHelper;
private final boolean allowIdGeneration;
@Inject
public TransportShardBulkAction(Settings settings, TransportService transportService, ClusterService clusterService,
IndicesService indicesService, ThreadPool threadPool, ShardStateAction shardStateAction,
MappingUpdatedAction mappingUpdatedAction) {
MappingUpdatedAction mappingUpdatedAction, UpdateHelper updateHelper) {
super(settings, transportService, clusterService, indicesService, threadPool, shardStateAction);
this.mappingUpdatedAction = mappingUpdatedAction;
this.updateHelper = updateHelper;
this.allowIdGeneration = settings.getAsBoolean("action.allow_id_generation", true);
}
@Override
@ -127,72 +142,39 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation
protected PrimaryResponse<BulkShardResponse, BulkShardRequest> shardOperationOnPrimary(ClusterState clusterState, PrimaryOperationRequest shardRequest) {
final BulkShardRequest request = shardRequest.request;
IndexShard indexShard = indicesService.indexServiceSafe(shardRequest.request.index()).shardSafe(shardRequest.shardId);
Engine.IndexingOperation[] ops = null;
Set<Tuple<String, String>> mappingsToUpdate = null;
BulkItemResponse[] responses = new BulkItemResponse[request.items().length];
long[] versions = new long[request.items().length];
long[] preVersions = new long[request.items().length];
for (int i = 0; i < request.items().length; i++) {
BulkItemRequest item = request.items()[i];
if (item.request() instanceof IndexRequest) {
IndexRequest indexRequest = (IndexRequest) item.request();
try {
// validate, if routing is required, that we got routing
MappingMetaData mappingMd = clusterState.metaData().index(request.index()).mappingOrDefault(indexRequest.type());
if (mappingMd != null && mappingMd.routing().required()) {
if (indexRequest.routing() == null) {
throw new RoutingMissingException(indexRequest.index(), indexRequest.type(), indexRequest.id());
}
}
SourceToParse sourceToParse = SourceToParse.source(indexRequest.source()).type(indexRequest.type()).id(indexRequest.id())
.routing(indexRequest.routing()).parent(indexRequest.parent()).timestamp(indexRequest.timestamp()).ttl(indexRequest.ttl());
long version;
Engine.IndexingOperation op;
if (indexRequest.opType() == IndexRequest.OpType.INDEX) {
Engine.Index index = indexShard.prepareIndex(sourceToParse).version(indexRequest.version()).versionType(indexRequest.versionType()).origin(Engine.Operation.Origin.PRIMARY);
indexShard.index(index);
version = index.version();
op = index;
} else {
Engine.Create create = indexShard.prepareCreate(sourceToParse).version(indexRequest.version()).versionType(indexRequest.versionType()).origin(Engine.Operation.Origin.PRIMARY);
indexShard.create(create);
version = create.version();
op = create;
}
versions[i] = indexRequest.version();
// update the version on request so it will happen on the replicas
indexRequest.version(version);
// update mapping on master if needed, we won't update changes to the same type, since once its changed, it won't have mappers added
if (op.parsedDoc().mappingsModified()) {
WriteResult result = shardIndexOperation(request, indexRequest, clusterState, indexShard, true);
// add the response
IndexResponse indexResponse = result.response();
responses[i] = new BulkItemResponse(item.id(), indexRequest.opType().lowercase(), indexResponse);
preVersions[i] = result.preVersion;
if (result.mappingToUpdate != null) {
if (mappingsToUpdate == null) {
mappingsToUpdate = Sets.newHashSet();
}
mappingsToUpdate.add(Tuple.tuple(indexRequest.index(), indexRequest.type()));
mappingsToUpdate.add(result.mappingToUpdate);
}
// if we are going to percolate, then we need to keep this op for the postPrimary operation
if (Strings.hasLength(indexRequest.percolate())) {
if (result.op != null) {
if (ops == null) {
ops = new Engine.IndexingOperation[request.items().length];
}
ops[i] = op;
ops[i] = result.op;
}
// add the response
responses[i] = new BulkItemResponse(item.id(), indexRequest.opType().lowercase(),
new IndexResponse(indexRequest.index(), indexRequest.type(), indexRequest.id(), version));
} catch (Exception e) {
// rethrow the failure if we are going to retry on primary and let parent failure to handle it
if (retryPrimaryException(e)) {
// restore updated versions...
for (int j = 0; j < i; j++) {
applyVersion(request.items()[j], versions[j]);
applyVersion(request.items()[j], preVersions[j]);
}
throw (ElasticSearchException) e;
}
@ -209,20 +191,15 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation
} else if (item.request() instanceof DeleteRequest) {
DeleteRequest deleteRequest = (DeleteRequest) item.request();
try {
Engine.Delete delete = indexShard.prepareDelete(deleteRequest.type(), deleteRequest.id(), deleteRequest.version()).versionType(deleteRequest.versionType()).origin(Engine.Operation.Origin.PRIMARY);
indexShard.delete(delete);
// update the request with teh version so it will go to the replicas
deleteRequest.version(delete.version());
// add the response
responses[i] = new BulkItemResponse(item.id(), "delete",
new DeleteResponse(deleteRequest.index(), deleteRequest.type(), deleteRequest.id(), delete.version(), delete.notFound()));
DeleteResponse deleteResponse = shardDeleteOperation(deleteRequest, indexShard).response();
responses[i] = new BulkItemResponse(item.id(), "delete", deleteResponse);
} catch (Exception e) {
// rethrow the failure if we are going to retry on primary and let parent failure to handle it
if (retryPrimaryException(e)) {
// restore updated versions...
for (int j = 0; j < i; j++) {
applyVersion(request.items()[j], versions[j]);
applyVersion(request.items()[j], preVersions[j]);
}
throw (ElasticSearchException) e;
}
@ -236,6 +213,109 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation
// nullify the request so it won't execute on the replicas
request.items()[i] = null;
}
} else if (item.request() instanceof UpdateRequest) {
UpdateRequest updateRequest = (UpdateRequest) item.request();
int retryCount = 0;
do {
UpdateResult updateResult;
try {
updateResult = shardUpdateOperation(clusterState, request, updateRequest, indexShard);
} catch (Throwable t) {
updateResult = new UpdateResult(null, null, false, t, null);
}
if (updateResult.success()) {
switch (updateResult.result.operation()) {
case UPSERT:
case INDEX:
WriteResult result = updateResult.writeResult;
IndexRequest indexRequest = updateResult.request();
BytesReference indexSourceAsBytes = indexRequest.source();
// add the response
IndexResponse indexResponse = result.response();
UpdateResponse updateResponse = new UpdateResponse(indexResponse.getIndex(), indexResponse.getType(), indexResponse.getId(), indexResponse.getVersion());
updateResponse.setMatches(indexResponse.getMatches());
if (updateRequest.fields() != null && updateRequest.fields().length > 0) {
Tuple<XContentType, Map<String, Object>> sourceAndContent = XContentHelper.convertToMap(indexSourceAsBytes, true);
updateResponse.setGetResult(updateHelper.extractGetResult(updateRequest, indexResponse.getVersion(), sourceAndContent.v2(), sourceAndContent.v1(), indexSourceAsBytes));
}
responses[i] = new BulkItemResponse(item.id(), "update", updateResponse);
preVersions[i] = result.preVersion;
if (result.mappingToUpdate != null) {
if (mappingsToUpdate == null) {
mappingsToUpdate = Sets.newHashSet();
}
mappingsToUpdate.add(result.mappingToUpdate);
}
if (result.op != null) {
if (ops == null) {
ops = new Engine.IndexingOperation[request.items().length];
}
ops[i] = result.op;
}
// Replace the update request to the translated index request to execute on the replica.
request.items()[i] = new BulkItemRequest(request.items()[i].id(), indexRequest);
break;
case DELETE:
DeleteResponse response = updateResult.writeResult.response();
DeleteRequest deleteRequest = updateResult.request();
updateResponse = new UpdateResponse(response.getIndex(), response.getType(), response.getId(), response.getVersion());
updateResponse.setGetResult(updateHelper.extractGetResult(updateRequest, response.getVersion(), updateResult.result.updatedSourceAsMap(), updateResult.result.updateSourceContentType(), null));
responses[i] = new BulkItemResponse(item.id(), "update", updateResponse);
// Replace the update request to the translated delete request to execute on the replica.
request.items()[i] = new BulkItemRequest(request.items()[i].id(), deleteRequest);
break;
case NONE:
responses[i] = new BulkItemResponse(item.id(), "update", updateResult.noopResult);
request.items()[i] = null; // No need to go to the replica
break;
}
// NOTE: Breaking out of the retry_on_conflict loop!
break;
} else if (updateResult.failure()) {
Throwable t = updateResult.error;
if (!updateResult.retry) {
// rethrow the failure if we are going to retry on primary and let parent failure to handle it
if (retryPrimaryException(t)) {
// restore updated versions...
for (int j = 0; j < i; j++) {
applyVersion(request.items()[j], preVersions[j]);
}
throw (ElasticSearchException) t;
}
if (updateResult.result == null) {
responses[i] = new BulkItemResponse(item.id(), "update", new BulkItemResponse.Failure(updateRequest.index(), updateRequest.type(), updateRequest.id(), ExceptionsHelper.detailedMessage(t)));
} else {
switch (updateResult.result.operation()) {
case UPSERT:
case INDEX:
IndexRequest indexRequest = updateResult.request();
if (t instanceof ElasticSearchException && ((ElasticSearchException) t).status() == RestStatus.CONFLICT) {
logger.trace("[{}][{}] failed to execute bulk item (index) {}", t, shardRequest.request.index(), shardRequest.shardId, indexRequest);
} else {
logger.debug("[{}][{}] failed to execute bulk item (index) {}", t, shardRequest.request.index(), shardRequest.shardId, indexRequest);
}
responses[i] = new BulkItemResponse(item.id(), indexRequest.opType().lowercase(),
new BulkItemResponse.Failure(indexRequest.index(), indexRequest.type(), indexRequest.id(), ExceptionsHelper.detailedMessage(t)));
break;
case DELETE:
DeleteRequest deleteRequest = updateResult.request();
if (t instanceof ElasticSearchException && ((ElasticSearchException) t).status() == RestStatus.CONFLICT) {
logger.trace("[{}][{}] failed to execute bulk item (delete) {}", t, shardRequest.request.index(), shardRequest.shardId, deleteRequest);
} else {
logger.debug("[{}][{}] failed to execute bulk item (delete) {}", t, shardRequest.request.index(), shardRequest.shardId, deleteRequest);
}
responses[i] = new BulkItemResponse(item.id(), "delete",
new BulkItemResponse.Failure(deleteRequest.index(), deleteRequest.type(), deleteRequest.id(), ExceptionsHelper.detailedMessage(t)));
break;
}
}
// nullify the request so it won't execute on the replicas
request.items()[i] = null;
// NOTE: Breaking out of the retry_on_conflict loop!
break;
}
}
} while (++retryCount < updateRequest.retryOnConflict());
}
}
@ -256,6 +336,177 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation
return new PrimaryResponse<BulkShardResponse, BulkShardRequest>(shardRequest.request, response, ops);
}
static class WriteResult {
final Object response;
final long preVersion;
final Tuple<String, String> mappingToUpdate;
final Engine.IndexingOperation op;
WriteResult(Object response, long preVersion, Tuple<String, String> mappingToUpdate, Engine.IndexingOperation op) {
this.response = response;
this.preVersion = preVersion;
this.mappingToUpdate = mappingToUpdate;
this.op = op;
}
@SuppressWarnings("unchecked")
<T> T response() {
return (T) response;
}
}
private WriteResult shardIndexOperation(BulkShardRequest request, IndexRequest indexRequest, ClusterState clusterState,
IndexShard indexShard, boolean processed) {
// validate, if routing is required, that we got routing
MappingMetaData mappingMd = clusterState.metaData().index(request.index()).mappingOrDefault(indexRequest.type());
if (mappingMd != null && mappingMd.routing().required()) {
if (indexRequest.routing() == null) {
throw new RoutingMissingException(indexRequest.index(), indexRequest.type(), indexRequest.id());
}
}
if (!processed) {
indexRequest.process(clusterState.metaData(), indexRequest.index(), mappingMd, allowIdGeneration);
}
SourceToParse sourceToParse = SourceToParse.source(indexRequest.source()).type(indexRequest.type()).id(indexRequest.id())
.routing(indexRequest.routing()).parent(indexRequest.parent()).timestamp(indexRequest.timestamp()).ttl(indexRequest.ttl());
long version;
Engine.IndexingOperation op;
if (indexRequest.opType() == IndexRequest.OpType.INDEX) {
Engine.Index index = indexShard.prepareIndex(sourceToParse).version(indexRequest.version()).versionType(indexRequest.versionType()).origin(Engine.Operation.Origin.PRIMARY);
indexShard.index(index);
version = index.version();
op = index;
} else {
Engine.Create create = indexShard.prepareCreate(sourceToParse).version(indexRequest.version()).versionType(indexRequest.versionType()).origin(Engine.Operation.Origin.PRIMARY);
indexShard.create(create);
version = create.version();
op = create;
}
long preVersion = indexRequest.version();
// update the version on request so it will happen on the replicas
indexRequest.version(version);
// update mapping on master if needed, we won't update changes to the same type, since once its changed, it won't have mappers added
Tuple<String, String> mappingsToUpdate = null;
if (op.parsedDoc().mappingsModified()) {
mappingsToUpdate = Tuple.tuple(indexRequest.index(), indexRequest.type());
}
// if we are going to percolate, then we need to keep this op for the postPrimary operation
if (!Strings.hasLength(indexRequest.percolate())) {
op = null;
}
IndexResponse indexResponse = new IndexResponse(indexRequest.index(), indexRequest.type(), indexRequest.id(), version);
return new WriteResult(indexResponse, preVersion, mappingsToUpdate, op);
}
private WriteResult shardDeleteOperation(DeleteRequest deleteRequest, IndexShard indexShard) {
Engine.Delete delete = indexShard.prepareDelete(deleteRequest.type(), deleteRequest.id(), deleteRequest.version()).versionType(deleteRequest.versionType()).origin(Engine.Operation.Origin.PRIMARY);
indexShard.delete(delete);
// update the request with the version so it will go to the replicas
deleteRequest.version(delete.version());
DeleteResponse deleteResponse = new DeleteResponse(deleteRequest.index(), deleteRequest.type(), deleteRequest.id(), delete.version(), delete.notFound());
return new WriteResult(deleteResponse, deleteRequest.version(), null, null);
}
static class UpdateResult {
final UpdateHelper.Result result;
final ActionRequest actionRequest;
final boolean retry;
final Throwable error;
final WriteResult writeResult;
final UpdateResponse noopResult;
UpdateResult(UpdateHelper.Result result, ActionRequest actionRequest, boolean retry, Throwable error, WriteResult writeResult) {
this.result = result;
this.actionRequest = actionRequest;
this.retry = retry;
this.error = error;
this.writeResult = writeResult;
this.noopResult = null;
}
UpdateResult(UpdateHelper.Result result, ActionRequest actionRequest, WriteResult writeResult) {
this.result = result;
this.actionRequest = actionRequest;
this.writeResult = writeResult;
this.retry = false;
this.error = null;
this.noopResult = null;
}
public UpdateResult(UpdateHelper.Result result, UpdateResponse updateResponse) {
this.result = result;
this.noopResult = updateResponse;
this.actionRequest = null;
this.writeResult = null;
this.retry = false;
this.error = null;
}
boolean failure() {
return error != null;
}
boolean success() {
return noopResult != null || writeResult != null;
}
@SuppressWarnings("unchecked")
<T extends ActionRequest> T request() {
return (T) actionRequest;
}
}
private UpdateResult shardUpdateOperation(ClusterState clusterState, BulkShardRequest bulkShardRequest, UpdateRequest updateRequest, IndexShard indexShard) {
UpdateHelper.Result translate = updateHelper.prepare(updateRequest, indexShard);
switch (translate.operation()) {
case UPSERT:
case INDEX:
IndexRequest indexRequest = translate.action();
try {
WriteResult result = shardIndexOperation(bulkShardRequest, indexRequest, clusterState, indexShard, false);
return new UpdateResult(translate, indexRequest, result);
} catch (Throwable t) {
t = ExceptionsHelper.unwrapCause(t);
boolean retry = false;
if (t instanceof VersionConflictEngineException || (t instanceof DocumentAlreadyExistsException && translate.operation() == UpdateHelper.Operation.UPSERT)) {
retry = true;
}
return new UpdateResult(translate, indexRequest, retry, t, null);
}
case DELETE:
DeleteRequest deleteRequest = translate.action();
try {
WriteResult result = shardDeleteOperation(deleteRequest, indexShard);
return new UpdateResult(translate, deleteRequest, result);
} catch (Throwable t) {
t = ExceptionsHelper.unwrapCause(t);
boolean retry = false;
if (t instanceof VersionConflictEngineException) {
retry = true;
}
return new UpdateResult(translate, deleteRequest, retry, t, null);
}
case NONE:
UpdateResponse updateResponse = translate.action();
return new UpdateResult(translate, updateResponse);
default:
throw new ElasticSearchIllegalStateException("Illegal update operation " + translate.operation());
}
}
@Override
protected void postPrimaryOperation(BulkShardRequest request, PrimaryResponse<BulkShardResponse, BulkShardRequest> response) {
IndexService indexService = indicesService.indexServiceSafe(request.index());

View File

@ -21,7 +21,7 @@ package org.elasticsearch.action.update;
import com.google.common.collect.ImmutableList;
import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.ElasticSearchIllegalArgumentException;
import org.elasticsearch.ElasticSearchIllegalStateException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
@ -35,7 +35,6 @@ import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.index.TransportIndexAction;
import org.elasticsearch.action.support.AutoCreateIndex;
import org.elasticsearch.action.support.single.instance.TransportInstanceSingleOperationAction;
import org.elasticsearch.client.Requests;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
@ -44,67 +43,40 @@ import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.routing.PlainShardIterator;
import org.elasticsearch.cluster.routing.ShardIterator;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.engine.DocumentAlreadyExistsException;
import org.elasticsearch.index.engine.DocumentMissingException;
import org.elasticsearch.index.engine.DocumentSourceMissingException;
import org.elasticsearch.index.engine.VersionConflictEngineException;
import org.elasticsearch.index.get.GetField;
import org.elasticsearch.index.get.GetResult;
import org.elasticsearch.index.mapper.internal.ParentFieldMapper;
import org.elasticsearch.index.mapper.internal.RoutingFieldMapper;
import org.elasticsearch.index.mapper.internal.SourceFieldMapper;
import org.elasticsearch.index.mapper.internal.TTLFieldMapper;
import org.elasticsearch.index.service.IndexService;
import org.elasticsearch.index.shard.IllegalIndexShardStateException;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.service.IndexShard;
import org.elasticsearch.indices.IndexAlreadyExistsException;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.script.ExecutableScript;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.search.lookup.SourceLookup;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import static com.google.common.collect.Maps.newHashMapWithExpectedSize;
/**
*/
public class TransportUpdateAction extends TransportInstanceSingleOperationAction<UpdateRequest, UpdateResponse> {
private final IndicesService indicesService;
private final TransportDeleteAction deleteAction;
private final TransportIndexAction indexAction;
private final ScriptService scriptService;
private final AutoCreateIndex autoCreateIndex;
private final TransportCreateIndexAction createIndexAction;
private final UpdateHelper updateHelper;
@Inject
public TransportUpdateAction(Settings settings, ThreadPool threadPool, ClusterService clusterService, TransportService transportService,
IndicesService indicesService, TransportIndexAction indexAction, TransportDeleteAction deleteAction, ScriptService scriptService, TransportCreateIndexAction createIndexAction) {
TransportIndexAction indexAction, TransportDeleteAction deleteAction, TransportCreateIndexAction createIndexAction,
UpdateHelper updateHelper) {
super(settings, threadPool, clusterService, transportService);
this.indicesService = indicesService;
this.indexAction = indexAction;
this.deleteAction = deleteAction;
this.scriptService = scriptService;
this.createIndexAction = createIndexAction;
this.updateHelper = updateHelper;
this.autoCreateIndex = new AutoCreateIndex(settings);
}
@ -212,240 +184,109 @@ public class TransportUpdateAction extends TransportInstanceSingleOperationActio
}
protected void shardOperation(final UpdateRequest request, final ActionListener<UpdateResponse> listener, final int retryCount) throws ElasticSearchException {
IndexService indexService = indicesService.indexServiceSafe(request.index());
IndexShard indexShard = indexService.shardSafe(request.shardId());
long getDate = System.currentTimeMillis();
final GetResult getResult = indexShard.getService().get(request.type(), request.id(),
new String[]{SourceFieldMapper.NAME, RoutingFieldMapper.NAME, ParentFieldMapper.NAME, TTLFieldMapper.NAME}, true);
// no doc, what to do, what to do...
if (!getResult.isExists()) {
if (request.upsertRequest() == null) {
listener.onFailure(new DocumentMissingException(new ShardId(request.index(), request.shardId()), request.type(), request.id()));
return;
}
final IndexRequest indexRequest = request.upsertRequest();
indexRequest.index(request.index()).type(request.type()).id(request.id())
// it has to be a "create!"
.create(true)
.routing(request.routing())
.percolate(request.percolate())
.refresh(request.refresh())
.replicationType(request.replicationType()).consistencyLevel(request.consistencyLevel());
indexRequest.operationThreaded(false);
// we fetch it from the index request so we don't generate the bytes twice, its already done in the index request
final BytesReference updateSourceBytes = indexRequest.source();
indexAction.execute(indexRequest, new ActionListener<IndexResponse>() {
@Override
public void onResponse(IndexResponse response) {
UpdateResponse update = new UpdateResponse(response.getIndex(), response.getType(), response.getId(), response.getVersion());
update.setMatches(response.getMatches());
if (request.fields() != null && request.fields().length > 0) {
Tuple<XContentType, Map<String, Object>> sourceAndContent = XContentHelper.convertToMap(updateSourceBytes, true);
update.setGetResult(extractGetResult(request, response.getVersion(), sourceAndContent.v2(), sourceAndContent.v1(), updateSourceBytes));
} else {
update.setGetResult(null);
}
listener.onResponse(update);
}
@Override
public void onFailure(Throwable e) {
e = ExceptionsHelper.unwrapCause(e);
if (e instanceof VersionConflictEngineException || e instanceof DocumentAlreadyExistsException) {
if (retryCount < request.retryOnConflict()) {
threadPool.executor(executor()).execute(new Runnable() {
@Override
public void run() {
shardOperation(request, listener, retryCount + 1);
}
});
return;
final UpdateHelper.Result result = updateHelper.prepare(request);
switch (result.operation()) {
case UPSERT:
IndexRequest upsertRequest = result.action();
// we fetch it from the index request so we don't generate the bytes twice, its already done in the index request
final BytesReference upsertSourceBytes = upsertRequest.source();
indexAction.execute(upsertRequest, new ActionListener<IndexResponse>() {
@Override
public void onResponse(IndexResponse response) {
UpdateResponse update = new UpdateResponse(response.getIndex(), response.getType(), response.getId(), response.getVersion());
update.setMatches(response.getMatches());
if (request.fields() != null && request.fields().length > 0) {
Tuple<XContentType, Map<String, Object>> sourceAndContent = XContentHelper.convertToMap(upsertSourceBytes, true);
update.setGetResult(updateHelper.extractGetResult(request, response.getVersion(), sourceAndContent.v2(), sourceAndContent.v1(), upsertSourceBytes));
} else {
update.setGetResult(null);
}
listener.onResponse(update);
}
listener.onFailure(e);
}
});
return;
}
if (getResult.internalSourceRef() == null) {
// no source, we can't do nothing, through a failure...
listener.onFailure(new DocumentSourceMissingException(new ShardId(request.index(), request.shardId()), request.type(), request.id()));
return;
}
Tuple<XContentType, Map<String, Object>> sourceAndContent = XContentHelper.convertToMap(getResult.internalSourceRef(), true);
String operation = null;
String timestamp = null;
Long ttl = null;
Object fetchedTTL = null;
final Map<String, Object> updatedSourceAsMap;
final XContentType updateSourceContentType = sourceAndContent.v1();
String routing = getResult.getFields().containsKey(RoutingFieldMapper.NAME) ? getResult.field(RoutingFieldMapper.NAME).getValue().toString() : null;
String parent = getResult.getFields().containsKey(ParentFieldMapper.NAME) ? getResult.field(ParentFieldMapper.NAME).getValue().toString() : null;
if (request.script() == null && request.doc() != null) {
IndexRequest indexRequest = request.doc();
updatedSourceAsMap = sourceAndContent.v2();
if (indexRequest.ttl() > 0) {
ttl = indexRequest.ttl();
}
timestamp = indexRequest.timestamp();
if (indexRequest.routing() != null) {
routing = indexRequest.routing();
}
if (indexRequest.parent() != null) {
parent = indexRequest.parent();
}
XContentHelper.update(updatedSourceAsMap, indexRequest.sourceAsMap());
} else {
Map<String, Object> ctx = new HashMap<String, Object>(2);
ctx.put("_source", sourceAndContent.v2());
try {
ExecutableScript script = scriptService.executable(request.scriptLang, request.script, request.scriptParams);
script.setNextVar("ctx", ctx);
script.run();
// we need to unwrap the ctx...
ctx = (Map<String, Object>) script.unwrap(ctx);
} catch (Exception e) {
throw new ElasticSearchIllegalArgumentException("failed to execute script", e);
}
operation = (String) ctx.get("op");
timestamp = (String) ctx.get("_timestamp");
fetchedTTL = ctx.get("_ttl");
if (fetchedTTL != null) {
if (fetchedTTL instanceof Number) {
ttl = ((Number) fetchedTTL).longValue();
} else {
ttl = TimeValue.parseTimeValue((String) fetchedTTL, null).millis();
}
}
updatedSourceAsMap = (Map<String, Object>) ctx.get("_source");
}
// apply script to update the source
// No TTL has been given in the update script so we keep previous TTL value if there is one
if (ttl == null) {
ttl = getResult.getFields().containsKey(TTLFieldMapper.NAME) ? (Long) getResult.field(TTLFieldMapper.NAME).getValue() : null;
if (ttl != null) {
ttl = ttl - (System.currentTimeMillis() - getDate); // It is an approximation of exact TTL value, could be improved
}
}
// TODO: external version type, does it make sense here? does not seem like it...
if (operation == null || "index".equals(operation)) {
final IndexRequest indexRequest = Requests.indexRequest(request.index()).type(request.type()).id(request.id()).routing(routing).parent(parent)
.source(updatedSourceAsMap, updateSourceContentType)
.version(getResult.getVersion()).replicationType(request.replicationType()).consistencyLevel(request.consistencyLevel())
.timestamp(timestamp).ttl(ttl)
.percolate(request.percolate())
.refresh(request.refresh());
indexRequest.operationThreaded(false);
// we fetch it from the index request so we don't generate the bytes twice, its already done in the index request
final BytesReference updateSourceBytes = indexRequest.source();
indexAction.execute(indexRequest, new ActionListener<IndexResponse>() {
@Override
public void onResponse(IndexResponse response) {
UpdateResponse update = new UpdateResponse(response.getIndex(), response.getType(), response.getId(), response.getVersion());
update.setMatches(response.getMatches());
update.setGetResult(extractGetResult(request, response.getVersion(), updatedSourceAsMap, updateSourceContentType, updateSourceBytes));
listener.onResponse(update);
}
@Override
public void onFailure(Throwable e) {
e = ExceptionsHelper.unwrapCause(e);
if (e instanceof VersionConflictEngineException) {
if (retryCount < request.retryOnConflict()) {
threadPool.executor(executor()).execute(new Runnable() {
@Override
public void run() {
shardOperation(request, listener, retryCount + 1);
}
});
return;
@Override
public void onFailure(Throwable e) {
e = ExceptionsHelper.unwrapCause(e);
if (e instanceof VersionConflictEngineException || e instanceof DocumentAlreadyExistsException) {
if (retryCount < request.retryOnConflict()) {
threadPool.executor(executor()).execute(new Runnable() {
@Override
public void run() {
shardOperation(request, listener, retryCount + 1);
}
});
return;
}
}
listener.onFailure(e);
}
});
break;
case INDEX:
IndexRequest indexRequest = result.action();
// we fetch it from the index request so we don't generate the bytes twice, its already done in the index request
final BytesReference indexSourceBytes = indexRequest.source();
indexAction.execute(indexRequest, new ActionListener<IndexResponse>() {
@Override
public void onResponse(IndexResponse response) {
UpdateResponse update = new UpdateResponse(response.getIndex(), response.getType(), response.getId(), response.getVersion());
update.setMatches(response.getMatches());
update.setGetResult(updateHelper.extractGetResult(request, response.getVersion(), result.updatedSourceAsMap(), result.updateSourceContentType(), indexSourceBytes));
listener.onResponse(update);
}
listener.onFailure(e);
}
});
} else if ("delete".equals(operation)) {
DeleteRequest deleteRequest = Requests.deleteRequest(request.index()).type(request.type()).id(request.id()).routing(routing).parent(parent)
.version(getResult.getVersion()).replicationType(request.replicationType()).consistencyLevel(request.consistencyLevel());
deleteRequest.operationThreaded(false);
deleteAction.execute(deleteRequest, new ActionListener<DeleteResponse>() {
@Override
public void onResponse(DeleteResponse response) {
UpdateResponse update = new UpdateResponse(response.getIndex(), response.getType(), response.getId(), response.getVersion());
update.setGetResult(extractGetResult(request, response.getVersion(), updatedSourceAsMap, updateSourceContentType, null));
listener.onResponse(update);
}
@Override
public void onFailure(Throwable e) {
e = ExceptionsHelper.unwrapCause(e);
if (e instanceof VersionConflictEngineException) {
if (retryCount < request.retryOnConflict()) {
threadPool.executor(executor()).execute(new Runnable() {
@Override
public void run() {
shardOperation(request, listener, retryCount + 1);
}
});
return;
@Override
public void onFailure(Throwable e) {
e = ExceptionsHelper.unwrapCause(e);
if (e instanceof VersionConflictEngineException) {
if (retryCount < request.retryOnConflict()) {
threadPool.executor(executor()).execute(new Runnable() {
@Override
public void run() {
shardOperation(request, listener, retryCount + 1);
}
});
return;
}
}
listener.onFailure(e);
}
listener.onFailure(e);
}
});
} else if ("none".equals(operation)) {
UpdateResponse update = new UpdateResponse(getResult.getIndex(), getResult.getType(), getResult.getId(), getResult.getVersion());
update.setGetResult(extractGetResult(request, getResult.getVersion(), updatedSourceAsMap, updateSourceContentType, null));
listener.onResponse(update);
} else {
logger.warn("Used update operation [{}] for script [{}], doing nothing...", operation, request.script);
listener.onResponse(new UpdateResponse(getResult.getIndex(), getResult.getType(), getResult.getId(), getResult.getVersion()));
}
}
@Nullable
protected GetResult extractGetResult(final UpdateRequest request, long version, final Map<String, Object> source, XContentType sourceContentType, @Nullable final BytesReference sourceAsBytes) {
if (request.fields() == null || request.fields().length == 0) {
return null;
}
boolean sourceRequested = false;
Map<String, GetField> fields = null;
if (request.fields() != null && request.fields().length > 0) {
SourceLookup sourceLookup = new SourceLookup();
sourceLookup.setNextSource(source);
for (String field : request.fields()) {
if (field.equals("_source")) {
sourceRequested = true;
continue;
}
Object value = sourceLookup.extractValue(field);
if (value != null) {
if (fields == null) {
fields = newHashMapWithExpectedSize(2);
});
break;
case DELETE:
DeleteRequest deleteRequest = result.action();
deleteAction.execute(deleteRequest, new ActionListener<DeleteResponse>() {
@Override
public void onResponse(DeleteResponse response) {
UpdateResponse update = new UpdateResponse(response.getIndex(), response.getType(), response.getId(), response.getVersion());
update.setGetResult(updateHelper.extractGetResult(request, response.getVersion(), result.updatedSourceAsMap(), result.updateSourceContentType(), null));
listener.onResponse(update);
}
GetField getField = fields.get(field);
if (getField == null) {
getField = new GetField(field, new ArrayList<Object>(2));
fields.put(field, getField);
@Override
public void onFailure(Throwable e) {
e = ExceptionsHelper.unwrapCause(e);
if (e instanceof VersionConflictEngineException) {
if (retryCount < request.retryOnConflict()) {
threadPool.executor(executor()).execute(new Runnable() {
@Override
public void run() {
shardOperation(request, listener, retryCount + 1);
}
});
return;
}
}
listener.onFailure(e);
}
getField.getValues().add(value);
}
}
});
break;
case NONE:
UpdateResponse update = result.action();
listener.onResponse(update);
break;
default:
throw new ElasticSearchIllegalStateException("Illegal operation " + result.operation());
}
// TODO when using delete/none, we can still return the source as bytes by generating it (using the sourceContentType)
return new GetResult(request.index(), request.type(), request.id(), version, true, sourceRequested ? sourceAsBytes : null, fields);
}
}

View File

@ -0,0 +1,254 @@
package org.elasticsearch.action.update;
import org.elasticsearch.ElasticSearchIllegalArgumentException;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.engine.DocumentMissingException;
import org.elasticsearch.index.engine.DocumentSourceMissingException;
import org.elasticsearch.index.get.GetField;
import org.elasticsearch.index.get.GetResult;
import org.elasticsearch.index.mapper.internal.ParentFieldMapper;
import org.elasticsearch.index.mapper.internal.RoutingFieldMapper;
import org.elasticsearch.index.mapper.internal.SourceFieldMapper;
import org.elasticsearch.index.mapper.internal.TTLFieldMapper;
import org.elasticsearch.index.service.IndexService;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.service.IndexShard;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.script.ExecutableScript;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.search.lookup.SourceLookup;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import static com.google.common.collect.Maps.newHashMapWithExpectedSize;
/**
* Helper for translating an update request to an index, delete request or update response.
*/
public class UpdateHelper extends AbstractComponent {
private final IndicesService indicesService;
private final ScriptService scriptService;
@Inject
public UpdateHelper(Settings settings, IndicesService indicesService, ScriptService scriptService) {
super(settings);
this.indicesService = indicesService;
this.scriptService = scriptService;
}
/**
* Prepares an update request by converting it into an index or delete request or an update response (no action).
*/
public Result prepare(UpdateRequest request) {
IndexService indexService = indicesService.indexServiceSafe(request.index());
IndexShard indexShard = indexService.shardSafe(request.shardId());
return prepare(request, indexShard);
}
public Result prepare(UpdateRequest request, IndexShard indexShard) {
long getDate = System.currentTimeMillis();
final GetResult getResult = indexShard.getService().get(request.type(), request.id(),
new String[]{SourceFieldMapper.NAME, RoutingFieldMapper.NAME, ParentFieldMapper.NAME, TTLFieldMapper.NAME}, true);
if (!getResult.isExists()) {
if (request.upsertRequest() == null) {
throw new DocumentMissingException(new ShardId(request.index(), request.shardId()), request.type(), request.id());
}
IndexRequest indexRequest = request.upsertRequest();
indexRequest.index(request.index()).type(request.type()).id(request.id())
// it has to be a "create!"
.create(true)
.routing(request.routing())
.percolate(request.percolate())
.refresh(request.refresh())
.replicationType(request.replicationType()).consistencyLevel(request.consistencyLevel());
indexRequest.operationThreaded(false);
return new Result(indexRequest, Operation.UPSERT, null, null);
}
if (getResult.internalSourceRef() == null) {
// no source, we can't do nothing, through a failure...
throw new DocumentSourceMissingException(new ShardId(request.index(), request.shardId()), request.type(), request.id());
}
Tuple<XContentType, Map<String, Object>> sourceAndContent = XContentHelper.convertToMap(getResult.internalSourceRef(), true);
String operation = null;
String timestamp = null;
Long ttl = null;
Object fetchedTTL = null;
final Map<String, Object> updatedSourceAsMap;
final XContentType updateSourceContentType = sourceAndContent.v1();
String routing = getResult.getFields().containsKey(RoutingFieldMapper.NAME) ? getResult.field(RoutingFieldMapper.NAME).getValue().toString() : null;
String parent = getResult.getFields().containsKey(ParentFieldMapper.NAME) ? getResult.field(ParentFieldMapper.NAME).getValue().toString() : null;
if (request.script() == null && request.doc() != null) {
IndexRequest indexRequest = request.doc();
updatedSourceAsMap = sourceAndContent.v2();
if (indexRequest.ttl() > 0) {
ttl = indexRequest.ttl();
}
timestamp = indexRequest.timestamp();
if (indexRequest.routing() != null) {
routing = indexRequest.routing();
}
if (indexRequest.parent() != null) {
parent = indexRequest.parent();
}
XContentHelper.update(updatedSourceAsMap, indexRequest.sourceAsMap());
} else {
Map<String, Object> ctx = new HashMap<String, Object>(2);
ctx.put("_source", sourceAndContent.v2());
try {
ExecutableScript script = scriptService.executable(request.scriptLang, request.script, request.scriptParams);
script.setNextVar("ctx", ctx);
script.run();
// we need to unwrap the ctx...
ctx = (Map<String, Object>) script.unwrap(ctx);
} catch (Exception e) {
throw new ElasticSearchIllegalArgumentException("failed to execute script", e);
}
operation = (String) ctx.get("op");
timestamp = (String) ctx.get("_timestamp");
fetchedTTL = ctx.get("_ttl");
if (fetchedTTL != null) {
if (fetchedTTL instanceof Number) {
ttl = ((Number) fetchedTTL).longValue();
} else {
ttl = TimeValue.parseTimeValue((String) fetchedTTL, null).millis();
}
}
updatedSourceAsMap = (Map<String, Object>) ctx.get("_source");
}
// apply script to update the source
// No TTL has been given in the update script so we keep previous TTL value if there is one
if (ttl == null) {
ttl = getResult.getFields().containsKey(TTLFieldMapper.NAME) ? (Long) getResult.field(TTLFieldMapper.NAME).getValue() : null;
if (ttl != null) {
ttl = ttl - (System.currentTimeMillis() - getDate); // It is an approximation of exact TTL value, could be improved
}
}
// TODO: external version type, does it make sense here? does not seem like it...
if (operation == null || "index".equals(operation)) {
final IndexRequest indexRequest = Requests.indexRequest(request.index()).type(request.type()).id(request.id()).routing(routing).parent(parent)
.source(updatedSourceAsMap, updateSourceContentType)
.version(getResult.getVersion()).replicationType(request.replicationType()).consistencyLevel(request.consistencyLevel())
.timestamp(timestamp).ttl(ttl)
.percolate(request.percolate())
.refresh(request.refresh());
indexRequest.operationThreaded(false);
return new Result(indexRequest, Operation.INDEX, updatedSourceAsMap, updateSourceContentType);
} else if ("delete".equals(operation)) {
DeleteRequest deleteRequest = Requests.deleteRequest(request.index()).type(request.type()).id(request.id()).routing(routing).parent(parent)
.version(getResult.getVersion()).replicationType(request.replicationType()).consistencyLevel(request.consistencyLevel());
deleteRequest.operationThreaded(false);
return new Result(deleteRequest, Operation.DELETE, updatedSourceAsMap, updateSourceContentType);
} else if ("none".equals(operation)) {
UpdateResponse update = new UpdateResponse(getResult.getIndex(), getResult.getType(), getResult.getId(), getResult.getVersion());
update.setGetResult(extractGetResult(request, getResult.getVersion(), updatedSourceAsMap, updateSourceContentType, null));
return new Result(update, Operation.NONE, updatedSourceAsMap, updateSourceContentType);
} else {
logger.warn("Used update operation [{}] for script [{}], doing nothing...", operation, request.script);
UpdateResponse update = new UpdateResponse(getResult.getIndex(), getResult.getType(), getResult.getId(), getResult.getVersion());
return new Result(update, Operation.NONE, updatedSourceAsMap, updateSourceContentType);
}
}
/**
* Extracts the fields from the updated document to be returned in a update response
*/
public GetResult extractGetResult(final UpdateRequest request, long version, final Map<String, Object> source, XContentType sourceContentType, @Nullable final BytesReference sourceAsBytes) {
if (request.fields() == null || request.fields().length == 0) {
return null;
}
boolean sourceRequested = false;
Map<String, GetField> fields = null;
if (request.fields() != null && request.fields().length > 0) {
SourceLookup sourceLookup = new SourceLookup();
sourceLookup.setNextSource(source);
for (String field : request.fields()) {
if (field.equals("_source")) {
sourceRequested = true;
continue;
}
Object value = sourceLookup.extractValue(field);
if (value != null) {
if (fields == null) {
fields = newHashMapWithExpectedSize(2);
}
GetField getField = fields.get(field);
if (getField == null) {
getField = new GetField(field, new ArrayList<Object>(2));
fields.put(field, getField);
}
getField.getValues().add(value);
}
}
}
// TODO when using delete/none, we can still return the source as bytes by generating it (using the sourceContentType)
return new GetResult(request.index(), request.type(), request.id(), version, true, sourceRequested ? sourceAsBytes : null, fields);
}
public static class Result {
private final Streamable action;
private final Operation operation;
private final Map<String, Object> updatedSourceAsMap;
private final XContentType updateSourceContentType;
public Result(Streamable action, Operation operation, Map<String, Object> updatedSourceAsMap, XContentType updateSourceContentType) {
this.action = action;
this.operation = operation;
this.updatedSourceAsMap = updatedSourceAsMap;
this.updateSourceContentType = updateSourceContentType;
}
@SuppressWarnings("unchecked")
public <T extends Streamable> T action() {
return (T) action;
}
public Operation operation() {
return operation;
}
public Map<String, Object> updatedSourceAsMap() {
return updatedSourceAsMap;
}
public XContentType updateSourceContentType() {
return updateSourceContentType;
}
}
public static enum Operation {
UPSERT,
INDEX,
DELETE,
NONE
}
}

View File

@ -72,7 +72,7 @@ public class UpdateRequest extends InstanceShardOperationRequest<UpdateRequest>
@Nullable
private IndexRequest doc;
UpdateRequest() {
public UpdateRequest() {
}
@ -188,6 +188,10 @@ public class UpdateRequest extends InstanceShardOperationRequest<UpdateRequest>
return this;
}
public String scriptLang() {
return scriptLang;
}
/**
* Add a script parameter.
*/

View File

@ -86,7 +86,7 @@ public class UpdateResponse extends ActionResponse {
return this.matches;
}
void setGetResult(GetResult getResult) {
public void setGetResult(GetResult getResult) {
this.getResult = getResult;
}

View File

@ -20,6 +20,7 @@
package org.elasticsearch.indices;
import com.google.common.collect.ImmutableList;
import org.elasticsearch.action.update.UpdateHelper;
import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.common.inject.Module;
import org.elasticsearch.common.inject.SpawnModules;
@ -75,5 +76,6 @@ public class IndicesModule extends AbstractModule implements SpawnModules {
bind(TransportNodesListShardStoreMetaData.class).asEagerSingleton();
bind(IndicesTTLService.class).asEagerSingleton();
bind(IndicesWarmer.class).to(InternalIndicesWarmer.class).asEagerSingleton();
bind(UpdateHelper.class).asEagerSingleton();
}
}

View File

@ -0,0 +1,310 @@
package org.elasticsearch.test.integration.document;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.update.UpdateRequestBuilder;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.test.integration.AbstractNodesTests;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.*;
/**
*/
public class BulkTests extends AbstractNodesTests {
private Client client;
@BeforeClass
public void createNodes() throws Exception {
startNode("node1");
startNode("node2");
client = getClient();
}
@AfterClass
public void closeNodes() {
client.close();
closeAllNodes();
}
protected Client getClient() {
return client("node1");
}
@Test
public void testBulkUpdate_simple() throws Exception {
client.admin().indices().prepareDelete().execute().actionGet();
client.admin().indices().prepareCreate("test")
.setSettings(
ImmutableSettings.settingsBuilder()
.put("index.number_of_shards", 2)
.put("index.number_of_replicas", 0)
).execute().actionGet();
client.admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForGreenStatus().execute().actionGet();
BulkResponse bulkResponse = client.prepareBulk()
.add(client.prepareIndex().setIndex("test").setType("type1").setId("1").setSource("field", 1))
.add(client.prepareIndex().setIndex("test").setType("type1").setId("2").setSource("field", 2).setCreate(true))
.add(client.prepareIndex().setIndex("test").setType("type1").setId("3").setSource("field", 3))
.add(client.prepareIndex().setIndex("test").setType("type1").setId("4").setSource("field", 4))
.add(client.prepareIndex().setIndex("test").setType("type1").setId("5").setSource("field", 5))
.execute().actionGet();
assertThat(bulkResponse.hasFailures(), equalTo(false));
assertThat(bulkResponse.getItems().length, equalTo(5));
bulkResponse = client.prepareBulk()
.add(client.prepareUpdate().setIndex("test").setType("type1").setId("1").setScript("ctx._source.field += 1"))
.add(client.prepareUpdate().setIndex("test").setType("type1").setId("2").setScript("ctx._source.field += 1").setRetryOnConflict(3))
.add(client.prepareUpdate().setIndex("test").setType("type1").setId("3").setDoc(jsonBuilder().startObject().field("field1", "test").endObject()))
.execute().actionGet();
assertThat(bulkResponse.hasFailures(), equalTo(false));
assertThat(bulkResponse.getItems().length, equalTo(3));
assertThat(((UpdateResponse) bulkResponse.getItems()[0].getResponse()).getId(), equalTo("1"));
assertThat(((UpdateResponse) bulkResponse.getItems()[0].getResponse()).getVersion(), equalTo(2l));
assertThat(((UpdateResponse) bulkResponse.getItems()[1].getResponse()).getId(), equalTo("2"));
assertThat(((UpdateResponse) bulkResponse.getItems()[1].getResponse()).getVersion(), equalTo(2l));
assertThat(((UpdateResponse) bulkResponse.getItems()[2].getResponse()).getId(), equalTo("3"));
assertThat(((UpdateResponse) bulkResponse.getItems()[2].getResponse()).getVersion(), equalTo(2l));
GetResponse getResponse = client.prepareGet().setIndex("test").setType("type1").setId("1").setFields("field").execute().actionGet();
assertThat(getResponse.isExists(), equalTo(true));
assertThat(getResponse.getVersion(), equalTo(2l));
assertThat(((Long) getResponse.getField("field").getValue()), equalTo(2l));
getResponse = client.prepareGet().setIndex("test").setType("type1").setId("2").setFields("field").execute().actionGet();
assertThat(getResponse.isExists(), equalTo(true));
assertThat(getResponse.getVersion(), equalTo(2l));
assertThat(((Long) getResponse.getField("field").getValue()), equalTo(3l));
getResponse = client.prepareGet().setIndex("test").setType("type1").setId("3").setFields("field1").execute().actionGet();
assertThat(getResponse.isExists(), equalTo(true));
assertThat(getResponse.getVersion(), equalTo(2l));
assertThat(getResponse.getField("field1").getValue().toString(), equalTo("test"));
bulkResponse = client.prepareBulk()
.add(client.prepareUpdate().setIndex("test").setType("type1").setId("6").setScript("ctx._source.field += 1")
.setUpsertRequest(jsonBuilder().startObject().field("field", 0).endObject()))
.add(client.prepareUpdate().setIndex("test").setType("type1").setId("7").setScript("ctx._source.field += 1"))
.add(client.prepareUpdate().setIndex("test").setType("type1").setId("2").setScript("ctx._source.field += 1"))
.execute().actionGet();
assertThat(bulkResponse.hasFailures(), equalTo(true));
assertThat(bulkResponse.getItems().length, equalTo(3));
assertThat(((UpdateResponse) bulkResponse.getItems()[0].getResponse()).getId(), equalTo("6"));
assertThat(((UpdateResponse) bulkResponse.getItems()[0].getResponse()).getVersion(), equalTo(1l));
assertThat(bulkResponse.getItems()[1].getResponse(), nullValue());
assertThat(bulkResponse.getItems()[1].getFailure().getId(), equalTo("7"));
assertThat(bulkResponse.getItems()[1].getFailure().getMessage(), containsString("DocumentMissingException"));
assertThat(((UpdateResponse) bulkResponse.getItems()[2].getResponse()).getId(), equalTo("2"));
assertThat(((UpdateResponse) bulkResponse.getItems()[2].getResponse()).getVersion(), equalTo(3l));
getResponse = client.prepareGet().setIndex("test").setType("type1").setId("6").setFields("field").execute().actionGet();
assertThat(getResponse.isExists(), equalTo(true));
assertThat(getResponse.getVersion(), equalTo(1l));
assertThat(((Long) getResponse.getField("field").getValue()), equalTo(0l));
getResponse = client.prepareGet().setIndex("test").setType("type1").setId("7").setFields("field").execute().actionGet();
assertThat(getResponse.isExists(), equalTo(false));
getResponse = client.prepareGet().setIndex("test").setType("type1").setId("2").setFields("field").execute().actionGet();
assertThat(getResponse.isExists(), equalTo(true));
assertThat(getResponse.getVersion(), equalTo(3l));
assertThat(((Long) getResponse.getField("field").getValue()), equalTo(4l));
}
@Test
public void testBulkUpdate_malformedScripts() throws Exception {
client.admin().indices().prepareDelete().execute().actionGet();
client.admin().indices().prepareCreate("test")
.setSettings(
ImmutableSettings.settingsBuilder()
.put("index.number_of_shards", 2)
.put("index.number_of_replicas", 0)
).execute().actionGet();
client.admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForGreenStatus().execute().actionGet();
BulkResponse bulkResponse = client.prepareBulk()
.add(client.prepareIndex().setIndex("test").setType("type1").setId("1").setSource("field", 1))
.add(client.prepareIndex().setIndex("test").setType("type1").setId("2").setSource("field", 1))
.add(client.prepareIndex().setIndex("test").setType("type1").setId("3").setSource("field", 1))
.execute().actionGet();
assertThat(bulkResponse.hasFailures(), equalTo(false));
assertThat(bulkResponse.getItems().length, equalTo(3));
bulkResponse = client.prepareBulk()
.add(client.prepareUpdate().setIndex("test").setType("type1").setId("1").setScript("ctx._source.field += a").setFields("field"))
.add(client.prepareUpdate().setIndex("test").setType("type1").setId("2").setScript("ctx._source.field += 1").setFields("field"))
.add(client.prepareUpdate().setIndex("test").setType("type1").setId("3").setScript("ctx._source.field += a").setFields("field"))
.execute().actionGet();
assertThat(bulkResponse.hasFailures(), equalTo(true));
assertThat(bulkResponse.getItems().length, equalTo(3));
assertThat(bulkResponse.getItems()[0].getFailure().getId(), equalTo("1"));
assertThat(bulkResponse.getItems()[0].getFailure().getMessage(), containsString("failed to execute script"));
assertThat(bulkResponse.getItems()[0].getResponse(), nullValue());
assertThat(((UpdateResponse) bulkResponse.getItems()[1].getResponse()).getId(), equalTo("2"));
assertThat(((UpdateResponse) bulkResponse.getItems()[1].getResponse()).getVersion(), equalTo(2l));
assertThat(((Integer)((UpdateResponse) bulkResponse.getItems()[1].getResponse()).getGetResult().field("field").getValue()), equalTo(2));
assertThat(bulkResponse.getItems()[1].getFailure(), nullValue());
assertThat(bulkResponse.getItems()[2].getFailure().getId(), equalTo("3"));
assertThat(bulkResponse.getItems()[2].getFailure().getMessage(), containsString("failed to execute script"));
assertThat(bulkResponse.getItems()[2].getResponse(), nullValue());
}
@Test
public void testBulkUpdate_largerVolume() throws Exception {
client.admin().indices().prepareDelete().execute().actionGet();
client.admin().indices().prepareCreate("test")
.setSettings(
ImmutableSettings.settingsBuilder()
.put("index.number_of_shards", 2)
.put("index.number_of_replicas", 1)
).execute().actionGet();
client.admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForGreenStatus().execute().actionGet();
int numDocs = 2000;
BulkRequestBuilder builder = client.prepareBulk();
for (int i = 0; i < numDocs; i++) {
builder.add(
client.prepareUpdate()
.setIndex("test").setType("type1").setId(Integer.toString(i))
.setScript("ctx._source.counter += 1").setFields("counter")
.setUpsertRequest(jsonBuilder().startObject().field("counter", 1).endObject())
);
}
BulkResponse response = builder.execute().actionGet();
assertThat(response.hasFailures(), equalTo(false));
assertThat(response.getItems().length, equalTo(numDocs));
for (int i = 0; i < numDocs; i++) {
assertThat(response.getItems()[i].getId(), equalTo(Integer.toString(i)));
assertThat(response.getItems()[i].getVersion(), equalTo(1l));
assertThat(response.getItems()[i].getIndex(), equalTo("test"));
assertThat(response.getItems()[i].getType(), equalTo("type1"));
assertThat(response.getItems()[i].getOpType(), equalTo("update"));
assertThat(((UpdateResponse) response.getItems()[i].getResponse()).getId(), equalTo(Integer.toString(i)));
assertThat(((UpdateResponse) response.getItems()[i].getResponse()).getVersion(), equalTo(1l));
assertThat(((Integer)((UpdateResponse) response.getItems()[i].getResponse()).getGetResult().field("counter").getValue()), equalTo(1));
for (int j = 0; j < 5; j++) {
GetResponse getResponse = client.prepareGet("test", "type1", Integer.toString(i)).setFields("counter").execute().actionGet();
assertThat(getResponse.isExists(), equalTo(true));
assertThat(getResponse.getVersion(), equalTo(1l));
assertThat((Long) getResponse.getField("counter").getValue(), equalTo(1l));
}
}
builder = client.prepareBulk();
for (int i = 0; i < numDocs; i++) {
UpdateRequestBuilder updateBuilder = client.prepareUpdate()
.setIndex("test").setType("type1").setId(Integer.toString(i)).setFields("counter");
if (i % 2 == 0) {
updateBuilder.setScript("ctx._source.counter += 1");
} else {
updateBuilder.setDoc(jsonBuilder().startObject().field("counter", 2).endObject());
}
if (i % 3 == 0) {
updateBuilder.setRetryOnConflict(3);
}
builder.add(updateBuilder);
}
response = builder.execute().actionGet();
assertThat(response.hasFailures(), equalTo(false));
assertThat(response.getItems().length, equalTo(numDocs));
for (int i = 0; i < numDocs; i++) {
assertThat(response.getItems()[i].getId(), equalTo(Integer.toString(i)));
assertThat(response.getItems()[i].getVersion(), equalTo(2l));
assertThat(response.getItems()[i].getIndex(), equalTo("test"));
assertThat(response.getItems()[i].getType(), equalTo("type1"));
assertThat(response.getItems()[i].getOpType(), equalTo("update"));
assertThat(((UpdateResponse) response.getItems()[i].getResponse()).getId(), equalTo(Integer.toString(i)));
assertThat(((UpdateResponse) response.getItems()[i].getResponse()).getVersion(), equalTo(2l));
assertThat(((Integer)((UpdateResponse) response.getItems()[i].getResponse()).getGetResult().field("counter").getValue()), equalTo(2));
}
builder = client.prepareBulk();
int maxDocs = numDocs / 2 + numDocs;
for (int i = (numDocs / 2); i < maxDocs; i++) {
builder.add(
client.prepareUpdate()
.setIndex("test").setType("type1").setId(Integer.toString(i)).setScript("ctx._source.counter += 1")
);
}
response = builder.execute().actionGet();
assertThat(response.hasFailures(), equalTo(true));
assertThat(response.getItems().length, equalTo(numDocs));
for (int i = 0; i < numDocs; i++) {
int id = i + (numDocs / 2);
if (i >= (numDocs / 2)) {
assertThat(response.getItems()[i].getFailure().getId(), equalTo(Integer.toString(id)));
assertThat(response.getItems()[i].getFailure().getMessage(), containsString("DocumentMissingException"));
} else {
assertThat(response.getItems()[i].getId(), equalTo(Integer.toString(id)));
assertThat(response.getItems()[i].getVersion(), equalTo(3l));
assertThat(response.getItems()[i].getIndex(), equalTo("test"));
assertThat(response.getItems()[i].getType(), equalTo("type1"));
assertThat(response.getItems()[i].getOpType(), equalTo("update"));
}
}
builder = client.prepareBulk();
for (int i = 0; i < numDocs; i++) {
builder.add(
client.prepareUpdate()
.setIndex("test").setType("type1").setId(Integer.toString(i)).setScript("ctx.op = \"none\"")
);
}
response = builder.execute().actionGet();
assertThat(response.hasFailures(), equalTo(false));
assertThat(response.getItems().length, equalTo(numDocs));
for (int i = 0; i < numDocs; i++) {
assertThat(response.getItems()[i].getItemId(), equalTo(i));
assertThat(response.getItems()[i].getId(), equalTo(Integer.toString(i)));
assertThat(response.getItems()[i].getIndex(), equalTo("test"));
assertThat(response.getItems()[i].getType(), equalTo("type1"));
assertThat(response.getItems()[i].getOpType(), equalTo("update"));
}
builder = client.prepareBulk();
for (int i = 0; i < numDocs; i++) {
builder.add(
client.prepareUpdate()
.setIndex("test").setType("type1").setId(Integer.toString(i)).setScript("ctx.op = \"delete\"")
);
}
response = builder.execute().actionGet();
assertThat(response.hasFailures(), equalTo(false));
assertThat(response.getItems().length, equalTo(numDocs));
for (int i = 0; i < numDocs; i++) {
assertThat(response.getItems()[i].getItemId(), equalTo(i));
assertThat(response.getItems()[i].getId(), equalTo(Integer.toString(i)));
assertThat(response.getItems()[i].getIndex(), equalTo("test"));
assertThat(response.getItems()[i].getType(), equalTo("type1"));
assertThat(response.getItems()[i].getOpType(), equalTo("update"));
for (int j = 0; j < 5; j++) {
GetResponse getResponse = client.prepareGet("test", "type1", Integer.toString(i)).setFields("counter").execute().actionGet();
assertThat(getResponse.isExists(), equalTo(false));
}
}
}
}

View File

@ -23,6 +23,7 @@ import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthStatus;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateRequestBuilder;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.Priority;
@ -38,6 +39,7 @@ import org.testng.annotations.Test;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.index.query.QueryBuilders.termQuery;
@ -45,6 +47,7 @@ import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.*;
public class UpdateTests extends AbstractNodesTests {
private Client client;
@BeforeClass
@ -378,4 +381,64 @@ public class UpdateTests extends AbstractNodesTests {
assertThat(map2.containsKey("commonkey"), equalTo(true));
}
}
@Test
public void testConcurrentUpdateWithRetryOnConflict() throws Exception {
concurrentUpdateWithRetryOnConflict(false);
}
@Test
public void testConcurrentUpdateWithRetryOnConflict_bulk() throws Exception {
concurrentUpdateWithRetryOnConflict(true);
}
private void concurrentUpdateWithRetryOnConflict(final boolean useBulkApi) throws Exception {
createIndex();
ClusterHealthResponse clusterHealth = client.admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForGreenStatus().execute().actionGet();
assertThat(clusterHealth.isTimedOut(), equalTo(false));
assertThat(clusterHealth.getStatus(), equalTo(ClusterHealthStatus.GREEN));
int numberOfThreads = 5;
final CountDownLatch latch = new CountDownLatch(numberOfThreads);
final int numberOfUpdatesPerThread = 10000;
for (int i = 0; i < numberOfThreads; i++) {
Runnable r = new Runnable() {
@Override
public void run() {
try {
for (int i = 0; i < numberOfUpdatesPerThread; i++) {
if (useBulkApi) {
UpdateRequestBuilder updateRequestBuilder = client.prepareUpdate("test", "type1", Integer.toString(i))
.setScript("ctx._source.field += 1")
.setRetryOnConflict(Integer.MAX_VALUE)
.setUpsertRequest(jsonBuilder().startObject().field("field", 1).endObject());
client.prepareBulk().add(updateRequestBuilder).execute().actionGet();
} else {
client.prepareUpdate("test", "type1", Integer.toString(i)).setScript("ctx._source.field += 1")
.setRetryOnConflict(Integer.MAX_VALUE)
.setUpsertRequest(jsonBuilder().startObject().field("field", 1).endObject())
.execute().actionGet();
}
}
} catch (Exception e) {
e.printStackTrace();
} finally {
latch.countDown();
}
}
};
new Thread(r).start();
}
latch.await();
for (int i = 0; i < numberOfUpdatesPerThread; i++) {
GetResponse response = client.prepareGet("test", "type1", Integer.toString(i)).execute().actionGet();
assertThat(response.getId(), equalTo(Integer.toString(i)));
assertThat(response.getVersion(), equalTo((long) numberOfThreads));
assertThat((Integer) response.getSource().get("field"), equalTo(numberOfThreads));
}
}
}

View File

@ -22,6 +22,7 @@ package org.elasticsearch.test.unit.action.bulk;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.common.bytes.BytesArray;
import org.testng.annotations.Test;
@ -58,4 +59,24 @@ public class BulkRequestTests {
bulkRequest.add(bulkAction.getBytes(), 0, bulkAction.length(), true, null, null);
assertThat(bulkRequest.numberOfActions(), equalTo(3));
}
@Test
public void testSimpleBulk4() throws Exception {
String bulkAction = copyToStringFromClasspath("/org/elasticsearch/test/unit/action/bulk/simple-bulk4.json");
BulkRequest bulkRequest = new BulkRequest();
bulkRequest.add(bulkAction.getBytes(), 0, bulkAction.length(), true, null, null);
assertThat(bulkRequest.numberOfActions(), equalTo(4));
assertThat(((UpdateRequest) bulkRequest.requests().get(0)).id(), equalTo("1"));
assertThat(((UpdateRequest) bulkRequest.requests().get(0)).retryOnConflict(), equalTo(2));
assertThat(((UpdateRequest) bulkRequest.requests().get(0)).doc().source().toUtf8(), equalTo("{\"field\":\"value\"}"));
assertThat(((UpdateRequest) bulkRequest.requests().get(1)).id(), equalTo("0"));
assertThat(((UpdateRequest) bulkRequest.requests().get(1)).type(), equalTo("type1"));
assertThat(((UpdateRequest) bulkRequest.requests().get(1)).index(), equalTo("index1"));
assertThat(((UpdateRequest) bulkRequest.requests().get(1)).script(), equalTo("counter += param1"));
assertThat(((UpdateRequest) bulkRequest.requests().get(1)).scriptLang(), equalTo("js"));
assertThat(((UpdateRequest) bulkRequest.requests().get(1)).scriptParams().size(), equalTo(1));
assertThat(((Integer) ((UpdateRequest) bulkRequest.requests().get(1)).scriptParams().get("param1")), equalTo(1));
assertThat(((UpdateRequest) bulkRequest.requests().get(1)).upsertRequest().source().toUtf8(), equalTo("{\"counter\":1}"));
}
}

View File

@ -0,0 +1,7 @@
{ "update" : {"_id" : "1", "_retry_on_conflict" : 2} }
{ "doc" : {"field" : "value"} }
{ "update" : { "_id" : "0", "_type" : "type1", "_index" : "index1" } }
{ "script" : "counter += param1", "lang" : "js", "params" : {"param1" : 1}, "upsert" : {"counter" : 1}}
{ "delete" : { "_id" : "2" } }
{ "create" : { "_id" : "3" } }
{ "field1" : "value3" }