mirror of
https://github.com/honeymoose/OpenSearch.git
synced 2025-03-04 18:09:21 +00:00
Update API: Allow to upsert, provide a doc and index it if the doc does not exists, closes #2008.
This commit is contained in:
parent
898fef11c9
commit
9905eab73a
src
main/java/org/elasticsearch
action/update
rest/action/update
test/java/org/elasticsearch/test/integration/update
@ -165,7 +165,48 @@ public class TransportUpdateAction extends TransportInstanceSingleOperationActio
|
||||
|
||||
// no doc, what to do, what to do...
|
||||
if (!getResult.exists()) {
|
||||
listener.onFailure(new DocumentMissingException(new ShardId(request.index(), request.shardId()), request.type(), request.id()));
|
||||
if (request.indexRequest() == null) {
|
||||
listener.onFailure(new DocumentMissingException(new ShardId(request.index(), request.shardId()), request.type(), request.id()));
|
||||
return;
|
||||
}
|
||||
IndexRequest indexRequest = request.indexRequest();
|
||||
indexRequest.index(request.index()).type(request.type()).id(request.id())
|
||||
// it has to be a "create!"
|
||||
.create(true)
|
||||
.routing(request.routing())
|
||||
.percolate(request.percolate())
|
||||
.refresh(request.refresh())
|
||||
.replicationType(request.replicationType()).consistencyLevel(request.consistencyLevel());
|
||||
indexRequest.operationThreaded(false);
|
||||
// we fetch it from the index request so we don't generate the bytes twice, its already done in the index request
|
||||
final BytesHolder updateSourceBytes = indexRequest.underlyingSourceBytes();
|
||||
indexAction.execute(indexRequest, new ActionListener<IndexResponse>() {
|
||||
@Override
|
||||
public void onResponse(IndexResponse response) {
|
||||
UpdateResponse update = new UpdateResponse(response.index(), response.type(), response.id(), response.version());
|
||||
update.matches(response.matches());
|
||||
// TODO: we can parse the index _source and extractGetResult if applicable
|
||||
update.getResult(null);
|
||||
listener.onResponse(update);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Throwable e) {
|
||||
e = ExceptionsHelper.unwrapCause(e);
|
||||
if (e instanceof VersionConflictEngineException) {
|
||||
if (retryCount < request.retryOnConflict()) {
|
||||
threadPool.executor(executor()).execute(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
shardOperation(request, listener, retryCount + 1);
|
||||
}
|
||||
});
|
||||
return;
|
||||
}
|
||||
}
|
||||
listener.onFailure(e);
|
||||
}
|
||||
});
|
||||
return;
|
||||
}
|
||||
|
||||
|
@ -22,12 +22,15 @@ package org.elasticsearch.action.update;
|
||||
import com.google.common.collect.Maps;
|
||||
import org.elasticsearch.action.ActionRequestValidationException;
|
||||
import org.elasticsearch.action.WriteConsistencyLevel;
|
||||
import org.elasticsearch.action.index.IndexRequest;
|
||||
import org.elasticsearch.action.support.replication.ReplicationType;
|
||||
import org.elasticsearch.action.support.single.instance.InstanceShardOperationRequest;
|
||||
import org.elasticsearch.common.Nullable;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.common.xcontent.XContentType;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Map;
|
||||
@ -60,6 +63,8 @@ public class UpdateRequest extends InstanceShardOperationRequest {
|
||||
private ReplicationType replicationType = ReplicationType.DEFAULT;
|
||||
private WriteConsistencyLevel consistencyLevel = WriteConsistencyLevel.DEFAULT;
|
||||
|
||||
private IndexRequest indexRequest;
|
||||
|
||||
UpdateRequest() {
|
||||
|
||||
}
|
||||
@ -330,6 +335,74 @@ public class UpdateRequest extends InstanceShardOperationRequest {
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets the index request to be used if the document does not exists. Otherwise, a {@link org.elasticsearch.index.engine.DocumentMissingException}
|
||||
* is thrown.
|
||||
*/
|
||||
public UpdateRequest doc(IndexRequest indexRequest) {
|
||||
this.indexRequest = indexRequest;
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets the doc source of the update request to be used when the document does not exists.
|
||||
*/
|
||||
public UpdateRequest doc(XContentBuilder source) {
|
||||
safeIndexRequest().source(source);
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets the doc source of the update request to be used when the document does not exists.
|
||||
*/
|
||||
public UpdateRequest doc(Map source) {
|
||||
safeIndexRequest().source(source);
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets the doc source of the update request to be used when the document does not exists.
|
||||
*/
|
||||
public UpdateRequest doc(Map source, XContentType contentType) {
|
||||
safeIndexRequest().source(source, contentType);
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets the doc source of the update request to be used when the document does not exists.
|
||||
*/
|
||||
public UpdateRequest doc(String source) {
|
||||
safeIndexRequest().source(source);
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets the doc source of the update request to be used when the document does not exists.
|
||||
*/
|
||||
public UpdateRequest doc(byte[] source) {
|
||||
safeIndexRequest().source(source);
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets the doc source of the update request to be used when the document does not exists.
|
||||
*/
|
||||
public UpdateRequest doc(byte[] source, int offset, int length) {
|
||||
safeIndexRequest().source(source, offset, length);
|
||||
return this;
|
||||
}
|
||||
|
||||
public IndexRequest indexRequest() {
|
||||
return this.indexRequest;
|
||||
}
|
||||
|
||||
private IndexRequest safeIndexRequest() {
|
||||
if (indexRequest == null) {
|
||||
indexRequest = new IndexRequest();
|
||||
}
|
||||
return indexRequest;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void readFrom(StreamInput in) throws IOException {
|
||||
super.readFrom(in);
|
||||
@ -357,6 +430,10 @@ public class UpdateRequest extends InstanceShardOperationRequest {
|
||||
fields[i] = in.readUTF();
|
||||
}
|
||||
}
|
||||
if (in.readBoolean()) {
|
||||
indexRequest = new IndexRequest();
|
||||
indexRequest.readFrom(in);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -396,5 +473,15 @@ public class UpdateRequest extends InstanceShardOperationRequest {
|
||||
out.writeUTF(field);
|
||||
}
|
||||
}
|
||||
if (indexRequest == null) {
|
||||
out.writeBoolean(false);
|
||||
} else {
|
||||
out.writeBoolean(true);
|
||||
// make sure the basics are set
|
||||
indexRequest.index(index);
|
||||
indexRequest.type(type);
|
||||
indexRequest.id(id);
|
||||
indexRequest.writeTo(out);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -21,10 +21,13 @@ package org.elasticsearch.action.update;
|
||||
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.WriteConsistencyLevel;
|
||||
import org.elasticsearch.action.index.IndexRequest;
|
||||
import org.elasticsearch.action.support.BaseRequestBuilder;
|
||||
import org.elasticsearch.action.support.replication.ReplicationType;
|
||||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.common.xcontent.XContentType;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
@ -180,6 +183,63 @@ public class UpdateRequestBuilder extends BaseRequestBuilder<UpdateRequest, Upda
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets the index request to be used if the document does not exists. Otherwise, a {@link org.elasticsearch.index.engine.DocumentMissingException}
|
||||
* is thrown.
|
||||
*/
|
||||
public UpdateRequestBuilder setDoc(IndexRequest indexRequest) {
|
||||
request.doc(indexRequest);
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets the doc source of the update request to be used when the document does not exists.
|
||||
*/
|
||||
public UpdateRequestBuilder setDoc(XContentBuilder source) {
|
||||
request.doc(source);
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets the doc source of the update request to be used when the document does not exists.
|
||||
*/
|
||||
public UpdateRequestBuilder setDoc(Map source) {
|
||||
request.doc(source);
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets the doc source of the update request to be used when the document does not exists.
|
||||
*/
|
||||
public UpdateRequestBuilder setDoc(Map source, XContentType contentType) {
|
||||
request.doc(source, contentType);
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets the doc source of the update request to be used when the document does not exists.
|
||||
*/
|
||||
public UpdateRequestBuilder setDoc(String source) {
|
||||
request.doc(source);
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets the doc source of the update request to be used when the document does not exists.
|
||||
*/
|
||||
public UpdateRequestBuilder setDoc(byte[] source) {
|
||||
request.doc(source);
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets the doc source of the update request to be used when the document does not exists.
|
||||
*/
|
||||
public UpdateRequestBuilder setDoc(byte[] source, int offset, int length) {
|
||||
request.doc(source, offset, length);
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doExecute(ActionListener<UpdateResponse> listener) {
|
||||
client.update(request, listener);
|
||||
|
@ -21,6 +21,7 @@ package org.elasticsearch.rest.action.update;
|
||||
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.WriteConsistencyLevel;
|
||||
import org.elasticsearch.action.index.IndexRequest;
|
||||
import org.elasticsearch.action.support.replication.ReplicationType;
|
||||
import org.elasticsearch.action.update.UpdateRequest;
|
||||
import org.elasticsearch.action.update.UpdateResponse;
|
||||
@ -32,7 +33,9 @@ import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilderString;
|
||||
import org.elasticsearch.common.xcontent.XContentFactory;
|
||||
import org.elasticsearch.common.xcontent.XContentType;
|
||||
import org.elasticsearch.index.VersionType;
|
||||
import org.elasticsearch.rest.*;
|
||||
import org.elasticsearch.rest.action.support.RestActions;
|
||||
import org.elasticsearch.rest.action.support.RestXContentBuilder;
|
||||
|
||||
import java.io.IOException;
|
||||
@ -91,7 +94,7 @@ public class RestUpdateAction extends BaseRestHandler {
|
||||
if (xContentType != null) {
|
||||
try {
|
||||
Map<String, Object> content = XContentFactory.xContent(xContentType)
|
||||
.createParser(request.contentByteArray(), request.contentByteArrayOffset(), request.contentLength()).mapAndClose();
|
||||
.createParser(request.contentByteArray(), request.contentByteArrayOffset(), request.contentLength()).mapOrderedAndClose();
|
||||
if (content.containsKey("script")) {
|
||||
updateRequest.script(content.get("script").toString());
|
||||
}
|
||||
@ -101,6 +104,19 @@ public class RestUpdateAction extends BaseRestHandler {
|
||||
if (content.containsKey("params")) {
|
||||
updateRequest.scriptParams((Map<String, Object>) content.get("params"));
|
||||
}
|
||||
if (content.containsKey("doc")) {
|
||||
IndexRequest indexRequest = new IndexRequest();
|
||||
indexRequest.source((Map) content.get("doc"), xContentType);
|
||||
indexRequest.routing(request.param("routing"));
|
||||
indexRequest.parent(request.param("parent")); // order is important, set it after routing, so it will set the routing
|
||||
indexRequest.timestamp(request.param("timestamp"));
|
||||
if (request.hasParam("ttl")) {
|
||||
indexRequest.ttl(request.paramAsTime("ttl", null).millis());
|
||||
}
|
||||
indexRequest.version(RestActions.parseVersion(request));
|
||||
indexRequest.versionType(VersionType.fromString(request.param("version_type"), indexRequest.versionType()));
|
||||
updateRequest.doc(indexRequest);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
try {
|
||||
channel.sendResponse(new XContentThrowableRestResponse(request, e));
|
||||
|
@ -84,6 +84,34 @@ public class UpdateTests extends AbstractNodesTests {
|
||||
return client("node1");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testUpsert() throws Exception {
|
||||
createIndex();
|
||||
ClusterHealthResponse clusterHealth = client.admin().cluster().prepareHealth().setWaitForGreenStatus().execute().actionGet();
|
||||
assertThat(clusterHealth.timedOut(), equalTo(false));
|
||||
assertThat(clusterHealth.status(), equalTo(ClusterHealthStatus.GREEN));
|
||||
|
||||
client.prepareUpdate("test", "type1", "1")
|
||||
.setDoc(XContentFactory.jsonBuilder().startObject().field("field", 1).endObject())
|
||||
.setScript("ctx._source.field += 1")
|
||||
.execute().actionGet();
|
||||
|
||||
for (int i = 0; i < 5; i++) {
|
||||
GetResponse getResponse = client.prepareGet("test", "type1", "1").execute().actionGet();
|
||||
assertThat(getResponse.sourceAsMap().get("field").toString(), equalTo("1"));
|
||||
}
|
||||
|
||||
client.prepareUpdate("test", "type1", "1")
|
||||
.setDoc(XContentFactory.jsonBuilder().startObject().field("field", 1).endObject())
|
||||
.setScript("ctx._source.field += 1")
|
||||
.execute().actionGet();
|
||||
|
||||
for (int i = 0; i < 5; i++) {
|
||||
GetResponse getResponse = client.prepareGet("test", "type1", "1").execute().actionGet();
|
||||
assertThat(getResponse.sourceAsMap().get("field").toString(), equalTo("2"));
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testUpdate() throws Exception {
|
||||
createIndex();
|
||||
|
Loading…
x
Reference in New Issue
Block a user