Add support for retrieving fields in bulk updates

This commit adds support to retrieve fields when using the bulk update API. This functionality was previously available for the update API
but not for the bulk update API.

Closes #11527
This commit is contained in:
Jason Tedor 2015-07-08 00:58:12 -04:00
parent df41d0c3ba
commit b61709c716
7 changed files with 85 additions and 6 deletions

View File

@ -285,7 +285,7 @@ public class BulkProcessor implements Closeable {
}
public synchronized BulkProcessor add(BytesReference data, @Nullable String defaultIndex, @Nullable String defaultType, @Nullable Object payload) throws Exception {
bulkRequest.add(data, defaultIndex, defaultType, null, payload, true);
bulkRequest.add(data, defaultIndex, defaultType, null, null, payload, true);
executeIfNeeded();
return this;
}

View File

@ -30,6 +30,7 @@ import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.StreamInput;
@ -239,17 +240,17 @@ public class BulkRequest extends ActionRequest<BulkRequest> implements Composite
* Adds a framed data in binary format
*/
public BulkRequest add(BytesReference data, @Nullable String defaultIndex, @Nullable String defaultType) throws Exception {
return add(data, defaultIndex, defaultType, null, null, true);
return add(data, defaultIndex, defaultType, null, null, null, true);
}
/**
* Adds a framed data in binary format
*/
public BulkRequest add(BytesReference data, @Nullable String defaultIndex, @Nullable String defaultType, boolean allowExplicitIndex) throws Exception {
return add(data, defaultIndex, defaultType, null, null, allowExplicitIndex);
return add(data, defaultIndex, defaultType, null, null, null, allowExplicitIndex);
}
public BulkRequest add(BytesReference data, @Nullable String defaultIndex, @Nullable String defaultType, @Nullable String defaultRouting, @Nullable Object payload, boolean allowExplicitIndex) throws Exception {
public BulkRequest add(BytesReference data, @Nullable String defaultIndex, @Nullable String defaultType, @Nullable String defaultRouting, @Nullable String[] defaultFields, @Nullable Object payload, boolean allowExplicitIndex) throws Exception {
XContent xContent = XContentFactory.xContent(data);
int line = 0;
int from = 0;
@ -283,6 +284,7 @@ public class BulkRequest extends ActionRequest<BulkRequest> implements Composite
String id = null;
String routing = defaultRouting;
String parent = null;
String[] fields = defaultFields;
String timestamp = null;
Long ttl = null;
String opType = null;
@ -329,6 +331,9 @@ public class BulkRequest extends ActionRequest<BulkRequest> implements Composite
versionType = VersionType.fromString(parser.text());
} else if ("_retry_on_conflict".equals(currentFieldName) || "_retryOnConflict".equals(currentFieldName)) {
retryOnConflict = parser.intValue();
} else if ("fields".equals(currentFieldName)) {
List<Object> values = parser.list();
fields = values.toArray(new String[values.size()]);
} else {
throw new IllegalArgumentException("Action/metadata line [" + line + "] contains an unknown parameter [" + currentFieldName + "]");
}
@ -372,6 +377,9 @@ public class BulkRequest extends ActionRequest<BulkRequest> implements Composite
.routing(routing)
.parent(parent)
.source(data.slice(from, nextMarker - from));
if (fields != null) {
updateRequest.fields(fields);
}
IndexRequest upsertRequest = updateRequest.upsertRequest();
if (upsertRequest != null) {

View File

@ -44,6 +44,7 @@ import org.elasticsearch.script.ScriptService.ScriptType;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static org.elasticsearch.action.ValidateActions.addValidationError;
@ -667,6 +668,10 @@ public class UpdateRequest extends InstanceShardOperationRequest<UpdateRequest>
docAsUpsert(parser.booleanValue());
} else if ("detect_noop".equals(currentFieldName)) {
detectNoop(parser.booleanValue());
} else if ("fields".equals(currentFieldName)) {
List<Object> values = parser.list();
String[] fields = values.toArray(new String[values.size()]);
fields(fields);
} else {
//here we don't have settings available, unable to throw deprecation exceptions
scriptParameterParser.token(currentFieldName, token, parser, ParseFieldMatcher.EMPTY);

View File

@ -31,6 +31,7 @@ import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.Requests;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentBuilder;
@ -75,6 +76,8 @@ public class RestBulkAction extends BaseRestHandler {
String defaultIndex = request.param("index");
String defaultType = request.param("type");
String defaultRouting = request.param("routing");
String fieldsParam = request.param("fields");
String[] defaultFields = fieldsParam != null ? Strings.commaDelimitedListToStringArray(fieldsParam) : null;
String consistencyLevel = request.param("consistency");
if (consistencyLevel != null) {
@ -82,7 +85,7 @@ public class RestBulkAction extends BaseRestHandler {
}
bulkRequest.timeout(request.paramAsTime("timeout", BulkShardRequest.DEFAULT_TIMEOUT));
bulkRequest.refresh(request.paramAsBoolean("refresh", bulkRequest.refresh()));
bulkRequest.add(request.content(), defaultIndex, defaultType, defaultRouting, null, allowExplicitIndex);
bulkRequest.add(request.content(), defaultIndex, defaultType, defaultRouting, defaultFields, null, allowExplicitIndex);
client.bulk(bulkRequest, new RestBuilderListener<BulkResponse>(channel) {
@Override
@ -131,6 +134,11 @@ public class RestBulkAction extends BaseRestHandler {
} else {
builder.field(Fields.STATUS, shardInfo.status().getStatus());
}
if (updateResponse.getGetResult() != null) {
builder.startObject(Fields.GET);
updateResponse.getGetResult().toXContentEmbedded(builder, request);
builder.endObject();
}
}
}
builder.endObject();
@ -155,6 +163,7 @@ public class RestBulkAction extends BaseRestHandler {
static final XContentBuilderString TOOK = new XContentBuilderString("took");
static final XContentBuilderString _VERSION = new XContentBuilderString("_version");
static final XContentBuilderString FOUND = new XContentBuilderString("found");
static final XContentBuilderString GET = new XContentBuilderString("get");
}
}

View File

@ -180,7 +180,7 @@ times an update should be retried in the case of a version conflict.
The `update` action payload, supports the following options: `doc`
(partial document), `upsert`, `doc_as_upsert`, `script`, `params` (for
script), `lang` (for script). See update documentation for details on
script), `lang` (for script) and `fields`. See update documentation for details on
the options. Curl example with update actions:
[source,js]
@ -191,6 +191,10 @@ the options. Curl example with update actions:
{ "script" : { "inline": "ctx._source.counter += param1", "lang" : "js", "params" : {"param1" : 1}}, "upsert" : {"counter" : 1}}
{ "update" : {"_id" : "2", "_type" : "type1", "_index" : "index1", "_retry_on_conflict" : 3} }
{ "doc" : {"field" : "value"}, "doc_as_upsert" : true }
{ "update" : {"_id" : "3", "_type" : "type1", "_index" : "index1", "fields" : ["_source"]} }
{ "doc" : {"field" : "value"} }
{ "update" : {"_id" : "4", "_type" : "type1", "_index" : "index1"} }
{ "doc" : {"field" : "value"}, "fields": ["_source"]}
--------------------------------------------------
[float]

View File

@ -36,6 +36,10 @@
"type": {
"type" : "string",
"description" : "Default document type for items which don't provide one"
},
"fields": {
"type": "string",
"description" : "Default comma-separated list of fields to return in the response for updates"
}
}
},

View File

@ -0,0 +1,49 @@
---
"Fields":
- do:
index:
refresh: true
index: test_index
type: test_type
id: test_id_1
body: { "foo": "bar" }
- do:
index:
refresh: true
index: test_index
type: test_type
id: test_id_2
body: { "foo": "qux" }
- do:
index:
refresh: true
index: test_index
type: test_type
id: test_id_3
body: { "foo": "corge" }
- do:
bulk:
refresh: true
body: |
{ "update": { "_index": "test_index", "_type": "test_type", "_id": "test_id_1", "fields": ["_source"] } }
{ "doc": { "foo": "baz" } }
{ "update": { "_index": "test_index", "_type": "test_type", "_id": "test_id_2" } }
{ "fields": ["_source"], "doc": { "foo": "quux" } }
- match: { items.0.update.get._source.foo: baz }
- match: { items.1.update.get._source.foo: quux }
- do:
bulk:
index: test_index
type: test_type
fields: _source
body: |
{ "update": { "_id": "test_id_3" } }
{ "doc": { "foo": "garply" } }
- match: { items.0.update.get._source.foo: garply }