Rework java update-by-query docs
This commit is contained in:
parent
83d7f199c7
commit
ccab85835a
|
@ -12,38 +12,11 @@ mapping change.
|
|||
--------------------------------------------------
|
||||
UpdateByQueryRequestBuilder updateByQuery = UpdateByQueryAction.INSTANCE.newRequestBuilder(client);
|
||||
|
||||
updateByQuery.source("cool_things")
|
||||
.filter(termQuery("level", "awesome"))
|
||||
.script(new Script("ctx._source.awesome = \"absolutely\""));
|
||||
updateByQuery.source("source_index").abortOnVersionConflict(false);
|
||||
|
||||
BulkIndexByScrollResponse response = updateByQuery.get();
|
||||
--------------------------------------------------
|
||||
|
||||
The `updatebyQuery` API returns a JSON object similar to the following example:
|
||||
|
||||
[source,java]
|
||||
--------------------------------------------------
|
||||
{
|
||||
"took" : 147,
|
||||
"timed_out": false,
|
||||
"updated": 120,
|
||||
"deleted": 0,
|
||||
"batches": 1,
|
||||
"version_conflicts": 0,
|
||||
"noops": 0,
|
||||
"retries": {
|
||||
"bulk": 0,
|
||||
"search": 0
|
||||
},
|
||||
"throttled_millis": 0,
|
||||
"requests_per_second": "unlimited",
|
||||
"throttled_until_millis": 0,
|
||||
"total": 120,
|
||||
"failures" : [ ]
|
||||
}
|
||||
--------------------------------------------------
|
||||
// TESTRESPONSE[s/"took" : 147/"took" : "$body.took"/]
|
||||
|
||||
Calls to the `updateByQuery` API start by getting a snapshot of the index, indexing
|
||||
any documents found using the `internal` versioning.
|
||||
|
||||
|
@ -54,79 +27,78 @@ When the versions match, `updateByQuery` updates the document
|
|||
and increments the version number.
|
||||
|
||||
All update and query failures cause `updateByQuery` to abort. These failures are
|
||||
listed in the `failures` section of the JSON response object. Any successful updates
|
||||
remain and are not rolled back. While the first failure causes the abort, the JSON
|
||||
response object contains all of the failures generated by the failed bulk request.
|
||||
available from the `BulkIndexByScrollResponse#getIndexingFailures` method. Any
|
||||
successful updates remain and are not rolled back. While the first failure
|
||||
causes the abort, the response contains all of the failures generated by the
|
||||
failed bulk request.
|
||||
|
||||
To prevent version conflicts from causing `updateByQuery` to abort,
|
||||
set `conflicts=proceed` on the URL or `"conflicts": "proceed"`
|
||||
in the request body. The first example does this because it is trying to
|
||||
pick up an online mapping change and a version conflict means that the
|
||||
conflicting document was updated between the start of the `updateByQuery`
|
||||
To prevent version conflicts from causing `updateByQuery` to abort, set
|
||||
`abortOnVersionConflict(false)`. The first example does this because it is
|
||||
trying to pick up an online mapping change and a version conflict means that
|
||||
the conflicting document was updated between the start of the `updateByQuery`
|
||||
and the time when it attempted to update the document. This is fine because
|
||||
that update will have picked up the online mapping update.
|
||||
|
||||
Back to the API format, you can limit `updateByQuery` to a single type. This
|
||||
will only update `tweet` documents from the `twitter` index:
|
||||
|
||||
// provide API Example
|
||||
Back to the API, `UpdateByQueryRequestBuilder` supports filtering the documents
|
||||
that are updated, limiting the total number updated, and updating documents
|
||||
with a script:
|
||||
|
||||
[source,java]
|
||||
--------------------------------------------------
|
||||
UpdateByQueryRequestBuilder updateByQuery = UpdateByQueryAction.INSTANCE.newRequestBuilder(client);
|
||||
|
||||
updateByQuery.source("source_index")
|
||||
.filter(termQuery("level", "awesome"))
|
||||
.size(1000)
|
||||
.script(new Script("ctx._source.awesome = 'absolutely'", ScriptType.INLINE, "painless", emptyMap()));
|
||||
|
||||
BulkIndexByScrollResponse response = updateByQuery.get();
|
||||
--------------------------------------------------
|
||||
// CONSOLE
|
||||
// TEST[setup:twitter]
|
||||
|
||||
You can also limit `updateByQuery` using the
|
||||
<<query-dsl,Query DSL>>. This example updates all documents from the
|
||||
`twitter` index for the user `kimchy`:
|
||||
|
||||
|
||||
// provide API Example
|
||||
`UpdateByQueryRequestBuilder` also allows you direct access to the query used
|
||||
to select the documents which you can use to change the default scroll size or
|
||||
otherwise modify the request for matching documents.
|
||||
|
||||
[source,java]
|
||||
--------------------------------------------------
|
||||
UpdateByQueryRequestBuilder updateByQuery = UpdateByQueryAction.INSTANCE.newRequestBuilder(client);
|
||||
|
||||
{
|
||||
"query": { <1>
|
||||
"term": {
|
||||
"user": "kimchy"
|
||||
}
|
||||
}
|
||||
}
|
||||
updateByQuery.source("source_index")
|
||||
.source().setSize(500);
|
||||
|
||||
BulkIndexByScrollResponse response = updateByQuery.get();
|
||||
--------------------------------------------------
|
||||
// CONSOLE
|
||||
// TEST[setup:twitter]
|
||||
|
||||
<1> The query must be passed as a value to the `query` key, in the same
|
||||
way as the <<search-search,Search API>>. You can also use the `q`
|
||||
parameter in the same way as the search api.
|
||||
|
||||
So far we've only been updating documents without changing their source. That
|
||||
is genuinely useful for things like
|
||||
<<picking-up-a-new-property,picking up new properties>> but it's only half the
|
||||
fun. `updateByQuery` supports a `script` object to update the document. This
|
||||
will increment the `likes` field on all of kimchy's tweets:
|
||||
|
||||
// provide API Example
|
||||
You can also combine `size` with sorting to limit the documents updated:
|
||||
|
||||
[source,java]
|
||||
--------------------------------------------------
|
||||
UpdateByQueryRequestBuilder updateByQuery = UpdateByQueryAction.INSTANCE.newRequestBuilder(client);
|
||||
|
||||
{
|
||||
"script": {
|
||||
"inline": "ctx._source.likes++"
|
||||
},
|
||||
"query": {
|
||||
"term": {
|
||||
"user": "kimchy"
|
||||
}
|
||||
}
|
||||
}
|
||||
updateByQuery.source("source_index").size(100)
|
||||
.source().addSort("cat", SortOrder.DESC);
|
||||
|
||||
BulkIndexByScrollResponse response = updateByQuery.get();
|
||||
--------------------------------------------------
|
||||
|
||||
In addition to changing the `_source` of the document (see above) the script
|
||||
can change the update action similarly to the Update API:
|
||||
|
||||
[source,java]
|
||||
--------------------------------------------------
|
||||
UpdateByQueryRequestBuilder updateByQuery = UpdateByQueryAction.INSTANCE.newRequestBuilder(client);
|
||||
|
||||
updateByQuery.source("source_index")
|
||||
.script(new Script(
|
||||
"if (ctx._source.awesome == 'absolutely) {"
|
||||
+ " ctx.op='noop'
|
||||
+ "} else if (ctx._source.awesome == 'lame') {"
|
||||
+ " ctx.op='delete'"
|
||||
+ "} else {"
|
||||
+ "ctx._source.awesome = 'absolutely'}", ScriptType.INLINE, "painless", emptyMap()));
|
||||
|
||||
BulkIndexByScrollResponse response = updateByQuery.get();
|
||||
--------------------------------------------------
|
||||
// CONSOLE
|
||||
// TEST[setup:twitter]
|
||||
|
||||
Just as in <<docs-update,Update API>> you can set `ctx.op` to change the
|
||||
operation that is executed:
|
||||
|
@ -141,15 +113,12 @@ changes. That will cause `updateByQuery` to omit that document from its updates.
|
|||
`delete`::
|
||||
|
||||
Set `ctx.op = "delete"` if your script decides that the document must be
|
||||
deleted. The deletion will be reported in the `deleted` counter in the
|
||||
deleted. The deletion will be reported in the `deleted` counter in the
|
||||
<<docs-update-by-query-response-body, response body>>.
|
||||
|
||||
Setting `ctx.op` to anything else is an error. Setting any
|
||||
other field in `ctx` is an error.
|
||||
|
||||
Note that we stopped specifying `conflicts=proceed`. In this case we want a
|
||||
version conflict to abort the process so we can handle the failure.
|
||||
|
||||
This API doesn't allow you to move the documents it touches, just modify their
|
||||
source. This is intentional! We've made no provisions for removing the document
|
||||
from its original location.
|
||||
|
@ -157,144 +126,38 @@ from its original location.
|
|||
It's also possible to do this whole thing on multiple indexes and multiple
|
||||
types at once, just like the search API:
|
||||
|
||||
// provide API Example
|
||||
|
||||
[source,java]
|
||||
--------------------------------------------------
|
||||
UpdateByQueryRequestBuilder updateByQuery = UpdateByQueryAction.INSTANCE.newRequestBuilder(client);
|
||||
|
||||
updateByQuery.source("foo", "bar").source().setTypes("a", "b");
|
||||
|
||||
BulkIndexByScrollResponse response = updateByQuery.get();
|
||||
--------------------------------------------------
|
||||
// CONSOLE
|
||||
// TEST[s/^/PUT twitter\nPUT blog\nGET _cluster\/health?wait_for_status=yellow\n/]
|
||||
|
||||
If you provide `routing` then the routing is copied to the scroll query,
|
||||
limiting the process to the shards that match that routing value:
|
||||
|
||||
// provide API Example
|
||||
[source,java]
|
||||
--------------------------------------------------
|
||||
UpdateByQueryRequestBuilder updateByQuery = UpdateByQueryAction.INSTANCE.newRequestBuilder(client);
|
||||
|
||||
updateByQuery.source().setRouting("cat");
|
||||
|
||||
BulkIndexByScrollResponse response = updateByQuery.get();
|
||||
--------------------------------------------------
|
||||
// CONSOLE
|
||||
// TEST[setup:twitter]
|
||||
|
||||
By default `updateByQuery` uses scroll batches of 1000. You can change the
|
||||
batch size with the `scroll_size` URL parameter:
|
||||
|
||||
// provide API Example
|
||||
[source,java]
|
||||
--------------------------------------------------
|
||||
|
||||
--------------------------------------------------
|
||||
// CONSOLE
|
||||
// TEST[setup:twitter]
|
||||
|
||||
`updateByQuery` can also use the <<ingest>> feature by
|
||||
specifying a `pipeline` like this:
|
||||
|
||||
// provide API Example
|
||||
[source,java]
|
||||
--------------------------------------------------
|
||||
UpdateByQueryRequestBuilder updateByQuery = UpdateByQueryAction.INSTANCE.newRequestBuilder(client);
|
||||
|
||||
{
|
||||
"description" : "sets foo",
|
||||
"processors" : [ {
|
||||
"set" : {
|
||||
"field": "foo",
|
||||
"value": "bar"
|
||||
}
|
||||
} ]
|
||||
}
|
||||
POST twitter/_update_by_query?pipeline=set-foo
|
||||
updateByQuery.setPipeline("hurray");
|
||||
|
||||
BulkIndexByScrollResponse response = updateByQuery.get();
|
||||
--------------------------------------------------
|
||||
// CONSOLE
|
||||
// TEST[setup:twitter]
|
||||
|
||||
[float]
|
||||
=== URL Parameters
|
||||
|
||||
In addition to the standard parameters like `pretty`, the Update By Query API
|
||||
also supports `refresh`, `wait_for_completion`, `consistency`, and `timeout`.
|
||||
|
||||
Sending the `refresh` will update all shards in the index being updated when
|
||||
the request completes. This is different than the Index API's `refresh`
|
||||
parameter which causes just the shard that received the new data to be indexed.
|
||||
|
||||
If the request contains `wait_for_completion=false` then Elasticsearch will
|
||||
perform some preflight checks, launch the request, and then return a `task`
|
||||
which can be used with <<docs-update-by-query-task-api,Tasks APIs>>
|
||||
to cancel or get the status of the task. Elasticsearch will also create a
|
||||
record of this task as a document at `.tasks/task/${taskId}`. This is yours
|
||||
to keep or remove as you see fit. When you are done with it, delete it so
|
||||
Elasticsearch can reclaim the space it uses.
|
||||
|
||||
`consistency` controls how many copies of a shard must respond to each write
|
||||
request. `timeout` controls how long each write request waits for unavailable
|
||||
shards to become available. Both work exactly how they work in the
|
||||
<<docs-bulk,Bulk API>>.
|
||||
|
||||
`requests_per_second` can be set to any decimal number (`1.4`, `6`, `1000`, etc)
|
||||
and throttles the number of requests per second that the update by query issues.
|
||||
The throttling is done waiting between bulk batches so that it can manipulate
|
||||
the scroll timeout. The wait time is the difference between the time it took the
|
||||
batch to complete and the time `requests_per_second * requests_in_the_batch`.
|
||||
Since the batch isn't broken into multiple bulk requests large batch sizes will
|
||||
cause Elasticsearch to create many requests and then wait for a while before
|
||||
starting the next set. This is "bursty" instead of "smooth". The default is
|
||||
`unlimited` which is also the only non-number value that it accepts.
|
||||
|
||||
[float]
|
||||
[[docs-update-by-query-response-body]]
|
||||
=== Response body
|
||||
|
||||
The JSON response looks like this:
|
||||
|
||||
[source,java]
|
||||
--------------------------------------------------
|
||||
{
|
||||
"took" : 639,
|
||||
"updated": 0,
|
||||
"batches": 1,
|
||||
"version_conflicts": 2,
|
||||
"retries": {
|
||||
"bulk": 0,
|
||||
"search": 0
|
||||
}
|
||||
"throttled_millis": 0,
|
||||
"failures" : [ ]
|
||||
}
|
||||
--------------------------------------------------
|
||||
|
||||
`took`::
|
||||
|
||||
The number of milliseconds from start to end of the whole operation.
|
||||
|
||||
`updated`::
|
||||
|
||||
The number of documents that were successfully updated.
|
||||
|
||||
`batches`::
|
||||
|
||||
The number of scroll responses pulled back by the the update by query.
|
||||
|
||||
`version_conflicts`::
|
||||
|
||||
The number of version conflicts that the update by query hit.
|
||||
|
||||
`retries`::
|
||||
|
||||
The number of retries attempted by update-by-query. `bulk` is the number of bulk
|
||||
actions retried and `search` is the number of search actions retried.
|
||||
|
||||
`throttled_millis`::
|
||||
|
||||
Number of milliseconds the request slept to conform to `requests_per_second`.
|
||||
|
||||
`failures`::
|
||||
|
||||
Array of all indexing failures. If this is non-empty then the request aborted
|
||||
because of those failures. See `conflicts` for how to prevent version conflicts
|
||||
from aborting the operation.
|
||||
|
||||
|
||||
[float]
|
||||
[[docs-update-by-query-task-api]]
|
||||
|
@ -303,79 +166,26 @@ from aborting the operation.
|
|||
You can fetch the status of all running update-by-query requests with the
|
||||
<<tasks,Task API>>:
|
||||
|
||||
// provide API Example
|
||||
|
||||
[source,java]
|
||||
--------------------------------------------------
|
||||
ListTasksResponse tasksList = client.admin().cluster().prepareListTasks()
|
||||
.setActions(UpdateByQueryAction.NAME).setDetailed(true).get();
|
||||
|
||||
--------------------------------------------------
|
||||
// CONSOLE
|
||||
|
||||
The responses looks like:
|
||||
|
||||
[source,java]
|
||||
--------------------------------------------------
|
||||
{
|
||||
"nodes" : {
|
||||
"r1A2WoRbTwKZ516z6NEs5A" : {
|
||||
"name" : "Tyrannus",
|
||||
"transport_address" : "127.0.0.1:9300",
|
||||
"host" : "127.0.0.1",
|
||||
"ip" : "127.0.0.1:9300",
|
||||
"attributes" : {
|
||||
"testattr" : "test",
|
||||
"portsfile" : "true"
|
||||
},
|
||||
"tasks" : {
|
||||
"r1A2WoRbTwKZ516z6NEs5A:36619" : {
|
||||
"node" : "r1A2WoRbTwKZ516z6NEs5A",
|
||||
"id" : 36619,
|
||||
"type" : "transport",
|
||||
"action" : "indices:data/write/update/byquery",
|
||||
"status" : { <1>
|
||||
"total" : 6154,
|
||||
"updated" : 3500,
|
||||
"created" : 0,
|
||||
"deleted" : 0,
|
||||
"batches" : 4,
|
||||
"version_conflicts" : 0,
|
||||
"noops" : 0,
|
||||
"retries": {
|
||||
"bulk": 0,
|
||||
"search": 0
|
||||
}
|
||||
"throttled_millis": 0
|
||||
},
|
||||
"description" : ""
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
for (TaskInfo info: tasksList.getTasks()) {
|
||||
TaskId taskId = info.getTaskId();
|
||||
BulkByScrollTask.Status status = (BulkByScrollTask.Status) info.getStatus();
|
||||
// do stuff
|
||||
}
|
||||
|
||||
--------------------------------------------------
|
||||
|
||||
<1> this object contains the actual status. It is just like the response json
|
||||
with the important addition of the `total` field. `total` is the total number
|
||||
of operations that the reindex expects to perform. You can estimate the
|
||||
progress by adding the `updated`, `created`, and `deleted` fields. The request
|
||||
will finish when their sum is equal to the `total` field.
|
||||
|
||||
With the task id you can look up the task directly:
|
||||
With the `TaskId` shown above you can look up the task directly:
|
||||
|
||||
// provide API Example
|
||||
[source,java]
|
||||
--------------------------------------------------
|
||||
|
||||
GetTaskResponse get = client.admin().cluster().prepareGetTask(taskId).get();
|
||||
--------------------------------------------------
|
||||
// CONSOLE
|
||||
// TEST[catch:missing]
|
||||
|
||||
The advantage of this API is that it integrates with `wait_for_completion=false`
|
||||
to transparently return the status of completed tasks. If the task is completed
|
||||
and `wait_for_completion=false` was set on it them it'll come back with a
|
||||
`results` or an `error` field. The cost of this feature is the document that
|
||||
`wait_for_completion=false` creates at `.tasks/task/${taskId}`. It is up to
|
||||
you to delete that document.
|
||||
|
||||
[float]
|
||||
[[docs-update-by-query-cancel-task-api]]
|
||||
|
@ -383,14 +193,15 @@ you to delete that document.
|
|||
|
||||
Any Update By Query can be canceled using the <<tasks,Task Cancel API>>:
|
||||
|
||||
// provide API Example
|
||||
[source,java]
|
||||
--------------------------------------------------
|
||||
|
||||
// Cancel all update-by-query requests
|
||||
client.admin().cluster().prepareCancelTasks().setActions(UpdateByQueryAction.NAME).get().getTasks()
|
||||
// Cancel a specific update-by-query request
|
||||
client.admin().cluster().prepareCancelTasks().setTaskId(taskId).get().getTasks()
|
||||
--------------------------------------------------
|
||||
// CONSOLE
|
||||
|
||||
The `task_id` can be found using the tasks API above.
|
||||
The `taskId` can be found using the list tasks API above.
|
||||
|
||||
Cancelation should happen quickly but might take a few seconds. The task status
|
||||
API above will continue to list the task until it is wakes to cancel itself.
|
||||
|
@ -403,120 +214,15 @@ API above will continue to list the task until it is wakes to cancel itself.
|
|||
The value of `requests_per_second` can be changed on a running update by query
|
||||
using the `_rethrottle` API:
|
||||
|
||||
// provide API Example
|
||||
[source,java]
|
||||
--------------------------------------------------
|
||||
|
||||
RethrottleAction.INSTANCE.newRequestBuilder(client).setTaskId(taskId).setRequestsPerSecond(2.0f).get();
|
||||
--------------------------------------------------
|
||||
// CONSOLE
|
||||
|
||||
The `task_id` can be found using the tasks API above.
|
||||
The `taskId` can be found using the tasks API above.
|
||||
|
||||
Just like when setting it on the `updateByQuery` API `requests_per_second`
|
||||
can be either `unlimited` to disable throttling or any decimal number like `1.7`
|
||||
or `12` to throttle to that level. Rethrottling that speeds up the query takes
|
||||
can be either `Float.POSITIVE_INFINITY` to disable throttling or any positive
|
||||
float to throttle to that level. Rethrottling that speeds up the query takes
|
||||
effect immediately but rethrotting that slows down the query will take effect
|
||||
on after completing the current batch. This prevents scroll timeouts.
|
||||
|
||||
|
||||
[float]
|
||||
[[picking-up-a-new-property]]
|
||||
=== Pick up a new property
|
||||
|
||||
Say you created an index without dynamic mapping, filled it with data, and then
|
||||
added a mapping value to pick up more fields from the data:
|
||||
|
||||
[source,java]
|
||||
--------------------------------------------------
|
||||
PUT test
|
||||
{
|
||||
"mappings": {
|
||||
"test": {
|
||||
"dynamic": false, <1>
|
||||
"properties": {
|
||||
"text": {"type": "text"}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
POST test/test?refresh
|
||||
{
|
||||
"text": "words words",
|
||||
"flag": "bar"
|
||||
}
|
||||
POST test/test?refresh
|
||||
{
|
||||
"text": "words words",
|
||||
"flag": "foo"
|
||||
}
|
||||
PUT test/_mapping/test <2>
|
||||
{
|
||||
"properties": {
|
||||
"text": {"type": "text"},
|
||||
"flag": {"type": "text", "analyzer": "keyword"}
|
||||
}
|
||||
}
|
||||
--------------------------------------------------
|
||||
// CONSOLE
|
||||
|
||||
<1> This means that new fields won't be indexed, just stored in `_source`.
|
||||
|
||||
<2> This updates the mapping to add the new `flag` field. To pick up the new
|
||||
field you have to reindex all documents with it.
|
||||
|
||||
Searching for the data won't find anything:
|
||||
|
||||
// provide API Example
|
||||
[source,java]
|
||||
--------------------------------------------------
|
||||
|
||||
{
|
||||
"query": {
|
||||
"match": {
|
||||
"flag": "foo"
|
||||
}
|
||||
}
|
||||
}
|
||||
--------------------------------------------------
|
||||
// CONSOLE
|
||||
// TEST[continued]
|
||||
|
||||
[source,java]
|
||||
--------------------------------------------------
|
||||
{
|
||||
"hits" : {
|
||||
"total" : 0
|
||||
}
|
||||
}
|
||||
--------------------------------------------------
|
||||
// TESTRESPONSE
|
||||
|
||||
But you can issue an `updateByQuery` request to pick up the new mapping:
|
||||
|
||||
// provide API Example
|
||||
[source,java]
|
||||
--------------------------------------------------
|
||||
|
||||
{
|
||||
"query": {
|
||||
"match": {
|
||||
"flag": "foo"
|
||||
}
|
||||
}
|
||||
}
|
||||
--------------------------------------------------
|
||||
// CONSOLE
|
||||
// TEST[continued]
|
||||
|
||||
[source,java]
|
||||
--------------------------------------------------
|
||||
{
|
||||
"hits" : {
|
||||
"total" : 1
|
||||
}
|
||||
}
|
||||
--------------------------------------------------
|
||||
// TESTRESPONSE
|
||||
|
||||
You can do the exact same thing when adding a field to a multifield.
|
||||
|
|
Loading…
Reference in New Issue