From 9905eab73aa8d0258fdf8f7d58c429afc00e1d60 Mon Sep 17 00:00:00 2001 From: Shay Banon Date: Fri, 8 Jun 2012 02:01:04 +0200 Subject: [PATCH] Update API: Allow to upsert, provide a doc and index it if the doc does not exists, closes #2008. --- .../action/update/TransportUpdateAction.java | 43 ++++++++- .../action/update/UpdateRequest.java | 87 +++++++++++++++++++ .../action/update/UpdateRequestBuilder.java | 60 +++++++++++++ .../rest/action/update/RestUpdateAction.java | 18 +++- .../test/integration/update/UpdateTests.java | 28 ++++++ 5 files changed, 234 insertions(+), 2 deletions(-) diff --git a/src/main/java/org/elasticsearch/action/update/TransportUpdateAction.java b/src/main/java/org/elasticsearch/action/update/TransportUpdateAction.java index 480ca31e066..0ac7253911a 100644 --- a/src/main/java/org/elasticsearch/action/update/TransportUpdateAction.java +++ b/src/main/java/org/elasticsearch/action/update/TransportUpdateAction.java @@ -165,7 +165,48 @@ public class TransportUpdateAction extends TransportInstanceSingleOperationActio // no doc, what to do, what to do... if (!getResult.exists()) { - listener.onFailure(new DocumentMissingException(new ShardId(request.index(), request.shardId()), request.type(), request.id())); + if (request.indexRequest() == null) { + listener.onFailure(new DocumentMissingException(new ShardId(request.index(), request.shardId()), request.type(), request.id())); + return; + } + IndexRequest indexRequest = request.indexRequest(); + indexRequest.index(request.index()).type(request.type()).id(request.id()) + // it has to be a "create!" + .create(true) + .routing(request.routing()) + .percolate(request.percolate()) + .refresh(request.refresh()) + .replicationType(request.replicationType()).consistencyLevel(request.consistencyLevel()); + indexRequest.operationThreaded(false); + // we fetch it from the index request so we don't generate the bytes twice, its already done in the index request + final BytesHolder updateSourceBytes = indexRequest.underlyingSourceBytes(); + indexAction.execute(indexRequest, new ActionListener() { + @Override + public void onResponse(IndexResponse response) { + UpdateResponse update = new UpdateResponse(response.index(), response.type(), response.id(), response.version()); + update.matches(response.matches()); + // TODO: we can parse the index _source and extractGetResult if applicable + update.getResult(null); + listener.onResponse(update); + } + + @Override + public void onFailure(Throwable e) { + e = ExceptionsHelper.unwrapCause(e); + if (e instanceof VersionConflictEngineException) { + if (retryCount < request.retryOnConflict()) { + threadPool.executor(executor()).execute(new Runnable() { + @Override + public void run() { + shardOperation(request, listener, retryCount + 1); + } + }); + return; + } + } + listener.onFailure(e); + } + }); return; } diff --git a/src/main/java/org/elasticsearch/action/update/UpdateRequest.java b/src/main/java/org/elasticsearch/action/update/UpdateRequest.java index 1cbb1f60c5f..01cc0828c30 100644 --- a/src/main/java/org/elasticsearch/action/update/UpdateRequest.java +++ b/src/main/java/org/elasticsearch/action/update/UpdateRequest.java @@ -22,12 +22,15 @@ package org.elasticsearch.action.update; import com.google.common.collect.Maps; import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.WriteConsistencyLevel; +import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.support.replication.ReplicationType; import org.elasticsearch.action.support.single.instance.InstanceShardOperationRequest; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentType; import java.io.IOException; import java.util.Map; @@ -60,6 +63,8 @@ public class UpdateRequest extends InstanceShardOperationRequest { private ReplicationType replicationType = ReplicationType.DEFAULT; private WriteConsistencyLevel consistencyLevel = WriteConsistencyLevel.DEFAULT; + private IndexRequest indexRequest; + UpdateRequest() { } @@ -330,6 +335,74 @@ public class UpdateRequest extends InstanceShardOperationRequest { return this; } + /** + * Sets the index request to be used if the document does not exists. Otherwise, a {@link org.elasticsearch.index.engine.DocumentMissingException} + * is thrown. + */ + public UpdateRequest doc(IndexRequest indexRequest) { + this.indexRequest = indexRequest; + return this; + } + + /** + * Sets the doc source of the update request to be used when the document does not exists. + */ + public UpdateRequest doc(XContentBuilder source) { + safeIndexRequest().source(source); + return this; + } + + /** + * Sets the doc source of the update request to be used when the document does not exists. + */ + public UpdateRequest doc(Map source) { + safeIndexRequest().source(source); + return this; + } + + /** + * Sets the doc source of the update request to be used when the document does not exists. + */ + public UpdateRequest doc(Map source, XContentType contentType) { + safeIndexRequest().source(source, contentType); + return this; + } + + /** + * Sets the doc source of the update request to be used when the document does not exists. + */ + public UpdateRequest doc(String source) { + safeIndexRequest().source(source); + return this; + } + + /** + * Sets the doc source of the update request to be used when the document does not exists. + */ + public UpdateRequest doc(byte[] source) { + safeIndexRequest().source(source); + return this; + } + + /** + * Sets the doc source of the update request to be used when the document does not exists. + */ + public UpdateRequest doc(byte[] source, int offset, int length) { + safeIndexRequest().source(source, offset, length); + return this; + } + + public IndexRequest indexRequest() { + return this.indexRequest; + } + + private IndexRequest safeIndexRequest() { + if (indexRequest == null) { + indexRequest = new IndexRequest(); + } + return indexRequest; + } + @Override public void readFrom(StreamInput in) throws IOException { super.readFrom(in); @@ -357,6 +430,10 @@ public class UpdateRequest extends InstanceShardOperationRequest { fields[i] = in.readUTF(); } } + if (in.readBoolean()) { + indexRequest = new IndexRequest(); + indexRequest.readFrom(in); + } } @Override @@ -396,5 +473,15 @@ public class UpdateRequest extends InstanceShardOperationRequest { out.writeUTF(field); } } + if (indexRequest == null) { + out.writeBoolean(false); + } else { + out.writeBoolean(true); + // make sure the basics are set + indexRequest.index(index); + indexRequest.type(type); + indexRequest.id(id); + indexRequest.writeTo(out); + } } } diff --git a/src/main/java/org/elasticsearch/action/update/UpdateRequestBuilder.java b/src/main/java/org/elasticsearch/action/update/UpdateRequestBuilder.java index a6e99d98918..448a79e4435 100644 --- a/src/main/java/org/elasticsearch/action/update/UpdateRequestBuilder.java +++ b/src/main/java/org/elasticsearch/action/update/UpdateRequestBuilder.java @@ -21,10 +21,13 @@ package org.elasticsearch.action.update; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.WriteConsistencyLevel; +import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.support.BaseRequestBuilder; import org.elasticsearch.action.support.replication.ReplicationType; import org.elasticsearch.client.Client; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentType; import java.util.Map; @@ -180,6 +183,63 @@ public class UpdateRequestBuilder extends BaseRequestBuilder listener) { client.update(request, listener); diff --git a/src/main/java/org/elasticsearch/rest/action/update/RestUpdateAction.java b/src/main/java/org/elasticsearch/rest/action/update/RestUpdateAction.java index 31fbd290d6f..44d9ecab07a 100644 --- a/src/main/java/org/elasticsearch/rest/action/update/RestUpdateAction.java +++ b/src/main/java/org/elasticsearch/rest/action/update/RestUpdateAction.java @@ -21,6 +21,7 @@ package org.elasticsearch.rest.action.update; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.WriteConsistencyLevel; +import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.support.replication.ReplicationType; import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.action.update.UpdateResponse; @@ -32,7 +33,9 @@ import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentBuilderString; import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.index.VersionType; import org.elasticsearch.rest.*; +import org.elasticsearch.rest.action.support.RestActions; import org.elasticsearch.rest.action.support.RestXContentBuilder; import java.io.IOException; @@ -91,7 +94,7 @@ public class RestUpdateAction extends BaseRestHandler { if (xContentType != null) { try { Map content = XContentFactory.xContent(xContentType) - .createParser(request.contentByteArray(), request.contentByteArrayOffset(), request.contentLength()).mapAndClose(); + .createParser(request.contentByteArray(), request.contentByteArrayOffset(), request.contentLength()).mapOrderedAndClose(); if (content.containsKey("script")) { updateRequest.script(content.get("script").toString()); } @@ -101,6 +104,19 @@ public class RestUpdateAction extends BaseRestHandler { if (content.containsKey("params")) { updateRequest.scriptParams((Map) content.get("params")); } + if (content.containsKey("doc")) { + IndexRequest indexRequest = new IndexRequest(); + indexRequest.source((Map) content.get("doc"), xContentType); + indexRequest.routing(request.param("routing")); + indexRequest.parent(request.param("parent")); // order is important, set it after routing, so it will set the routing + indexRequest.timestamp(request.param("timestamp")); + if (request.hasParam("ttl")) { + indexRequest.ttl(request.paramAsTime("ttl", null).millis()); + } + indexRequest.version(RestActions.parseVersion(request)); + indexRequest.versionType(VersionType.fromString(request.param("version_type"), indexRequest.versionType())); + updateRequest.doc(indexRequest); + } } catch (Exception e) { try { channel.sendResponse(new XContentThrowableRestResponse(request, e)); diff --git a/src/test/java/org/elasticsearch/test/integration/update/UpdateTests.java b/src/test/java/org/elasticsearch/test/integration/update/UpdateTests.java index 330e975705b..12f0aeb8526 100644 --- a/src/test/java/org/elasticsearch/test/integration/update/UpdateTests.java +++ b/src/test/java/org/elasticsearch/test/integration/update/UpdateTests.java @@ -84,6 +84,34 @@ public class UpdateTests extends AbstractNodesTests { return client("node1"); } + @Test + public void testUpsert() throws Exception { + createIndex(); + ClusterHealthResponse clusterHealth = client.admin().cluster().prepareHealth().setWaitForGreenStatus().execute().actionGet(); + assertThat(clusterHealth.timedOut(), equalTo(false)); + assertThat(clusterHealth.status(), equalTo(ClusterHealthStatus.GREEN)); + + client.prepareUpdate("test", "type1", "1") + .setDoc(XContentFactory.jsonBuilder().startObject().field("field", 1).endObject()) + .setScript("ctx._source.field += 1") + .execute().actionGet(); + + for (int i = 0; i < 5; i++) { + GetResponse getResponse = client.prepareGet("test", "type1", "1").execute().actionGet(); + assertThat(getResponse.sourceAsMap().get("field").toString(), equalTo("1")); + } + + client.prepareUpdate("test", "type1", "1") + .setDoc(XContentFactory.jsonBuilder().startObject().field("field", 1).endObject()) + .setScript("ctx._source.field += 1") + .execute().actionGet(); + + for (int i = 0; i < 5; i++) { + GetResponse getResponse = client.prepareGet("test", "type1", "1").execute().actionGet(); + assertThat(getResponse.sourceAsMap().get("field").toString(), equalTo("2")); + } + } + @Test public void testUpdate() throws Exception { createIndex();