From b61709c7162227b72616f1040c32678dd8988324 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Wed, 8 Jul 2015 00:58:12 -0400 Subject: [PATCH] Add support for retrieving fields in bulk updates This commit adds support to retrieve fields when using the bulk update API. This functionality was previously available for the update API but not for the bulk update API. Closes #11527 --- .../action/bulk/BulkProcessor.java | 2 +- .../action/bulk/BulkRequest.java | 14 ++++-- .../action/update/UpdateRequest.java | 5 ++ .../rest/action/bulk/RestBulkAction.java | 11 ++++- docs/reference/docs/bulk.asciidoc | 6 ++- .../resources/rest-api-spec/api/bulk.json | 4 ++ .../rest-api-spec/test/bulk/40_fields.yaml | 49 +++++++++++++++++++ 7 files changed, 85 insertions(+), 6 deletions(-) create mode 100644 rest-api-spec/src/main/resources/rest-api-spec/test/bulk/40_fields.yaml diff --git a/core/src/main/java/org/elasticsearch/action/bulk/BulkProcessor.java b/core/src/main/java/org/elasticsearch/action/bulk/BulkProcessor.java index be26f318625..43e433146bd 100644 --- a/core/src/main/java/org/elasticsearch/action/bulk/BulkProcessor.java +++ b/core/src/main/java/org/elasticsearch/action/bulk/BulkProcessor.java @@ -285,7 +285,7 @@ public class BulkProcessor implements Closeable { } public synchronized BulkProcessor add(BytesReference data, @Nullable String defaultIndex, @Nullable String defaultType, @Nullable Object payload) throws Exception { - bulkRequest.add(data, defaultIndex, defaultType, null, payload, true); + bulkRequest.add(data, defaultIndex, defaultType, null, null, payload, true); executeIfNeeded(); return this; } diff --git a/core/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java b/core/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java index 3e2daa421ee..8f41f970286 100644 --- a/core/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java +++ b/core/src/main/java/org/elasticsearch/action/bulk/BulkRequest.java @@ -30,6 +30,7 @@ import org.elasticsearch.action.delete.DeleteRequest; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.Strings; import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.io.stream.StreamInput; @@ -239,17 +240,17 @@ public class BulkRequest extends ActionRequest implements Composite * Adds a framed data in binary format */ public BulkRequest add(BytesReference data, @Nullable String defaultIndex, @Nullable String defaultType) throws Exception { - return add(data, defaultIndex, defaultType, null, null, true); + return add(data, defaultIndex, defaultType, null, null, null, true); } /** * Adds a framed data in binary format */ public BulkRequest add(BytesReference data, @Nullable String defaultIndex, @Nullable String defaultType, boolean allowExplicitIndex) throws Exception { - return add(data, defaultIndex, defaultType, null, null, allowExplicitIndex); + return add(data, defaultIndex, defaultType, null, null, null, allowExplicitIndex); } - public BulkRequest add(BytesReference data, @Nullable String defaultIndex, @Nullable String defaultType, @Nullable String defaultRouting, @Nullable Object payload, boolean allowExplicitIndex) throws Exception { + public BulkRequest add(BytesReference data, @Nullable String defaultIndex, @Nullable String defaultType, @Nullable String defaultRouting, @Nullable String[] defaultFields, @Nullable Object payload, boolean allowExplicitIndex) throws Exception { XContent xContent = XContentFactory.xContent(data); int line = 0; int from = 0; @@ -283,6 +284,7 @@ public class BulkRequest extends ActionRequest implements Composite String id = null; String routing = defaultRouting; String parent = null; + String[] fields = defaultFields; String timestamp = null; Long ttl = null; String opType = null; @@ -329,6 +331,9 @@ public class BulkRequest extends ActionRequest implements Composite versionType = VersionType.fromString(parser.text()); } else if ("_retry_on_conflict".equals(currentFieldName) || "_retryOnConflict".equals(currentFieldName)) { retryOnConflict = parser.intValue(); + } else if ("fields".equals(currentFieldName)) { + List values = parser.list(); + fields = values.toArray(new String[values.size()]); } else { throw new IllegalArgumentException("Action/metadata line [" + line + "] contains an unknown parameter [" + currentFieldName + "]"); } @@ -372,6 +377,9 @@ public class BulkRequest extends ActionRequest implements Composite .routing(routing) .parent(parent) .source(data.slice(from, nextMarker - from)); + if (fields != null) { + updateRequest.fields(fields); + } IndexRequest upsertRequest = updateRequest.upsertRequest(); if (upsertRequest != null) { diff --git a/core/src/main/java/org/elasticsearch/action/update/UpdateRequest.java b/core/src/main/java/org/elasticsearch/action/update/UpdateRequest.java index cc7fea15b10..a5874dc7846 100644 --- a/core/src/main/java/org/elasticsearch/action/update/UpdateRequest.java +++ b/core/src/main/java/org/elasticsearch/action/update/UpdateRequest.java @@ -44,6 +44,7 @@ import org.elasticsearch.script.ScriptService.ScriptType; import java.io.IOException; import java.util.HashMap; +import java.util.List; import java.util.Map; import static org.elasticsearch.action.ValidateActions.addValidationError; @@ -667,6 +668,10 @@ public class UpdateRequest extends InstanceShardOperationRequest docAsUpsert(parser.booleanValue()); } else if ("detect_noop".equals(currentFieldName)) { detectNoop(parser.booleanValue()); + } else if ("fields".equals(currentFieldName)) { + List values = parser.list(); + String[] fields = values.toArray(new String[values.size()]); + fields(fields); } else { //here we don't have settings available, unable to throw deprecation exceptions scriptParameterParser.token(currentFieldName, token, parser, ParseFieldMatcher.EMPTY); diff --git a/core/src/main/java/org/elasticsearch/rest/action/bulk/RestBulkAction.java b/core/src/main/java/org/elasticsearch/rest/action/bulk/RestBulkAction.java index 987178595a6..90184352714 100644 --- a/core/src/main/java/org/elasticsearch/rest/action/bulk/RestBulkAction.java +++ b/core/src/main/java/org/elasticsearch/rest/action/bulk/RestBulkAction.java @@ -31,6 +31,7 @@ import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.action.update.UpdateResponse; import org.elasticsearch.client.Client; import org.elasticsearch.client.Requests; +import org.elasticsearch.common.Strings; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.XContentBuilder; @@ -75,6 +76,8 @@ public class RestBulkAction extends BaseRestHandler { String defaultIndex = request.param("index"); String defaultType = request.param("type"); String defaultRouting = request.param("routing"); + String fieldsParam = request.param("fields"); + String[] defaultFields = fieldsParam != null ? Strings.commaDelimitedListToStringArray(fieldsParam) : null; String consistencyLevel = request.param("consistency"); if (consistencyLevel != null) { @@ -82,7 +85,7 @@ public class RestBulkAction extends BaseRestHandler { } bulkRequest.timeout(request.paramAsTime("timeout", BulkShardRequest.DEFAULT_TIMEOUT)); bulkRequest.refresh(request.paramAsBoolean("refresh", bulkRequest.refresh())); - bulkRequest.add(request.content(), defaultIndex, defaultType, defaultRouting, null, allowExplicitIndex); + bulkRequest.add(request.content(), defaultIndex, defaultType, defaultRouting, defaultFields, null, allowExplicitIndex); client.bulk(bulkRequest, new RestBuilderListener(channel) { @Override @@ -131,6 +134,11 @@ public class RestBulkAction extends BaseRestHandler { } else { builder.field(Fields.STATUS, shardInfo.status().getStatus()); } + if (updateResponse.getGetResult() != null) { + builder.startObject(Fields.GET); + updateResponse.getGetResult().toXContentEmbedded(builder, request); + builder.endObject(); + } } } builder.endObject(); @@ -155,6 +163,7 @@ public class RestBulkAction extends BaseRestHandler { static final XContentBuilderString TOOK = new XContentBuilderString("took"); static final XContentBuilderString _VERSION = new XContentBuilderString("_version"); static final XContentBuilderString FOUND = new XContentBuilderString("found"); + static final XContentBuilderString GET = new XContentBuilderString("get"); } } diff --git a/docs/reference/docs/bulk.asciidoc b/docs/reference/docs/bulk.asciidoc index 65552cc4278..603c5745632 100644 --- a/docs/reference/docs/bulk.asciidoc +++ b/docs/reference/docs/bulk.asciidoc @@ -180,7 +180,7 @@ times an update should be retried in the case of a version conflict. The `update` action payload, supports the following options: `doc` (partial document), `upsert`, `doc_as_upsert`, `script`, `params` (for -script), `lang` (for script). See update documentation for details on +script), `lang` (for script) and `fields`. See update documentation for details on the options. Curl example with update actions: [source,js] @@ -191,6 +191,10 @@ the options. Curl example with update actions: { "script" : { "inline": "ctx._source.counter += param1", "lang" : "js", "params" : {"param1" : 1}}, "upsert" : {"counter" : 1}} { "update" : {"_id" : "2", "_type" : "type1", "_index" : "index1", "_retry_on_conflict" : 3} } { "doc" : {"field" : "value"}, "doc_as_upsert" : true } +{ "update" : {"_id" : "3", "_type" : "type1", "_index" : "index1", "fields" : ["_source"]} } +{ "doc" : {"field" : "value"} } +{ "update" : {"_id" : "4", "_type" : "type1", "_index" : "index1"} } +{ "doc" : {"field" : "value"}, "fields": ["_source"]} -------------------------------------------------- [float] diff --git a/rest-api-spec/src/main/resources/rest-api-spec/api/bulk.json b/rest-api-spec/src/main/resources/rest-api-spec/api/bulk.json index 560f23d3da5..b8eeec62039 100644 --- a/rest-api-spec/src/main/resources/rest-api-spec/api/bulk.json +++ b/rest-api-spec/src/main/resources/rest-api-spec/api/bulk.json @@ -36,6 +36,10 @@ "type": { "type" : "string", "description" : "Default document type for items which don't provide one" + }, + "fields": { + "type": "string", + "description" : "Default comma-separated list of fields to return in the response for updates" } } }, diff --git a/rest-api-spec/src/main/resources/rest-api-spec/test/bulk/40_fields.yaml b/rest-api-spec/src/main/resources/rest-api-spec/test/bulk/40_fields.yaml new file mode 100644 index 00000000000..3aa9d522633 --- /dev/null +++ b/rest-api-spec/src/main/resources/rest-api-spec/test/bulk/40_fields.yaml @@ -0,0 +1,49 @@ +--- +"Fields": + - do: + index: + refresh: true + index: test_index + type: test_type + id: test_id_1 + body: { "foo": "bar" } + + - do: + index: + refresh: true + index: test_index + type: test_type + id: test_id_2 + body: { "foo": "qux" } + + - do: + index: + refresh: true + index: test_index + type: test_type + id: test_id_3 + body: { "foo": "corge" } + + + - do: + bulk: + refresh: true + body: | + { "update": { "_index": "test_index", "_type": "test_type", "_id": "test_id_1", "fields": ["_source"] } } + { "doc": { "foo": "baz" } } + { "update": { "_index": "test_index", "_type": "test_type", "_id": "test_id_2" } } + { "fields": ["_source"], "doc": { "foo": "quux" } } + + - match: { items.0.update.get._source.foo: baz } + - match: { items.1.update.get._source.foo: quux } + + - do: + bulk: + index: test_index + type: test_type + fields: _source + body: | + { "update": { "_id": "test_id_3" } } + { "doc": { "foo": "garply" } } + + - match: { items.0.update.get._source.foo: garply }