Port Delete By Query to Reindex infrastructure

closes #16883
This commit is contained in:
Tanguy Leroux 2016-05-13 15:27:02 +02:00
parent 3cf4214255
commit a01ecb20ea
42 changed files with 3007 additions and 440 deletions

View File

@ -27,6 +27,8 @@ include::docs/get.asciidoc[]
include::docs/delete.asciidoc[]
include::docs/delete-by-query.asciidoc[]
include::docs/update.asciidoc[]
include::docs/update-by-query.asciidoc[]

View File

@ -0,0 +1,318 @@
[[docs-delete-by-query]]
== Delete By Query API
experimental[The delete-by-query API is new and should still be considered experimental. The API may change in ways that are not backwards compatible]
The simplest usage of `_delete_by_query` just performs a deletion on every
document that matches a query. Here is the API:
[source,js]
--------------------------------------------------
POST twitter/_delete_by_query
{
"query": { <1>
"match": {
"message": "some message"
}
}
}
--------------------------------------------------
// CONSOLE
// TEST[setup:big_twitter]
<1> The query must be passed as a value to the `query` key, in the same
way as the <<search-search,Search API>>. You can also use the `q`
parameter in the same way as the Search API.
That will return something like this:
[source,js]
--------------------------------------------------
{
"took" : 147,
"timed_out": false,
"deleted": 119,
"batches": 1,
"version_conflicts": 0,
"noops": 0,
"retries": {
"bulk": 0,
"search": 0
},
"throttled_millis": 0,
"requests_per_second": "unlimited",
"throttled_until_millis": 0,
"total": 119,
"failures" : [ ]
}
--------------------------------------------------
// TESTRESPONSE[s/"took" : 147/"took" : "$body.took"/]
`_delete_by_query` gets a snapshot of the index when it starts and deletes what
it finds using `internal` versioning. That means that you'll get a version
conflict if the document changes between the time when the snapshot was taken
and when the delete request is processed. When the versions match, the document
is deleted.
During the `_delete_by_query` execution, multiple search requests are sequentially
executed in order to find all the matching documents to delete. Every time a batch
of documents is found, a corresponding bulk request is executed to delete all
these documents. If a search or bulk request is rejected, `_delete_by_query`
relies on a default policy to retry rejected requests (up to 10 times, with
exponential backoff). Reaching the maximum retries limit causes `_delete_by_query`
to abort, and all failures are returned in the `failures` field of the response.
The deletions that have been performed still stick. In other words, the process
is not rolled back, only aborted. While the first failure causes the abort, all
failures that are returned by the failing bulk request are returned in the `failures`
element, so it's possible for there to be quite a few.
If you'd like to count version conflicts rather than cause them to abort, then
set `conflicts=proceed` on the URL or `"conflicts": "proceed"` in the request body.
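For example, here is a sketch of such a request with `conflicts` set in the request
body instead of on the URL (the index and query are only illustrative):
[source,js]
--------------------------------------------------
POST twitter/_delete_by_query
{
  "conflicts": "proceed",
  "query": {
    "match": {
      "message": "some message"
    }
  }
}
--------------------------------------------------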
Back to the API format, you can limit `_delete_by_query` to a single type. This
will only delete `tweet` documents from the `twitter` index:
[source,js]
--------------------------------------------------
POST twitter/tweet/_delete_by_query?conflicts=proceed
{
"query": {
"match_all": {}
}
}
--------------------------------------------------
// CONSOLE
// TEST[setup:twitter]
It's also possible to delete documents from multiple indexes and multiple
types at once, just like the Search API:
[source,js]
--------------------------------------------------
POST twitter,blog/tweet,post/_delete_by_query
{
"query": {
"match_all": {}
}
}
--------------------------------------------------
// CONSOLE
// TEST[s/^/PUT twitter\nPUT blog\nGET _cluster\/health?wait_for_status=yellow\n/]
If you provide `routing` then the routing is copied to the scroll query,
limiting the process to the shards that match that routing value:
[source,js]
--------------------------------------------------
POST twitter/_delete_by_query?routing=1
{
"query": {
"range" : {
"age" : {
"gte" : 10
}
}
}
}
--------------------------------------------------
// CONSOLE
// TEST[setup:twitter]
By default `_delete_by_query` uses scroll batches of 1000. You can change the
batch size with the `scroll_size` URL parameter:
[source,js]
--------------------------------------------------
POST twitter/_delete_by_query?scroll_size=5000
{
"query": {
"term": {
"user": "kimchy"
}
}
}
--------------------------------------------------
// CONSOLE
// TEST[setup:twitter]
[float]
=== URL Parameters
In addition to the standard parameters like `pretty`, the Delete By Query API
also supports `refresh`, `wait_for_completion`, `consistency`, and `timeout`.
Sending the `refresh` parameter will refresh all shards involved in the delete by
query once the request completes. This is different from the Delete API's `refresh`
parameter, which causes just the shard that received the delete request
to be refreshed.
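For example, this request (the index and query are only illustrative) deletes the
matching documents and then refreshes all shards involved once it completes:
[source,js]
--------------------------------------------------
POST twitter/_delete_by_query?refresh
{
  "query": {
    "match": {
      "user": "kimchy"
    }
  }
}
--------------------------------------------------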
If the request contains `wait_for_completion=false` then Elasticsearch will
perform some preflight checks, launch the request, and then return a `task`
which can be used with <<docs-delete-by-query-task-api,Tasks APIs>> to cancel
or get the status of the task. For now, once the request is finished the task
is gone and the only place to look for the ultimate result of the task is in
the Elasticsearch log file. This will be fixed soon.
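As a sketch, starting a delete by query without waiting for it to finish might look
like this (the index and query are only illustrative):
[source,js]
--------------------------------------------------
POST twitter/_delete_by_query?wait_for_completion=false
{
  "query": {
    "match_all": {}
  }
}
--------------------------------------------------
The response then contains just the task identifier in the `node_id:task_id` format
used by the Task API, for example (the ids are borrowed from the task example further
down):
[source,js]
--------------------------------------------------
{
  "task" : "r1A2WoRbTwKZ516z6NEs5A:36619"
}
--------------------------------------------------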
`consistency` controls how many copies of a shard must respond to each write
request. `timeout` controls how long each write request waits for unavailable
shards to become available. Both work exactly how they work in the
<<docs-bulk,Bulk API>>.
`requests_per_second` can be set to any decimal number (`1.4`, `6`, `1000`, etc.)
and throttles the number of requests per second that the delete by query issues.
The throttling is done by waiting between bulk batches so that it can manipulate
the scroll timeout. The wait time is `requests_in_the_batch / requests_per_second`
minus the time that the batch took to complete. Since the batch isn't broken into
multiple bulk requests, large batch sizes will cause Elasticsearch to create many
requests and then wait for a while before starting the next set. This is "bursty"
instead of "smooth". The default is `unlimited`, which is also the only non-number
value that it accepts.
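As a rough worked example (the numbers are made up): with `scroll_size=1000` and
`requests_per_second=500`, each batch of 1000 documents is budgeted `1000 / 500 = 2`
seconds, so if the bulk request takes half a second the delete by query sleeps for the
remaining 1.5 seconds before starting the next batch. Setting the throttle on the
request itself might look like this (the index is only illustrative):
[source,js]
--------------------------------------------------
POST twitter/_delete_by_query?requests_per_second=500&scroll_size=1000
{
  "query": {
    "match_all": {}
  }
}
--------------------------------------------------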
[float]
=== Response body
The JSON response looks like this:
[source,js]
--------------------------------------------------
{
"took" : 639,
"deleted": 0,
"batches": 1,
"version_conflicts": 2,
"retries": 0,
"throttled_millis": 0,
"failures" : [ ]
}
--------------------------------------------------
`took`::
The number of milliseconds from start to end of the whole operation.
`deleted`::
The number of documents that were successfully deleted.
`batches`::
The number of scroll responses pulled back by the delete by query.
`version_conflicts`::
The number of version conflicts that the delete by query hit.
`retries`::
The number of retries that the delete by query did in response to a full queue.
`throttled_millis`::
Number of milliseconds the request slept to conform to `requests_per_second`.
`failures`::
Array of all indexing failures. If this is non-empty then the request aborted
because of those failures. See `conflicts` for how to prevent version conflicts
from aborting the operation.
[float]
[[docs-delete-by-query-task-api]]
=== Works with the Task API
While a Delete By Query request is running, you can fetch its status using the
<<tasks,Task API>>:
[source,js]
--------------------------------------------------
GET _tasks?detailed=true&action=*/delete/byquery
--------------------------------------------------
// CONSOLE
The response looks like:
[source,js]
--------------------------------------------------
{
"nodes" : {
"r1A2WoRbTwKZ516z6NEs5A" : {
"name" : "Tyrannus",
"transport_address" : "127.0.0.1:9300",
"host" : "127.0.0.1",
"ip" : "127.0.0.1:9300",
"attributes" : {
"testattr" : "test",
"portsfile" : "true"
},
"tasks" : {
"r1A2WoRbTwKZ516z6NEs5A:36619" : {
"node" : "r1A2WoRbTwKZ516z6NEs5A",
"id" : 36619,
"type" : "transport",
"action" : "indices:data/write/delete/byquery",
"status" : { <1>
"total" : 6154,
"updated" : 0,
"created" : 0,
"deleted" : 3500,
"batches" : 36,
"version_conflicts" : 0,
"noops" : 0,
"retries": 0,
"throttled_millis": 0
},
"description" : ""
}
}
}
}
}
--------------------------------------------------
<1> This object contains the actual status. It is just like the response JSON
with the important addition of the `total` field. `total` is the total number
of operations that the delete by query expects to perform. You can estimate the
progress by adding the `updated`, `created`, and `deleted` fields. The request
will finish when their sum is equal to the `total` field.
[float]
[[docs-delete-by-query-cancel-task-api]]
=== Works with the Cancel Task API
Any Delete By Query can be canceled using the <<tasks,Task Cancel API>>:
[source,js]
--------------------------------------------------
POST _tasks/taskid:1/_cancel
--------------------------------------------------
// CONSOLE
The `task_id` can be found using the tasks API above.
Cancellation should happen quickly but might take a few seconds. The task status
API above will continue to list the task until it wakes to cancel itself.
[float]
[[docs-delete-by-query-rethrottle]]
=== Rethrottling
The value of `requests_per_second` can be changed on a running delete by query
using the `_rethrottle` API:
[source,js]
--------------------------------------------------
POST _delete_by_query/taskid:1/_rethrottle?requests_per_second=unlimited
--------------------------------------------------
// CONSOLE
The `task_id` can be found using the tasks API above.
Just like when setting it on the `_delete_by_query` API, `requests_per_second`
can be either `unlimited` to disable throttling or any decimal number like `1.7`
or `12` to throttle to that level. Rethrottling that speeds up the query takes
effect immediately, but rethrottling that slows down the query will take effect
after completing the current batch. This prevents scroll timeouts.
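For example, to slow a running delete by query down to roughly 1.7 requests per second
(the task id is a placeholder, as in the example above):
[source,js]
--------------------------------------------------
POST _delete_by_query/taskid:1/_rethrottle?requests_per_second=1.7
--------------------------------------------------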

View File

@ -61,7 +61,7 @@ and the time when it attempted to update the document. This is fine because
that update will have picked up the online mapping update.
Back to the API format, you can limit `_update_by_query` to a single type. This
will only update `tweet`s from the `twitter` index:
will only update `tweet` documents from the `twitter` index:
[source,js]
--------------------------------------------------
@ -119,7 +119,7 @@ Just as in <<docs-update,Update API>> you can set `ctx.op = "noop"` if
your script decides that it doesn't have to make any changes. That will cause
`_update_by_query` to omit that document from its updates. Setting `ctx.op` to
anything else is an error. If you want to delete by a query you can use the
{plugins}/plugins-delete-by-query.html[Delete by Query plugin] instead. Setting any
<<docs-delete-by-query,Delete By Query API>> instead. Setting any
other field in `ctx` is an error.
Note that we stopped specifying `conflicts=proceed`. In this case we want a

View File

@ -75,7 +75,7 @@ import static org.elasticsearch.search.sort.SortBuilders.fieldSort;
* Abstract base for scrolling across a search and executing bulk actions on all results. All package private methods are package private so
* their tests can use them. Most methods run in the listener thread pool because they are meant to be fast and don't expect to block.
*/
public abstract class AbstractAsyncBulkByScrollAction<Request extends AbstractBulkByScrollRequest<Request>, Response> {
public abstract class AbstractAsyncBulkByScrollAction<Request extends AbstractBulkByScrollRequest<Request>> {
/**
* The request for this action. Named mainRequest because we create lots of <code>request</code> variables all representing child
* requests of this mainRequest.
@ -92,12 +92,13 @@ public abstract class AbstractAsyncBulkByScrollAction<Request extends AbstractBu
private final ParentTaskAssigningClient client;
private final ThreadPool threadPool;
private final SearchRequest firstSearchRequest;
private final ActionListener<Response> listener;
private final ActionListener<BulkIndexByScrollResponse> listener;
private final BackoffPolicy backoffPolicy;
private final Retry bulkRetry;
public AbstractAsyncBulkByScrollAction(BulkByScrollTask task, ESLogger logger, ParentTaskAssigningClient client,
ThreadPool threadPool, Request mainRequest, SearchRequest firstSearchRequest, ActionListener<Response> listener) {
ThreadPool threadPool, Request mainRequest, SearchRequest firstSearchRequest,
ActionListener<BulkIndexByScrollResponse> listener) {
this.task = task;
this.logger = logger;
this.client = client;
@ -111,8 +112,13 @@ public abstract class AbstractAsyncBulkByScrollAction<Request extends AbstractBu
protected abstract BulkRequest buildBulk(Iterable<SearchHit> docs);
protected abstract Response buildResponse(TimeValue took, List<Failure> indexingFailures, List<ShardSearchFailure> searchFailures,
boolean timedOut);
/**
* Build the response for reindex actions.
*/
protected BulkIndexByScrollResponse buildResponse(TimeValue took, List<BulkItemResponse.Failure> indexingFailures,
List<ShardSearchFailure> searchFailures, boolean timedOut) {
return new BulkIndexByScrollResponse(took, task.getStatus(), indexingFailures, searchFailures, timedOut);
}
/**
* Start the action by firing the initial search request.

View File

@ -20,15 +20,16 @@
package org.elasticsearch.index.reindex;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.ShardSearchFailure;
import org.elasticsearch.client.ParentTaskAssigningClient;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.mapper.internal.IdFieldMapper;
import org.elasticsearch.index.mapper.internal.IndexFieldMapper;
import org.elasticsearch.index.mapper.internal.ParentFieldMapper;
@ -40,6 +41,7 @@ import org.elasticsearch.index.mapper.internal.TypeFieldMapper;
import org.elasticsearch.index.mapper.internal.VersionFieldMapper;
import org.elasticsearch.script.CompiledScript;
import org.elasticsearch.script.ExecutableScript;
import org.elasticsearch.script.Script;
import org.elasticsearch.script.ScriptContext;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.search.SearchHit;
@ -47,9 +49,9 @@ import org.elasticsearch.search.SearchHitField;
import org.elasticsearch.threadpool.ThreadPool;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.BiFunction;
import static java.util.Collections.emptyMap;
@ -57,91 +59,106 @@ import static java.util.Collections.emptyMap;
* Abstract base for scrolling across a search and executing bulk indexes on all
* results.
*/
public abstract class AbstractAsyncBulkIndexByScrollAction<Request extends AbstractBulkIndexByScrollRequest<Request>>
extends AbstractAsyncBulkByScrollAction<Request, BulkIndexByScrollResponse> {
public abstract class AbstractAsyncBulkIndexByScrollAction<Request extends AbstractBulkByScrollRequest<Request>>
extends AbstractAsyncBulkByScrollAction<Request> {
private final ScriptService scriptService;
private final CompiledScript script;
protected final ScriptService scriptService;
protected final ClusterState clusterState;
public AbstractAsyncBulkIndexByScrollAction(BulkByScrollTask task, ESLogger logger, ScriptService scriptService, ClusterState state,
ParentTaskAssigningClient client, ThreadPool threadPool, Request mainRequest, SearchRequest firstSearchRequest,
ActionListener<BulkIndexByScrollResponse> listener) {
/**
* This BiFunction is used to apply various changes depending on the Reindex action and the search hit,
* from copying search hit metadata (parent, routing, etc) to potentially transforming the
* {@link RequestWrapper} completely.
*/
private final BiFunction<RequestWrapper<?>, SearchHit, RequestWrapper<?>> scriptApplier;
public AbstractAsyncBulkIndexByScrollAction(BulkByScrollTask task, ESLogger logger, ParentTaskAssigningClient client,
ThreadPool threadPool, Request mainRequest, SearchRequest firstSearchRequest,
ActionListener<BulkIndexByScrollResponse> listener,
ScriptService scriptService, ClusterState clusterState) {
super(task, logger, client, threadPool, mainRequest, firstSearchRequest, listener);
this.scriptService = scriptService;
if (mainRequest.getScript() == null) {
script = null;
} else {
script = scriptService.compile(mainRequest.getScript(), ScriptContext.Standard.UPDATE, emptyMap(), state);
}
}
@Override
protected BulkIndexByScrollResponse buildResponse(TimeValue took, List<BulkItemResponse.Failure> indexingFailures,
List<ShardSearchFailure> searchFailures, boolean timedOut) {
return new BulkIndexByScrollResponse(took, task.getStatus(), indexingFailures, searchFailures, timedOut);
this.clusterState = clusterState;
this.scriptApplier = Objects.requireNonNull(buildScriptApplier(), "script applier must not be null");
}
/**
* Build the IndexRequest for a single search hit. This shouldn't handle
* metadata or the script. That will be handled by copyMetadata and
* applyScript functions that can be overridden.
* Build the {@link BiFunction} to apply to all {@link RequestWrapper}.
*/
protected abstract IndexRequest buildIndexRequest(SearchHit doc);
protected BiFunction<RequestWrapper<?>, SearchHit, RequestWrapper<?>> buildScriptApplier() {
// The default script applier executes a no-op
return (request, searchHit) -> request;
}
@Override
protected BulkRequest buildBulk(Iterable<SearchHit> docs) {
BulkRequest bulkRequest = new BulkRequest();
ExecutableScript executableScript = null;
Map<String, Object> scriptCtx = null;
for (SearchHit doc : docs) {
if (doc.hasSource()) {
/*
* Either the document didn't store _source or we didn't fetch it for some reason. Since we don't allow the user to
* change the "fields" part of the search request it is unlikely that we got here because we didn't fetch _source.
* Thus the error message assumes that it wasn't stored.
*/
throw new IllegalArgumentException("[" + doc.index() + "][" + doc.type() + "][" + doc.id() + "] didn't store _source");
}
IndexRequest index = buildIndexRequest(doc);
copyMetadata(index, doc);
if (script != null) {
if (executableScript == null) {
executableScript = scriptService.executable(script, mainRequest.getScript().getParams());
scriptCtx = new HashMap<>();
}
if (false == applyScript(index, doc, executableScript, scriptCtx)) {
continue;
if (accept(doc)) {
RequestWrapper<?> request = scriptApplier.apply(copyMetadata(buildRequest(doc), doc), doc);
if (request != null) {
bulkRequest.add(request.self());
}
}
bulkRequest.add(index);
}
return bulkRequest;
}
/**
* Copies the metadata from a hit to the index request.
* Used to accept or ignore a search hit. Ignored search hits will be excluded
* from the bulk request. It is also where we fail on invalid search hits, like
* when the document has no source but it's required.
*/
protected void copyMetadata(IndexRequest index, SearchHit doc) {
index.parent(fieldValue(doc, ParentFieldMapper.NAME));
copyRouting(index, doc);
// Comes back as a Long but needs to be a string
Long timestamp = fieldValue(doc, TimestampFieldMapper.NAME);
if (timestamp != null) {
index.timestamp(timestamp.toString());
}
Long ttl = fieldValue(doc, TTLFieldMapper.NAME);
if (ttl != null) {
index.ttl(ttl);
protected boolean accept(SearchHit doc) {
if (doc.hasSource()) {
/*
* Either the document didn't store _source or we didn't fetch it for some reason. Since we don't allow the user to
* change the "fields" part of the search request it is unlikely that we got here because we didn't fetch _source.
* Thus the error message assumes that it wasn't stored.
*/
throw new IllegalArgumentException("[" + doc.index() + "][" + doc.type() + "][" + doc.id() + "] didn't store _source");
}
return true;
}
/**
* Part of copyMetadata but called out individual for easy overwriting.
* Build the {@link RequestWrapper} for a single search hit. This shouldn't handle
* metadata or scripting. That will be handled by copyMetadata and
* apply functions that can be overridden.
*/
protected void copyRouting(IndexRequest index, SearchHit doc) {
index.routing(fieldValue(doc, RoutingFieldMapper.NAME));
protected abstract RequestWrapper<?> buildRequest(SearchHit doc);
/**
* Copies the metadata from a hit to the request.
*/
protected RequestWrapper<?> copyMetadata(RequestWrapper<?> request, SearchHit doc) {
copyParent(request, fieldValue(doc, ParentFieldMapper.NAME));
copyRouting(request, fieldValue(doc, RoutingFieldMapper.NAME));
// Comes back as a Long but needs to be a string
Long timestamp = fieldValue(doc, TimestampFieldMapper.NAME);
if (timestamp != null) {
request.setTimestamp(timestamp.toString());
}
Long ttl = fieldValue(doc, TTLFieldMapper.NAME);
if (ttl != null) {
request.setTtl(ttl);
}
return request;
}
/**
* Copy the parent from a search hit to the request.
*/
protected void copyParent(RequestWrapper<?> request, String parent) {
request.setParent(parent);
}
/**
* Copy the routing from a search hit to the request.
*/
protected void copyRouting(RequestWrapper<?> request, String routing) {
request.setRouting(routing);
}
protected <T> T fieldValue(SearchHit doc, String fieldName) {
@ -150,106 +167,327 @@ public abstract class AbstractAsyncBulkIndexByScrollAction<Request extends Abstr
}
/**
* Apply a script to the request.
*
* @return is this request still ok to apply (true) or is it a noop (false)
* Wrapper for the {@link ActionRequest} that are used in this action class.
*/
@SuppressWarnings("unchecked")
protected boolean applyScript(IndexRequest index, SearchHit doc, ExecutableScript script, final Map<String, Object> ctx) {
if (script == null) {
return true;
}
ctx.put(IndexFieldMapper.NAME, doc.index());
ctx.put(TypeFieldMapper.NAME, doc.type());
ctx.put(IdFieldMapper.NAME, doc.id());
Long oldVersion = doc.getVersion();
ctx.put(VersionFieldMapper.NAME, oldVersion);
String oldParent = fieldValue(doc, ParentFieldMapper.NAME);
ctx.put(ParentFieldMapper.NAME, oldParent);
String oldRouting = fieldValue(doc, RoutingFieldMapper.NAME);
ctx.put(RoutingFieldMapper.NAME, oldRouting);
Long oldTimestamp = fieldValue(doc, TimestampFieldMapper.NAME);
ctx.put(TimestampFieldMapper.NAME, oldTimestamp);
Long oldTTL = fieldValue(doc, TTLFieldMapper.NAME);
ctx.put(TTLFieldMapper.NAME, oldTTL);
ctx.put(SourceFieldMapper.NAME, index.sourceAsMap());
ctx.put("op", "update");
script.setNextVar("ctx", ctx);
script.run();
Map<String, Object> resultCtx = (Map<String, Object>) script.unwrap(ctx);
String newOp = (String) resultCtx.remove("op");
if (newOp == null) {
throw new IllegalArgumentException("Script cleared op!");
}
if ("noop".equals(newOp)) {
task.countNoop();
return false;
}
if (false == "update".equals(newOp)) {
throw new IllegalArgumentException("Invalid op [" + newOp + ']');
}
interface RequestWrapper<Self extends ActionRequest<Self>> {
/*
* It'd be lovely to only set the source if we know its been modified
* but it isn't worth keeping two copies of it around just to check!
*/
index.source((Map<String, Object>) resultCtx.remove(SourceFieldMapper.NAME));
void setIndex(String index);
Object newValue = ctx.remove(IndexFieldMapper.NAME);
if (false == doc.index().equals(newValue)) {
scriptChangedIndex(index, newValue);
}
newValue = ctx.remove(TypeFieldMapper.NAME);
if (false == doc.type().equals(newValue)) {
scriptChangedType(index, newValue);
}
newValue = ctx.remove(IdFieldMapper.NAME);
if (false == doc.id().equals(newValue)) {
scriptChangedId(index, newValue);
}
newValue = ctx.remove(VersionFieldMapper.NAME);
if (false == Objects.equals(oldVersion, newValue)) {
scriptChangedVersion(index, newValue);
}
newValue = ctx.remove(ParentFieldMapper.NAME);
if (false == Objects.equals(oldParent, newValue)) {
scriptChangedParent(index, newValue);
}
/*
* Its important that routing comes after parent in case you want to
* change them both.
*/
newValue = ctx.remove(RoutingFieldMapper.NAME);
if (false == Objects.equals(oldRouting, newValue)) {
scriptChangedRouting(index, newValue);
}
newValue = ctx.remove(TimestampFieldMapper.NAME);
if (false == Objects.equals(oldTimestamp, newValue)) {
scriptChangedTimestamp(index, newValue);
}
newValue = ctx.remove(TTLFieldMapper.NAME);
if (false == Objects.equals(oldTTL, newValue)) {
scriptChangedTTL(index, newValue);
}
if (false == ctx.isEmpty()) {
throw new IllegalArgumentException("Invalid fields added to ctx [" + String.join(",", ctx.keySet()) + ']');
}
return true;
void setType(String type);
void setId(String id);
void setVersion(long version);
void setVersionType(VersionType versionType);
void setParent(String parent);
void setRouting(String routing);
void setTimestamp(String timestamp);
void setTtl(Long ttl);
void setSource(Map<String, Object> source);
Map<String, Object> getSource();
Self self();
}
protected abstract void scriptChangedIndex(IndexRequest index, Object to);
/**
* {@link RequestWrapper} for {@link IndexRequest}
*/
public static class IndexRequestWrapper implements RequestWrapper<IndexRequest> {
protected abstract void scriptChangedType(IndexRequest index, Object to);
private final IndexRequest request;
protected abstract void scriptChangedId(IndexRequest index, Object to);
IndexRequestWrapper(IndexRequest request) {
this.request = Objects.requireNonNull(request, "Wrapped IndexRequest can not be null");
}
protected abstract void scriptChangedVersion(IndexRequest index, Object to);
@Override
public void setIndex(String index) {
request.index(index);
}
protected abstract void scriptChangedRouting(IndexRequest index, Object to);
@Override
public void setType(String type) {
request.type(type);
}
protected abstract void scriptChangedParent(IndexRequest index, Object to);
@Override
public void setId(String id) {
request.id(id);
}
protected abstract void scriptChangedTimestamp(IndexRequest index, Object to);
@Override
public void setVersion(long version) {
request.version(version);
}
protected abstract void scriptChangedTTL(IndexRequest index, Object to);
@Override
public void setVersionType(VersionType versionType) {
request.versionType(versionType);
}
@Override
public void setParent(String parent) {
request.parent(parent);
}
@Override
public void setRouting(String routing) {
request.routing(routing);
}
@Override
public void setTimestamp(String timestamp) {
request.timestamp(timestamp);
}
@Override
public void setTtl(Long ttl) {
if (ttl == null) {
request.ttl((TimeValue) null);
} else {
request.ttl(ttl);
}
}
@Override
public Map<String, Object> getSource() {
return request.sourceAsMap();
}
@Override
public void setSource(Map<String, Object> source) {
request.source(source);
}
@Override
public IndexRequest self() {
return request;
}
}
/**
* Wraps a {@link IndexRequest} in a {@link RequestWrapper}
*/
static RequestWrapper<IndexRequest> wrap(IndexRequest request) {
return new IndexRequestWrapper(request);
}
/**
* {@link RequestWrapper} for {@link DeleteRequest}
*/
public static class DeleteRequestWrapper implements RequestWrapper<DeleteRequest> {
private final DeleteRequest request;
DeleteRequestWrapper(DeleteRequest request) {
this.request = Objects.requireNonNull(request, "Wrapped DeleteRequest can not be null");
}
@Override
public void setIndex(String index) {
request.index(index);
}
@Override
public void setType(String type) {
request.type(type);
}
@Override
public void setId(String id) {
request.id(id);
}
@Override
public void setVersion(long version) {
request.version(version);
}
@Override
public void setVersionType(VersionType versionType) {
request.versionType(versionType);
}
@Override
public void setParent(String parent) {
request.parent(parent);
}
@Override
public void setRouting(String routing) {
request.routing(routing);
}
@Override
public void setTimestamp(String timestamp) {
throw new UnsupportedOperationException("unable to set [timestamp] on action request [" + request.getClass() + "]");
}
@Override
public void setTtl(Long ttl) {
throw new UnsupportedOperationException("unable to set [ttl] on action request [" + request.getClass() + "]");
}
@Override
public Map<String, Object> getSource() {
throw new UnsupportedOperationException("unable to get source from action request [" + request.getClass() + "]");
}
@Override
public void setSource(Map<String, Object> source) {
throw new UnsupportedOperationException("unable to set [source] on action request [" + request.getClass() + "]");
}
@Override
public DeleteRequest self() {
return request;
}
}
/**
* Wraps a {@link DeleteRequest} in a {@link RequestWrapper}
*/
static RequestWrapper<DeleteRequest> wrap(DeleteRequest request) {
return new DeleteRequestWrapper(request);
}
/**
* Apply a {@link Script} to a {@link RequestWrapper}
*/
public abstract class ScriptApplier implements BiFunction<RequestWrapper<?>, SearchHit, RequestWrapper<?>> {
private final BulkByScrollTask task;
private final ScriptService scriptService;
private final ClusterState state;
private final Script script;
private final Map<String, Object> params;
private ExecutableScript executable;
private Map<String, Object> context;
public ScriptApplier(BulkByScrollTask task, ScriptService scriptService, Script script, ClusterState state,
Map<String, Object> params) {
this.task = task;
this.scriptService = scriptService;
this.script = script;
this.state = state;
this.params = params;
}
@Override
@SuppressWarnings("unchecked")
public RequestWrapper<?> apply(RequestWrapper<?> request, SearchHit doc) {
if (script == null) {
return request;
}
if (executable == null) {
CompiledScript compiled = scriptService.compile(script, ScriptContext.Standard.UPDATE, emptyMap(), state);
executable = scriptService.executable(compiled, params);
}
if (context == null) {
context = new HashMap<>();
}
context.put(IndexFieldMapper.NAME, doc.index());
context.put(TypeFieldMapper.NAME, doc.type());
context.put(IdFieldMapper.NAME, doc.id());
Long oldVersion = doc.getVersion();
context.put(VersionFieldMapper.NAME, oldVersion);
String oldParent = fieldValue(doc, ParentFieldMapper.NAME);
context.put(ParentFieldMapper.NAME, oldParent);
String oldRouting = fieldValue(doc, RoutingFieldMapper.NAME);
context.put(RoutingFieldMapper.NAME, oldRouting);
Long oldTimestamp = fieldValue(doc, TimestampFieldMapper.NAME);
context.put(TimestampFieldMapper.NAME, oldTimestamp);
Long oldTTL = fieldValue(doc, TTLFieldMapper.NAME);
context.put(TTLFieldMapper.NAME, oldTTL);
context.put(SourceFieldMapper.NAME, request.getSource());
context.put("op", "update");
executable.setNextVar("ctx", context);
executable.run();
Map<String, Object> resultCtx = (Map<String, Object>) executable.unwrap(context);
String newOp = (String) resultCtx.remove("op");
if (newOp == null) {
throw new IllegalArgumentException("Script cleared op!");
}
if ("noop".equals(newOp)) {
task.countNoop();
return null;
}
if (false == "update".equals(newOp)) {
throw new IllegalArgumentException("Invalid op [" + newOp + ']');
}
/*
* It'd be lovely to only set the source if we know it's been modified
* but it isn't worth keeping two copies of it around just to check!
*/
request.setSource((Map<String, Object>) resultCtx.remove(SourceFieldMapper.NAME));
Object newValue = context.remove(IndexFieldMapper.NAME);
if (false == doc.index().equals(newValue)) {
scriptChangedIndex(request, newValue);
}
newValue = context.remove(TypeFieldMapper.NAME);
if (false == doc.type().equals(newValue)) {
scriptChangedType(request, newValue);
}
newValue = context.remove(IdFieldMapper.NAME);
if (false == doc.id().equals(newValue)) {
scriptChangedId(request, newValue);
}
newValue = context.remove(VersionFieldMapper.NAME);
if (false == Objects.equals(oldVersion, newValue)) {
scriptChangedVersion(request, newValue);
}
newValue = context.remove(ParentFieldMapper.NAME);
if (false == Objects.equals(oldParent, newValue)) {
scriptChangedParent(request, newValue);
}
/*
* It's important that routing comes after parent in case you want to
* change them both.
*/
newValue = context.remove(RoutingFieldMapper.NAME);
if (false == Objects.equals(oldRouting, newValue)) {
scriptChangedRouting(request, newValue);
}
newValue = context.remove(TimestampFieldMapper.NAME);
if (false == Objects.equals(oldTimestamp, newValue)) {
scriptChangedTimestamp(request, newValue);
}
newValue = context.remove(TTLFieldMapper.NAME);
if (false == Objects.equals(oldTTL, newValue)) {
scriptChangedTTL(request, newValue);
}
if (false == context.isEmpty()) {
throw new IllegalArgumentException("Invalid fields added to context [" + String.join(",", context.keySet()) + ']');
}
return request;
}
protected abstract void scriptChangedIndex(RequestWrapper<?> request, Object to);
protected abstract void scriptChangedType(RequestWrapper<?> request, Object to);
protected abstract void scriptChangedId(RequestWrapper<?> request, Object to);
protected abstract void scriptChangedVersion(RequestWrapper<?> request, Object to);
protected abstract void scriptChangedRouting(RequestWrapper<?> request, Object to);
protected abstract void scriptChangedParent(RequestWrapper<?> request, Object to);
protected abstract void scriptChangedTimestamp(RequestWrapper<?> request, Object to);
protected abstract void scriptChangedTTL(RequestWrapper<?> request, Object to);
}
}

View File

@ -20,6 +20,7 @@
package org.elasticsearch.index.reindex;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.WriteConsistencyLevel;
import org.elasticsearch.action.support.TransportAction;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.service.ClusterService;
@ -45,32 +46,6 @@ public abstract class AbstractBaseReindexRestHandler<
TA extends TransportAction<Request, BulkIndexByScrollResponse>
> extends BaseRestHandler {
/**
* @return requests_per_second from the request as a float if it was on the request, null otherwise
*/
public static Float parseRequestsPerSecond(RestRequest request) {
String requestsPerSecondString = request.param("requests_per_second");
if (requestsPerSecondString == null) {
return null;
}
if ("unlimited".equals(requestsPerSecondString)) {
return Float.POSITIVE_INFINITY;
}
float requestsPerSecond;
try {
requestsPerSecond = Float.parseFloat(requestsPerSecondString);
} catch (NumberFormatException e) {
throw new IllegalArgumentException(
"[requests_per_second] must be a float greater than 0. Use \"unlimited\" to disable throttling.", e);
}
if (requestsPerSecond <= 0) {
// We validate here and in the setters because the setters use "Float.POSITIVE_INFINITY" instead of "unlimited"
throw new IllegalArgumentException(
"[requests_per_second] must be a float greater than 0. Use \"unlimited\" to disable throttling.");
}
return requestsPerSecond;
}
protected final IndicesQueriesRegistry indicesQueriesRegistry;
protected final AggregatorParsers aggParsers;
protected final Suggesters suggesters;
@ -88,41 +63,95 @@ public abstract class AbstractBaseReindexRestHandler<
this.action = action;
}
protected void execute(RestRequest request, Request internalRequest, RestChannel channel,
boolean includeCreated, boolean includeUpdated, boolean includeDeleted) throws IOException {
Float requestsPerSecond = parseRequestsPerSecond(request);
if (requestsPerSecond != null) {
internalRequest.setRequestsPerSecond(requestsPerSecond);
}
protected void handleRequest(RestRequest request, RestChannel channel,
boolean includeCreated, boolean includeUpdated, boolean includeDeleted) throws IOException {
// Build the internal request
Request internal = setCommonOptions(request, buildRequest(request));
// Executes the request and waits for completion
if (request.paramAsBoolean("wait_for_completion", true)) {
Map<String, String> params = new HashMap<>();
params.put(BulkByScrollTask.Status.INCLUDE_CREATED, Boolean.toString(includeCreated));
params.put(BulkByScrollTask.Status.INCLUDE_UPDATED, Boolean.toString(includeUpdated));
params.put(BulkByScrollTask.Status.INCLUDE_DELETED, Boolean.toString(includeDeleted));
action.execute(internalRequest, new BulkIndexByScrollResponseContentListener<>(channel, params));
action.execute(internal, new BulkIndexByScrollResponseContentListener<>(channel, params));
return;
}
/*
* Lets try and validate before forking so the user gets some error. The
* task can't totally validate until it starts but this is better than
* nothing.
*/
ActionRequestValidationException validationException = internalRequest.validate();
ActionRequestValidationException validationException = internal.validate();
if (validationException != null) {
channel.sendResponse(new BytesRestResponse(channel, validationException));
return;
}
Task task = action.execute(internalRequest, LoggingTaskListener.instance());
sendTask(channel, task);
sendTask(channel, action.execute(internal, LoggingTaskListener.instance()));
}
/**
* Build the Request based on the RestRequest.
*/
protected abstract Request buildRequest(RestRequest request) throws IOException;
/**
* Sets common options of {@link AbstractBulkByScrollRequest} requests.
*/
protected Request setCommonOptions(RestRequest restRequest, Request request) {
assert restRequest != null : "RestRequest should not be null";
assert request != null : "Request should not be null";
request.setRefresh(restRequest.paramAsBoolean("refresh", request.isRefresh()));
request.setTimeout(restRequest.paramAsTime("timeout", request.getTimeout()));
String consistency = restRequest.param("consistency");
if (consistency != null) {
request.setConsistency(WriteConsistencyLevel.fromString(consistency));
}
Float requestsPerSecond = parseRequestsPerSecond(restRequest);
if (requestsPerSecond != null) {
request.setRequestsPerSecond(requestsPerSecond);
}
return request;
}
private void sendTask(RestChannel channel, Task task) throws IOException {
XContentBuilder builder = channel.newBuilder();
builder.startObject();
builder.field("task", clusterService.localNode().getId() + ":" + task.getId());
builder.endObject();
channel.sendResponse(new BytesRestResponse(RestStatus.OK, builder));
try (XContentBuilder builder = channel.newBuilder()) {
builder.startObject();
builder.field("task", clusterService.localNode().getId() + ":" + task.getId());
builder.endObject();
channel.sendResponse(new BytesRestResponse(RestStatus.OK, builder));
}
}
/**
* @return requests_per_second from the request as a float if it was on the request, null otherwise
*/
public static Float parseRequestsPerSecond(RestRequest request) {
String requestsPerSecondString = request.param("requests_per_second");
if (requestsPerSecondString == null) {
return null;
}
if ("unlimited".equals(requestsPerSecondString)) {
return Float.POSITIVE_INFINITY;
}
float requestsPerSecond;
try {
requestsPerSecond = Float.parseFloat(requestsPerSecondString);
} catch (NumberFormatException e) {
throw new IllegalArgumentException(
"[requests_per_second] must be a float greater than 0. Use \"unlimited\" to disable throttling.", e);
}
if (requestsPerSecond <= 0) {
// We validate here and in the setters because the setters use "Float.POSITIVE_INFINITY" instead of "unlimited"
throw new IllegalArgumentException(
"[requests_per_second] must be a float greater than 0. Use \"unlimited\" to disable throttling.");
}
return requestsPerSecond;
}
}

View File

@ -0,0 +1,118 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.reindex;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.support.TransportAction;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.indices.query.IndicesQueriesRegistry;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.action.search.RestSearchAction;
import org.elasticsearch.rest.action.support.RestActions;
import org.elasticsearch.search.aggregations.AggregatorParsers;
import org.elasticsearch.search.suggest.Suggesters;
import java.io.IOException;
import java.util.Map;
import java.util.function.Consumer;
import static org.elasticsearch.index.reindex.AbstractBulkByScrollRequest.SIZE_ALL_MATCHES;
/**
* Rest handler for reindex actions that accept a search request like Update-By-Query or Delete-By-Query
*/
public abstract class AbstractBulkByQueryRestHandler<
Request extends AbstractBulkByScrollRequest<Request>,
TA extends TransportAction<Request, BulkIndexByScrollResponse>> extends AbstractBaseReindexRestHandler<Request, TA> {
protected AbstractBulkByQueryRestHandler(Settings settings, Client client, IndicesQueriesRegistry indicesQueriesRegistry,
AggregatorParsers aggParsers, Suggesters suggesters, ClusterService clusterService,
TA action) {
super(settings, client, indicesQueriesRegistry, aggParsers, suggesters, clusterService, action);
}
protected void parseInternalRequest(Request internal, RestRequest restRequest,
Map<String, Consumer<Object>> consumers) throws IOException {
assert internal != null : "Request should not be null";
assert restRequest != null : "RestRequest should not be null";
SearchRequest searchRequest = internal.getSearchRequest();
int scrollSize = searchRequest.source().size();
searchRequest.source().size(SIZE_ALL_MATCHES);
parseSearchRequest(searchRequest, restRequest, consumers);
internal.setSize(searchRequest.source().size());
searchRequest.source().size(restRequest.paramAsInt("scroll_size", scrollSize));
String conflicts = restRequest.param("conflicts");
if (conflicts != null) {
internal.setConflicts(conflicts);
}
// Let the requester set search timeout. It is probably only going to be useful for testing but who knows.
if (restRequest.hasParam("search_timeout")) {
searchRequest.source().timeout(restRequest.paramAsTime("search_timeout", null));
}
}
protected void parseSearchRequest(SearchRequest searchRequest, RestRequest restRequest,
Map<String, Consumer<Object>> consumers) throws IOException {
assert searchRequest != null : "SearchRequest should not be null";
assert restRequest != null : "RestRequest should not be null";
/*
* We can't send parseSearchRequest REST content that it doesn't support,
* so we first have to remove from the content anything that is valid for
* this request but not for parseSearchRequest. This is a temporary hack
* and should get better when SearchRequest has full ObjectParser support;
* then we can just delegate.
*/
BytesReference content = RestActions.hasBodyContent(restRequest) ? RestActions.getRestContent(restRequest) : null;
if ((content != null) && (consumers != null && consumers.size() > 0)) {
Tuple<XContentType, Map<String, Object>> body = XContentHelper.convertToMap(content, false);
boolean modified = false;
for (Map.Entry<String, Consumer<Object>> consumer : consumers.entrySet()) {
Object value = body.v2().remove(consumer.getKey());
if (value != null) {
consumer.getValue().accept(value);
modified = true;
}
}
if (modified) {
try (XContentBuilder builder = XContentFactory.contentBuilder(body.v1())) {
content = builder.map(body.v2()).bytes();
}
}
}
RestSearchAction.parseSearchRequest(searchRequest, indicesQueriesRegistry, restRequest, parseFieldMatcher, aggParsers,
suggesters, content);
}
}

View File

@ -0,0 +1,43 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.reindex;
import org.elasticsearch.action.Action;
import org.elasticsearch.client.ElasticsearchClient;
public class DeleteByQueryAction extends Action<DeleteByQueryRequest, BulkIndexByScrollResponse, DeleteByQueryRequestBuilder> {
public static final DeleteByQueryAction INSTANCE = new DeleteByQueryAction();
public static final String NAME = "indices:data/write/delete/byquery";
private DeleteByQueryAction() {
super(NAME);
}
@Override
public DeleteByQueryRequestBuilder newRequestBuilder(ElasticsearchClient client) {
return new DeleteByQueryRequestBuilder(client, this);
}
@Override
public BulkIndexByScrollResponse newResponse() {
return new BulkIndexByScrollResponse();
}
}

View File

@ -0,0 +1,79 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.reindex;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.search.SearchRequest;
import static org.elasticsearch.action.ValidateActions.addValidationError;
/**
* Creates a new {@link DeleteByQueryRequest} that uses scrolling and bulk requests to delete all documents matching
* the query. This can have performance as well as visibility implications.
*
* Delete-by-query now has the following semantics:
* <ul>
* <li>it's <tt>non-atomic</tt>, a delete-by-query may fail at any time while some documents matching the query have already been
* deleted</li>
* <li>it's <tt>syntactic sugar</tt>, a delete-by-query is equivalent to a scroll search and corresponding bulk-deletes by ID</li>
* <li>it's executed on a <tt>point-in-time</tt> snapshot, a delete-by-query will only delete the documents that are visible at the
* point in time the delete-by-query was started, equivalent to the scroll API</li>
* <li>it's <tt>consistent</tt>, a delete-by-query will yield consistent results across all replicas of a shard</li>
* <li>it's <tt>forward-compatible</tt>, a delete-by-query will only send IDs to the shards as deletes such that no queries are
* stored in the transaction logs that might not be supported in the future.</li>
* <li>its results won't be visible until the index is refreshed.</li>
* </ul>
*/
public class DeleteByQueryRequest extends AbstractBulkByScrollRequest<DeleteByQueryRequest> {
public DeleteByQueryRequest() {
}
public DeleteByQueryRequest(SearchRequest search) {
super(search);
// Delete-By-Query does not require the source
search.source().fetchSource(false);
}
@Override
protected DeleteByQueryRequest self() {
return this;
}
@Override
public ActionRequestValidationException validate() {
ActionRequestValidationException e = super.validate();
if (getSearchRequest().indices() == null || getSearchRequest().indices().length == 0) {
e = addValidationError("use _all if you really want to delete from all existing indexes", e);
}
if (getSearchRequest() == null || getSearchRequest().source() == null) {
e = addValidationError("source is missing", e);
}
return e;
}
@Override
public String toString() {
StringBuilder b = new StringBuilder();
b.append("delete-by-query ");
searchToString(b);
return b.toString();
}
}

View File

@ -0,0 +1,51 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.reindex;
import org.elasticsearch.action.Action;
import org.elasticsearch.action.search.SearchAction;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.client.ElasticsearchClient;
public class DeleteByQueryRequestBuilder extends
AbstractBulkByScrollRequestBuilder<DeleteByQueryRequest, BulkIndexByScrollResponse, DeleteByQueryRequestBuilder> {
public DeleteByQueryRequestBuilder(ElasticsearchClient client,
Action<DeleteByQueryRequest, BulkIndexByScrollResponse, DeleteByQueryRequestBuilder> action) {
this(client, action, new SearchRequestBuilder(client, SearchAction.INSTANCE));
}
private DeleteByQueryRequestBuilder(ElasticsearchClient client,
Action<DeleteByQueryRequest, BulkIndexByScrollResponse, DeleteByQueryRequestBuilder> action,
SearchRequestBuilder search) {
super(client, action, search, new DeleteByQueryRequest(search.request()));
}
@Override
protected DeleteByQueryRequestBuilder self() {
return this;
}
@Override
public DeleteByQueryRequestBuilder abortOnVersionConflict(boolean abortOnVersionConflict) {
request.setAbortOnVersionConflict(abortOnVersionConflict);
return this;
}
}

View File

@ -39,12 +39,14 @@ public class ReindexPlugin extends Plugin {
public void onModule(ActionModule actionModule) {
actionModule.registerAction(ReindexAction.INSTANCE, TransportReindexAction.class);
actionModule.registerAction(UpdateByQueryAction.INSTANCE, TransportUpdateByQueryAction.class);
actionModule.registerAction(DeleteByQueryAction.INSTANCE, TransportDeleteByQueryAction.class);
actionModule.registerAction(RethrottleAction.INSTANCE, TransportRethrottleAction.class);
}
public void onModule(NetworkModule networkModule) {
networkModule.registerRestHandler(RestReindexAction.class);
networkModule.registerRestHandler(RestUpdateByQueryAction.class);
networkModule.registerRestHandler(RestDeleteByQueryAction.class);
networkModule.registerRestHandler(RestRethrottleAction.class);
networkModule.registerTaskStatus(BulkByScrollTask.Status.NAME, BulkByScrollTask.Status::new);
}

View File

@ -0,0 +1,77 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.reindex;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.indices.query.IndicesQueriesRegistry;
import org.elasticsearch.rest.RestChannel;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.search.aggregations.AggregatorParsers;
import org.elasticsearch.search.suggest.Suggesters;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;
import static org.elasticsearch.rest.RestRequest.Method.POST;
public class RestDeleteByQueryAction extends AbstractBulkByQueryRestHandler<DeleteByQueryRequest, TransportDeleteByQueryAction> {
@Inject
public RestDeleteByQueryAction(Settings settings, RestController controller, Client client,
IndicesQueriesRegistry indicesQueriesRegistry, AggregatorParsers aggParsers, Suggesters suggesters,
ClusterService clusterService, TransportDeleteByQueryAction action) {
super(settings, client, indicesQueriesRegistry, aggParsers, suggesters, clusterService, action);
controller.registerHandler(POST, "/{index}/_delete_by_query", this);
controller.registerHandler(POST, "/{index}/{type}/_delete_by_query", this);
}
@Override
protected void handleRequest(RestRequest request, RestChannel channel, Client client) throws Exception {
if (false == request.hasContent()) {
throw new ElasticsearchException("_delete_by_query requires a request body");
}
handleRequest(request, channel, false, false, true);
}
@Override
protected DeleteByQueryRequest buildRequest(RestRequest request) throws IOException {
/*
* Passing the search request through DeleteByQueryRequest first allows
* it to set its own defaults which differ from SearchRequest's
* defaults. Then the parseInternalRequest can override them.
*/
DeleteByQueryRequest internal = new DeleteByQueryRequest(new SearchRequest());
Map<String, Consumer<Object>> consumers = new HashMap<>();
consumers.put("conflicts", o -> internal.setConflicts((String) o));
parseInternalRequest(internal, request, consumers);
return internal;
}
}

View File

@ -19,7 +19,7 @@
package org.elasticsearch.index.reindex;
import org.elasticsearch.action.WriteConsistencyLevel;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.client.Client;
@ -27,7 +27,6 @@ import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.ParseFieldMatcher;
import org.elasticsearch.common.ParseFieldMatcherSupplier;
import org.elasticsearch.common.ParsingException;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
@ -39,7 +38,6 @@ import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.query.QueryParseContext;
import org.elasticsearch.indices.query.IndicesQueriesRegistry;
import org.elasticsearch.rest.BytesRestResponse;
import org.elasticsearch.rest.RestChannel;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestRequest;
@ -53,13 +51,14 @@ import java.util.Map;
import static org.elasticsearch.common.unit.TimeValue.parseTimeValue;
import static org.elasticsearch.rest.RestRequest.Method.POST;
import static org.elasticsearch.rest.RestStatus.BAD_REQUEST;
/**
* Expose IndexBySearchRequest over rest.
*/
public class RestReindexAction extends AbstractBaseReindexRestHandler<ReindexRequest, TransportReindexAction> {
private static final ObjectParser<ReindexRequest, ReindexParseContext> PARSER = new ObjectParser<>("reindex");
static {
ObjectParser.Parser<SearchRequest, ReindexParseContext> sourceParser = (parser, search, context) -> {
/*
@ -114,41 +113,18 @@ public class RestReindexAction extends AbstractBaseReindexRestHandler<ReindexReq
@Override
public void handleRequest(RestRequest request, RestChannel channel, Client client) throws IOException {
if (false == request.hasContent()) {
badRequest(channel, "body required");
return;
throw new ElasticsearchException("_reindex requires a request body");
}
handleRequest(request, channel, true, true, false);
}
ReindexRequest internalRequest = new ReindexRequest(new SearchRequest(), new IndexRequest());
@Override
protected ReindexRequest buildRequest(RestRequest request) throws IOException {
ReindexRequest internal = new ReindexRequest(new SearchRequest(), new IndexRequest());
try (XContentParser xcontent = XContentFactory.xContent(request.content()).createParser(request.content())) {
PARSER.parse(xcontent, internalRequest, new ReindexParseContext(indicesQueriesRegistry, aggParsers,
suggesters, parseFieldMatcher));
} catch (ParsingException e) {
logger.warn("Bad request", e);
badRequest(channel, e.getDetailedMessage());
return;
}
parseCommon(internalRequest, request);
execute(request, internalRequest, channel, true, true, false);
}
private void badRequest(RestChannel channel, String message) {
try {
XContentBuilder builder = channel.newErrorBuilder();
channel.sendResponse(new BytesRestResponse(BAD_REQUEST, builder.startObject().field("error", message).endObject()));
} catch (IOException e) {
logger.warn("Failed to send response", e);
}
}
public static void parseCommon(AbstractBulkByScrollRequest<?> internalRequest, RestRequest request) {
internalRequest.setRefresh(request.paramAsBoolean("refresh", internalRequest.isRefresh()));
internalRequest.setTimeout(request.paramAsTime("timeout", internalRequest.getTimeout()));
String consistency = request.param("consistency");
if (consistency != null) {
internalRequest.setConsistency(WriteConsistencyLevel.fromString(consistency));
PARSER.parse(xcontent, internal, new ReindexParseContext(indicesQueriesRegistry, aggParsers, suggesters, parseFieldMatcher));
}
return internal;
}
/**

View File

@ -39,6 +39,7 @@ public class RestRethrottleAction extends BaseRestHandler {
super(settings, client);
this.action = action;
controller.registerHandler(POST, "/_update_by_query/{taskId}/_rethrottle", this);
controller.registerHandler(POST, "/_delete_by_query/{taskId}/_rethrottle", this);
controller.registerHandler(POST, "/_reindex/{taskId}/_rethrottle", this);
}

View File

@ -22,31 +22,24 @@ package org.elasticsearch.index.reindex;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.indices.query.IndicesQueriesRegistry;
import org.elasticsearch.rest.RestChannel;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.action.search.RestSearchAction;
import org.elasticsearch.rest.action.support.RestActions;
import org.elasticsearch.script.Script;
import org.elasticsearch.search.aggregations.AggregatorParsers;
import org.elasticsearch.search.suggest.Suggesters;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;
import static org.elasticsearch.index.reindex.AbstractBulkByScrollRequest.SIZE_ALL_MATCHES;
import static org.elasticsearch.index.reindex.RestReindexAction.parseCommon;
import static org.elasticsearch.rest.RestRequest.Method.POST;
public class RestUpdateByQueryAction extends AbstractBaseReindexRestHandler<UpdateByQueryRequest, TransportUpdateByQueryAction> {
public class RestUpdateByQueryAction extends AbstractBulkByQueryRestHandler<UpdateByQueryRequest, TransportUpdateByQueryAction> {
@Inject
public RestUpdateByQueryAction(Settings settings, RestController controller, Client client,
@ -59,60 +52,26 @@ public class RestUpdateByQueryAction extends AbstractBaseReindexRestHandler<Upda
@Override
protected void handleRequest(RestRequest request, RestChannel channel, Client client) throws Exception {
handleRequest(request, channel, false, true, false);
}
@Override
@SuppressWarnings("unchecked")
protected UpdateByQueryRequest buildRequest(RestRequest request) throws IOException {
/*
* Passing the search request through UpdateByQueryRequest first allows
* it to set its own defaults which differ from SearchRequest's
* defaults. Then the parse can override them.
*/
UpdateByQueryRequest internalRequest = new UpdateByQueryRequest(new SearchRequest());
int scrollSize = internalRequest.getSearchRequest().source().size();
internalRequest.getSearchRequest().source().size(SIZE_ALL_MATCHES);
/*
* We can't send parseSearchRequest REST content that it doesn't support,
* so we first have to strip out the fields that are valid here but that
* it doesn't understand. This is a temporary hack and should get better
* once SearchRequest has full ObjectParser support, at which point we can
* simply delegate.
*/
BytesReference bodyContent = null;
if (RestActions.hasBodyContent(request)) {
bodyContent = RestActions.getRestContent(request);
Tuple<XContentType, Map<String, Object>> body = XContentHelper.convertToMap(bodyContent, false);
boolean modified = false;
String conflicts = (String) body.v2().remove("conflicts");
if (conflicts != null) {
internalRequest.setConflicts(conflicts);
modified = true;
}
@SuppressWarnings("unchecked")
Map<String, Object> script = (Map<String, Object>) body.v2().remove("script");
if (script != null) {
internalRequest.setScript(Script.parse(script, false, parseFieldMatcher));
modified = true;
}
if (modified) {
XContentBuilder builder = XContentFactory.contentBuilder(body.v1());
builder.map(body.v2());
bodyContent = builder.bytes();
}
}
RestSearchAction.parseSearchRequest(internalRequest.getSearchRequest(), indicesQueriesRegistry, request,
parseFieldMatcher, aggParsers, suggesters, bodyContent);
UpdateByQueryRequest internal = new UpdateByQueryRequest(new SearchRequest());
String conflicts = request.param("conflicts");
if (conflicts != null) {
internalRequest.setConflicts(conflicts);
}
parseCommon(internalRequest, request);
Map<String, Consumer<Object>> consumers = new HashMap<>();
consumers.put("conflicts", o -> internal.setConflicts((String) o));
consumers.put("script", o -> internal.setScript(Script.parse((Map<String, Object>)o, false, parseFieldMatcher)));
internalRequest.setSize(internalRequest.getSearchRequest().source().size());
internalRequest.setPipeline(request.param("pipeline"));
internalRequest.getSearchRequest().source().size(request.paramAsInt("scroll_size", scrollSize));
// Let the requester set search timeout. It is probably only going to be useful for testing but who knows.
if (request.hasParam("search_timeout")) {
internalRequest.getSearchRequest().source().timeout(request.paramAsTime("search_timeout", null));
}
parseInternalRequest(internal, request, consumers);
execute(request, internalRequest, channel, false, true, false);
internal.setPipeline(request.param("pipeline"));
return internal;
}
}
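
Both handlers above register their extra body fields in a map of consumers that parseInternalRequest (defined in AbstractBulkByQueryRestHandler, not shown in this diff) applies while parsing. Conceptually it removes those fields from the parsed body and fires the matching consumer before handing the remainder to the search request parser; a rough sketch, assuming `body` is the parsed request body as a Map<String, Object>:

    // Illustrative only: strip handler-specific fields and apply their consumers,
    // leaving a body that can be parsed as a plain search request.
    for (Map.Entry<String, Consumer<Object>> entry : consumers.entrySet()) {
        Object value = body.remove(entry.getKey());   // e.g. "conflicts" or "script"
        if (value != null) {
            entry.getValue().accept(value);           // e.g. internal.setConflicts(...)
        }
    }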

View File

@ -0,0 +1,109 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.reindex;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.ParentTaskAssigningClient;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.mapper.internal.ParentFieldMapper;
import org.elasticsearch.index.mapper.internal.RoutingFieldMapper;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
public class TransportDeleteByQueryAction extends HandledTransportAction<DeleteByQueryRequest, BulkIndexByScrollResponse> {
private final Client client;
private final ScriptService scriptService;
private final ClusterService clusterService;
@Inject
public TransportDeleteByQueryAction(Settings settings, ThreadPool threadPool, ActionFilters actionFilters,
IndexNameExpressionResolver resolver, Client client, TransportService transportService,
ScriptService scriptService, ClusterService clusterService) {
super(settings, DeleteByQueryAction.NAME, threadPool, transportService, actionFilters, resolver, DeleteByQueryRequest::new);
this.client = client;
this.scriptService = scriptService;
this.clusterService = clusterService;
}
@Override
protected void doExecute(Task task, DeleteByQueryRequest request, ActionListener<BulkIndexByScrollResponse> listener) {
ClusterState state = clusterService.state();
ParentTaskAssigningClient client = new ParentTaskAssigningClient(this.client, clusterService.localNode(), task);
new AsyncDeleteBySearchAction((BulkByScrollTask) task, logger, client, threadPool, request, listener, scriptService, state).start();
}
@Override
protected void doExecute(DeleteByQueryRequest request, ActionListener<BulkIndexByScrollResponse> listener) {
throw new UnsupportedOperationException("task required");
}
/**
* Implementation of delete-by-query using scrolling and bulk.
*/
static class AsyncDeleteBySearchAction extends AbstractAsyncBulkIndexByScrollAction<DeleteByQueryRequest> {
public AsyncDeleteBySearchAction(BulkByScrollTask task, ESLogger logger, ParentTaskAssigningClient client, ThreadPool threadPool,
DeleteByQueryRequest request, ActionListener<BulkIndexByScrollResponse> listener,
ScriptService scriptService, ClusterState clusterState) {
super(task, logger, client, threadPool, request, request.getSearchRequest(), listener, scriptService, clusterState);
}
@Override
protected boolean accept(SearchHit doc) {
// Delete-by-query does not require the source to delete a document
// and the default implementation checks for it
return true;
}
@Override
protected RequestWrapper<DeleteRequest> buildRequest(SearchHit doc) {
DeleteRequest delete = new DeleteRequest();
delete.index(doc.index());
delete.type(doc.type());
delete.id(doc.id());
delete.version(doc.version());
return wrap(delete);
}
/**
* Overrides the parent {@link AbstractAsyncBulkIndexByScrollAction#copyMetadata(RequestWrapper, SearchHit)}
* method, which is much more Update/Reindex oriented and so also copies things like timestamp/ttl that we
* don't care about for a deletion.
*/
@Override
protected RequestWrapper<?> copyMetadata(RequestWrapper<?> request, SearchHit doc) {
copyParent(request, fieldValue(doc, ParentFieldMapper.NAME));
copyRouting(request, fieldValue(doc, RoutingFieldMapper.NAME));
return request;
}
}
}
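
AsyncDeleteBySearchAction only customizes the per-hit conversion; the scroll-and-bulk loop itself comes from the shared infrastructure. As a rough, synchronous mental model of that flow (the real code is asynchronous and also handles retries, throttling and version conflicts), assuming a `client`, a `query` and an index named "test":

    // Conceptual sketch only, not the production code path.
    SearchResponse scroll = client.prepareSearch("test")
            .setScroll(TimeValue.timeValueMinutes(1))
            .setQuery(query)
            .setSize(100)
            .get();
    while (scroll.getHits().getHits().length > 0) {
        BulkRequestBuilder bulk = client.prepareBulk();
        for (SearchHit hit : scroll.getHits().getHits()) {
            // The same fields buildRequest copies from each hit above.
            bulk.add(new DeleteRequest(hit.index(), hit.type(), hit.id()).version(hit.version()));
        }
        bulk.get();
        scroll = client.prepareSearchScroll(scroll.getScrollId())
                .setScroll(TimeValue.timeValueMinutes(1))
                .get();
    }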

View File

@ -35,16 +35,18 @@ import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.lucene.uid.Versions;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.mapper.internal.TTLFieldMapper;
import org.elasticsearch.index.mapper.internal.VersionFieldMapper;
import org.elasticsearch.script.Script;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import java.util.Map;
import java.util.Objects;
import java.util.function.BiFunction;
import static java.util.Objects.requireNonNull;
import static org.elasticsearch.index.VersionType.INTERNAL;
@ -72,7 +74,7 @@ public class TransportReindexAction extends HandledTransportAction<ReindexReques
ClusterState state = clusterService.state();
validateAgainstAliases(request.getSearchRequest(), request.getDestination(), indexNameExpressionResolver, autoCreateIndex, state);
ParentTaskAssigningClient client = new ParentTaskAssigningClient(this.client, clusterService.localNode(), task);
new AsyncIndexBySearchAction((BulkByScrollTask) task, logger, scriptService, client, state, threadPool, request, listener).start();
new AsyncIndexBySearchAction((BulkByScrollTask) task, logger, client, threadPool, request, listener, scriptService, state).start();
}
@Override
@ -87,7 +89,8 @@ public class TransportReindexAction extends HandledTransportAction<ReindexReques
* isn't available then. Package private for testing.
*/
static String validateAgainstAliases(SearchRequest source, IndexRequest destination,
IndexNameExpressionResolver indexNameExpressionResolver, AutoCreateIndex autoCreateIndex, ClusterState clusterState) {
IndexNameExpressionResolver indexNameExpressionResolver, AutoCreateIndex autoCreateIndex,
ClusterState clusterState) {
String target = destination.index();
if (false == autoCreateIndex.shouldAutoCreate(target, clusterState)) {
/*
@ -97,7 +100,7 @@ public class TransportReindexAction extends HandledTransportAction<ReindexReques
*/
target = indexNameExpressionResolver.concreteIndexNames(clusterState, destination)[0];
}
for (String sourceIndex: indexNameExpressionResolver.concreteIndexNames(clusterState, source)) {
for (String sourceIndex : indexNameExpressionResolver.concreteIndexNames(clusterState, source)) {
if (sourceIndex.equals(target)) {
ActionRequestValidationException e = new ActionRequestValidationException();
e.addValidationError("reindex cannot write into an index it's reading from [" + target + ']');
@ -114,14 +117,24 @@ public class TransportReindexAction extends HandledTransportAction<ReindexReques
* possible.
*/
static class AsyncIndexBySearchAction extends AbstractAsyncBulkIndexByScrollAction<ReindexRequest> {
public AsyncIndexBySearchAction(BulkByScrollTask task, ESLogger logger, ScriptService scriptService,
ParentTaskAssigningClient client, ClusterState state, ThreadPool threadPool, ReindexRequest request,
ActionListener<BulkIndexByScrollResponse> listener) {
super(task, logger, scriptService, state, client, threadPool, request, request.getSearchRequest(), listener);
public AsyncIndexBySearchAction(BulkByScrollTask task, ESLogger logger, ParentTaskAssigningClient client, ThreadPool threadPool,
ReindexRequest request, ActionListener<BulkIndexByScrollResponse> listener,
ScriptService scriptService, ClusterState clusterState) {
super(task, logger, client, threadPool, request, request.getSearchRequest(), listener, scriptService, clusterState);
}
@Override
protected IndexRequest buildIndexRequest(SearchHit doc) {
protected BiFunction<RequestWrapper<?>, SearchHit, RequestWrapper<?>> buildScriptApplier() {
Script script = mainRequest.getScript();
if (script != null) {
return new ReindexScriptApplier(task, scriptService, script, clusterState, script.getParams());
}
return super.buildScriptApplier();
}
@Override
protected RequestWrapper<IndexRequest> buildRequest(SearchHit doc) {
IndexRequest index = new IndexRequest();
// Copy the index from the request so we always write where it asked to write
@ -161,109 +174,120 @@ public class TransportReindexAction extends HandledTransportAction<ReindexReques
index.setPipeline(mainRequest.getDestination().getPipeline());
// OpType is synthesized from version so it is handled when we copy version above.
return index;
return wrap(index);
}
/**
* Override the simple copy behavior to allow more fine grained control.
*/
@Override
protected void copyRouting(IndexRequest index, SearchHit doc) {
protected void copyRouting(RequestWrapper<?> request, String routing) {
String routingSpec = mainRequest.getDestination().routing();
if (routingSpec == null) {
super.copyRouting(index, doc);
super.copyRouting(request, routing);
return;
}
if (routingSpec.startsWith("=")) {
index.routing(mainRequest.getDestination().routing().substring(1));
super.copyRouting(request, mainRequest.getDestination().routing().substring(1));
return;
}
switch (routingSpec) {
case "keep":
super.copyRouting(index, doc);
super.copyRouting(request, routing);
break;
case "discard":
index.routing(null);
super.copyRouting(request, null);
break;
default:
throw new IllegalArgumentException("Unsupported routing command");
}
}
/*
* Methods below here handle script updating the index request. They try
* to be pretty liberal with regard to types because scripts are often
* dynamically typed.
*/
@Override
protected void scriptChangedIndex(IndexRequest index, Object to) {
requireNonNull(to, "Can't reindex without a destination index!");
index.index(to.toString());
}
class ReindexScriptApplier extends ScriptApplier {
@Override
protected void scriptChangedType(IndexRequest index, Object to) {
requireNonNull(to, "Can't reindex without a destination type!");
index.type(to.toString());
}
@Override
protected void scriptChangedId(IndexRequest index, Object to) {
index.id(Objects.toString(to, null));
}
@Override
protected void scriptChangedVersion(IndexRequest index, Object to) {
if (to == null) {
index.version(Versions.MATCH_ANY).versionType(INTERNAL);
return;
ReindexScriptApplier(BulkByScrollTask task, ScriptService scriptService, Script script, ClusterState state,
Map<String, Object> params) {
super(task, scriptService, script, state, params);
}
index.version(asLong(to, VersionFieldMapper.NAME));
}
@Override
protected void scriptChangedParent(IndexRequest index, Object to) {
// Have to override routing with parent just in case it's changed
String routing = Objects.toString(to, null);
index.parent(routing).routing(routing);
}
@Override
protected void scriptChangedRouting(IndexRequest index, Object to) {
index.routing(Objects.toString(to, null));
}
@Override
protected void scriptChangedTimestamp(IndexRequest index, Object to) {
index.timestamp(Objects.toString(to, null));
}
@Override
protected void scriptChangedTTL(IndexRequest index, Object to) {
if (to == null) {
index.ttl((TimeValue) null);
return;
}
index.ttl(asLong(to, TTLFieldMapper.NAME));
}
private long asLong(Object from, String name) {
/*
* Stuffing a number into the map will have converted it to
* some Number.
* Methods below here handle script updating the index request. They try
* to be pretty liberal with regard to types because scripts are often
* dynamically typed.
*/
Number fromNumber;
try {
fromNumber = (Number) from;
} catch (ClassCastException e) {
throw new IllegalArgumentException(name + " may only be set to an int or a long but was [" + from + "]", e);
@Override
protected void scriptChangedIndex(RequestWrapper<?> request, Object to) {
requireNonNull(to, "Can't reindex without a destination index!");
request.setIndex(to.toString());
}
long l = fromNumber.longValue();
// Check that we didn't round when we fetched the value.
if (fromNumber.doubleValue() != l) {
throw new IllegalArgumentException(name + " may only be set to an int or a long but was [" + from + "]");
@Override
protected void scriptChangedType(RequestWrapper<?> request, Object to) {
requireNonNull(to, "Can't reindex without a destination type!");
request.setType(to.toString());
}
@Override
protected void scriptChangedId(RequestWrapper<?> request, Object to) {
request.setId(Objects.toString(to, null));
}
@Override
protected void scriptChangedVersion(RequestWrapper<?> request, Object to) {
if (to == null) {
request.setVersion(Versions.MATCH_ANY);
request.setVersionType(INTERNAL);
} else {
request.setVersion(asLong(to, VersionFieldMapper.NAME));
}
}
@Override
protected void scriptChangedParent(RequestWrapper<?> request, Object to) {
// Have to override routing with parent just in case it's changed
String routing = Objects.toString(to, null);
request.setParent(routing);
request.setRouting(routing);
}
@Override
protected void scriptChangedRouting(RequestWrapper<?> request, Object to) {
request.setRouting(Objects.toString(to, null));
}
@Override
protected void scriptChangedTimestamp(RequestWrapper<?> request, Object to) {
request.setTimestamp(Objects.toString(to, null));
}
@Override
protected void scriptChangedTTL(RequestWrapper<?> request, Object to) {
if (to == null) {
request.setTtl(null);
} else {
request.setTtl(asLong(to, TTLFieldMapper.NAME));
}
}
private long asLong(Object from, String name) {
/*
* Stuffing a number into the map will have converted it to
* some Number.
*/
Number fromNumber;
try {
fromNumber = (Number) from;
} catch (ClassCastException e) {
throw new IllegalArgumentException(name + " may only be set to an int or a long but was [" + from + "]", e);
}
long l = fromNumber.longValue();
// Check that we didn't round when we fetched the value.
if (fromNumber.doubleValue() != l) {
throw new IllegalArgumentException(name + " may only be set to an int or a long but was [" + from + "]");
}
return l;
}
return l;
}
}
}

View File

@ -39,12 +39,16 @@ import org.elasticsearch.index.mapper.internal.RoutingFieldMapper;
import org.elasticsearch.index.mapper.internal.TTLFieldMapper;
import org.elasticsearch.index.mapper.internal.TimestampFieldMapper;
import org.elasticsearch.index.mapper.internal.TypeFieldMapper;
import org.elasticsearch.script.Script;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import java.util.Map;
import java.util.function.BiFunction;
public class TransportUpdateByQueryAction extends HandledTransportAction<UpdateByQueryRequest, BulkIndexByScrollResponse> {
private final Client client;
private final ScriptService scriptService;
@ -65,8 +69,7 @@ public class TransportUpdateByQueryAction extends HandledTransportAction<UpdateB
protected void doExecute(Task task, UpdateByQueryRequest request, ActionListener<BulkIndexByScrollResponse> listener) {
ClusterState state = clusterService.state();
ParentTaskAssigningClient client = new ParentTaskAssigningClient(this.client, clusterService.localNode(), task);
new AsyncIndexBySearchAction((BulkByScrollTask) task, logger, scriptService, client, threadPool, state, request, listener)
.start();
new AsyncIndexBySearchAction((BulkByScrollTask) task, logger, client, threadPool, request, listener, scriptService, state).start();
}
@Override
@ -78,14 +81,24 @@ public class TransportUpdateByQueryAction extends HandledTransportAction<UpdateB
* Simple implementation of update-by-query using scrolling and bulk.
*/
static class AsyncIndexBySearchAction extends AbstractAsyncBulkIndexByScrollAction<UpdateByQueryRequest> {
public AsyncIndexBySearchAction(BulkByScrollTask task, ESLogger logger, ScriptService scriptService,
ParentTaskAssigningClient client, ThreadPool threadPool, ClusterState clusterState, UpdateByQueryRequest request,
ActionListener<BulkIndexByScrollResponse> listener) {
super(task, logger, scriptService, clusterState, client, threadPool, request, request.getSearchRequest(), listener);
public AsyncIndexBySearchAction(BulkByScrollTask task, ESLogger logger, ParentTaskAssigningClient client, ThreadPool threadPool,
UpdateByQueryRequest request, ActionListener<BulkIndexByScrollResponse> listener,
ScriptService scriptService, ClusterState clusterState) {
super(task, logger, client, threadPool, request, request.getSearchRequest(), listener, scriptService, clusterState);
}
@Override
protected IndexRequest buildIndexRequest(SearchHit doc) {
protected BiFunction<RequestWrapper<?>, SearchHit, RequestWrapper<?>> buildScriptApplier() {
Script script = mainRequest.getScript();
if (script != null) {
return new UpdateByQueryScriptApplier(task, scriptService, script, clusterState, script.getParams());
}
return super.buildScriptApplier();
}
@Override
protected RequestWrapper<IndexRequest> buildRequest(SearchHit doc) {
IndexRequest index = new IndexRequest();
index.index(doc.index());
index.type(doc.type());
@ -94,47 +107,55 @@ public class TransportUpdateByQueryAction extends HandledTransportAction<UpdateB
index.versionType(VersionType.INTERNAL);
index.version(doc.version());
index.setPipeline(mainRequest.getPipeline());
return index;
return wrap(index);
}
@Override
protected void scriptChangedIndex(IndexRequest index, Object to) {
throw new IllegalArgumentException("Modifying [" + IndexFieldMapper.NAME + "] not allowed");
}
class UpdateByQueryScriptApplier extends ScriptApplier {
@Override
protected void scriptChangedType(IndexRequest index, Object to) {
throw new IllegalArgumentException("Modifying [" + TypeFieldMapper.NAME + "] not allowed");
}
UpdateByQueryScriptApplier(BulkByScrollTask task, ScriptService scriptService, Script script, ClusterState state,
Map<String, Object> params) {
super(task, scriptService, script, state, params);
}
@Override
protected void scriptChangedId(IndexRequest index, Object to) {
throw new IllegalArgumentException("Modifying [" + IdFieldMapper.NAME + "] not allowed");
}
@Override
protected void scriptChangedIndex(RequestWrapper<?> request, Object to) {
throw new IllegalArgumentException("Modifying [" + IndexFieldMapper.NAME + "] not allowed");
}
@Override
protected void scriptChangedVersion(IndexRequest index, Object to) {
throw new IllegalArgumentException("Modifying [_version] not allowed");
}
@Override
protected void scriptChangedType(RequestWrapper<?> request, Object to) {
throw new IllegalArgumentException("Modifying [" + TypeFieldMapper.NAME + "] not allowed");
}
@Override
protected void scriptChangedRouting(IndexRequest index, Object to) {
throw new IllegalArgumentException("Modifying [" + RoutingFieldMapper.NAME + "] not allowed");
}
@Override
protected void scriptChangedId(RequestWrapper<?> request, Object to) {
throw new IllegalArgumentException("Modifying [" + IdFieldMapper.NAME + "] not allowed");
}
@Override
protected void scriptChangedParent(IndexRequest index, Object to) {
throw new IllegalArgumentException("Modifying [" + ParentFieldMapper.NAME + "] not allowed");
}
@Override
protected void scriptChangedVersion(RequestWrapper<?> request, Object to) {
throw new IllegalArgumentException("Modifying [_version] not allowed");
}
@Override
protected void scriptChangedTimestamp(IndexRequest index, Object to) {
throw new IllegalArgumentException("Modifying [" + TimestampFieldMapper.NAME + "] not allowed");
}
@Override
protected void scriptChangedRouting(RequestWrapper<?> request, Object to) {
throw new IllegalArgumentException("Modifying [" + RoutingFieldMapper.NAME + "] not allowed");
}
@Override
protected void scriptChangedTTL(IndexRequest index, Object to) {
throw new IllegalArgumentException("Modifying [" + TTLFieldMapper.NAME + "] not allowed");
@Override
protected void scriptChangedParent(RequestWrapper<?> request, Object to) {
throw new IllegalArgumentException("Modifying [" + ParentFieldMapper.NAME + "] not allowed");
}
@Override
protected void scriptChangedTimestamp(RequestWrapper<?> request, Object to) {
throw new IllegalArgumentException("Modifying [" + TimestampFieldMapper.NAME + "] not allowed");
}
@Override
protected void scriptChangedTTL(RequestWrapper<?> request, Object to) {
throw new IllegalArgumentException("Modifying [" + TTLFieldMapper.NAME + "] not allowed");
}
}
}
}
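
UpdateByQueryScriptApplier only lets a script touch the document source; any metadata change is rejected. A minimal sketch of a programmatic update-by-query with a script, where the index name and script body are illustrative and the default inline script language is assumed:

    // Sketch only: the script may rewrite ctx._source, but touching _index, _type, _id,
    // _version, _routing, _parent, _timestamp or _ttl throws IllegalArgumentException.
    UpdateByQueryRequest request = new UpdateByQueryRequest(new SearchRequest("test"));
    request.setScript(new Script("ctx._source.foo = 'bar'"));
    BulkIndexByScrollResponse response = client.execute(UpdateByQueryAction.INSTANCE, request).actionGet();
    long updated = response.getUpdated();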

View File

@ -22,10 +22,15 @@ package org.elasticsearch.index.reindex;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.common.text.Text;
import org.elasticsearch.index.Index;
import org.elasticsearch.script.CompiledScript;
import org.elasticsearch.script.ExecutableScript;
import org.elasticsearch.script.Script;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.search.SearchHitField;
import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.search.internal.InternalSearchHit;
import org.junit.Before;
import org.mockito.Matchers;
import java.util.HashMap;
import java.util.Map;
@ -33,18 +38,35 @@ import java.util.function.Consumer;
import static java.util.Collections.singletonMap;
import static org.hamcrest.Matchers.equalTo;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public abstract class AbstractAsyncBulkIndexByScrollActionScriptTestCase<
Request extends AbstractBulkIndexByScrollRequest<Request>,
Response extends BulkIndexByScrollResponse>
extends AbstractAsyncBulkIndexByScrollActionTestCase<Request, Response> {
private static final Script EMPTY_SCRIPT = new Script("");
protected ScriptService scriptService;
@Before
public void setupScriptService() {
scriptService = mock(ScriptService.class);
}
protected IndexRequest applyScript(Consumer<Map<String, Object>> scriptBody) {
IndexRequest index = new IndexRequest("index", "type", "1").source(singletonMap("foo", "bar"));
Map<String, SearchHitField> fields = new HashMap<>();
InternalSearchHit doc = new InternalSearchHit(0, "id", new Text("type"), fields);
doc.shardTarget(new SearchShardTarget("nodeid", new Index("index", "uuid"), 1));
ExecutableScript script = new SimpleExecutableScript(scriptBody);
action().applyScript(index, doc, script, new HashMap<>());
ExecutableScript executableScript = new SimpleExecutableScript(scriptBody);
when(scriptService.executable(any(CompiledScript.class), Matchers.<Map<String, Object>>any()))
.thenReturn(executableScript);
AbstractAsyncBulkIndexByScrollAction<Request> action = action(scriptService, request().setScript(EMPTY_SCRIPT));
action.buildScriptApplier().apply(AbstractAsyncBulkIndexByScrollAction.wrap(index), doc);
return index;
}
@ -53,7 +75,7 @@ public abstract class AbstractAsyncBulkIndexByScrollActionScriptTestCase<
applyScript((Map<String, Object> ctx) -> ctx.put("junk", "junk"));
fail("Expected error");
} catch (IllegalArgumentException e) {
assertThat(e.getMessage(), equalTo("Invalid fields added to ctx [junk]"));
assertThat(e.getMessage(), equalTo("Invalid fields added to context [junk]"));
}
}
@ -65,4 +87,6 @@ public abstract class AbstractAsyncBulkIndexByScrollActionScriptTestCase<
});
assertEquals("cat", index.sourceAsMap().get("bar"));
}
protected abstract AbstractAsyncBulkIndexByScrollAction<Request> action(ScriptService scriptService, Request request);
}

View File

@ -46,8 +46,6 @@ public abstract class AbstractAsyncBulkIndexByScrollActionTestCase<
threadPool.shutdown();
}
protected abstract AbstractAsyncBulkIndexByScrollAction<Request> action();
protected abstract Request request();
protected PlainActionFuture<Response> listener() {

View File

@ -49,13 +49,15 @@ public abstract class AbstractAsyncBulkIndexbyScrollActionMetadataTestCase<
public void testTimestampIsCopied() {
IndexRequest index = new IndexRequest();
action().copyMetadata(index, doc(TimestampFieldMapper.NAME, 10L));
action().copyMetadata(AbstractAsyncBulkIndexByScrollAction.wrap(index), doc(TimestampFieldMapper.NAME, 10L));
assertEquals("10", index.timestamp());
}
public void testTTL() throws Exception {
IndexRequest index = new IndexRequest();
action().copyMetadata(index, doc(TTLFieldMapper.NAME, 10L));
action().copyMetadata(AbstractAsyncBulkIndexByScrollAction.wrap(index), doc(TTLFieldMapper.NAME, 10L));
assertEquals(timeValueMillis(10), index.ttl());
}
protected abstract AbstractAsyncBulkIndexByScrollAction<Request> action();
}

View File

@ -665,7 +665,7 @@ public class AsyncBulkByScrollActionTests extends ESTestCase {
}
private class DummyAbstractAsyncBulkByScrollAction
extends AbstractAsyncBulkByScrollAction<DummyAbstractBulkByScrollRequest, BulkIndexByScrollResponse> {
extends AbstractAsyncBulkByScrollAction<DummyAbstractBulkByScrollRequest> {
public DummyAbstractAsyncBulkByScrollAction() {
super(testTask, logger, new ParentTaskAssigningClient(client, localNode, testTask), threadPool, testRequest, firstSearchRequest,
listener);

View File

@ -30,6 +30,8 @@ public class BulkIndexByScrollResponseMatcher extends TypeSafeMatcher<BulkIndexB
private Matcher<Long> createdMatcher = equalTo(0L);
private Matcher<Long> updatedMatcher = equalTo(0L);
private Matcher<Long> deletedMatcher = equalTo(0L);
/**
* Matcher for the number of batches. Optional.
*/
@ -56,6 +58,15 @@ public class BulkIndexByScrollResponseMatcher extends TypeSafeMatcher<BulkIndexB
return updated(equalTo(updated));
}
public BulkIndexByScrollResponseMatcher deleted(Matcher<Long> deletedMatcher) {
this.deletedMatcher = deletedMatcher;
return this;
}
public BulkIndexByScrollResponseMatcher deleted(long deleted) {
return deleted(equalTo(deleted));
}
/**
* Set the matcher for the number of batches. Defaults to matching any
* integer because we usually don't care about how many batches the job
@ -110,6 +121,7 @@ public class BulkIndexByScrollResponseMatcher extends TypeSafeMatcher<BulkIndexB
protected boolean matchesSafely(BulkIndexByScrollResponse item) {
return updatedMatcher.matches(item.getUpdated()) &&
createdMatcher.matches(item.getCreated()) &&
deletedMatcher.matches(item.getDeleted()) &&
(batchesMatcher == null || batchesMatcher.matches(item.getBatches())) &&
versionConflictsMatcher.matches(item.getVersionConflicts()) &&
failuresMatcher.matches(item.getIndexingFailures().size()) &&
@ -120,6 +132,7 @@ public class BulkIndexByScrollResponseMatcher extends TypeSafeMatcher<BulkIndexB
public void describeTo(Description description) {
description.appendText("updated matches ").appendDescriptionOf(updatedMatcher);
description.appendText(" and created matches ").appendDescriptionOf(createdMatcher);
description.appendText(" and deleted matches ").appendDescriptionOf(deletedMatcher);
if (batchesMatcher != null) {
description.appendText(" and batches matches ").appendDescriptionOf(batchesMatcher);
}

View File

@ -0,0 +1,211 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.reindex;
import org.elasticsearch.action.admin.indices.alias.Alias;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.sort.SortOrder;
import java.util.ArrayList;
import java.util.List;
import static org.elasticsearch.index.query.QueryBuilders.matchQuery;
import static org.elasticsearch.index.query.QueryBuilders.rangeQuery;
import static org.elasticsearch.index.query.QueryBuilders.termQuery;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
public class DeleteByQueryBasicTests extends ReindexTestCase {
public void testBasics() throws Exception {
indexRandom(true,
client().prepareIndex("test", "test", "1").setSource("foo", "a"),
client().prepareIndex("test", "test", "2").setSource("foo", "a"),
client().prepareIndex("test", "test", "3").setSource("foo", "b"),
client().prepareIndex("test", "test", "4").setSource("foo", "c"),
client().prepareIndex("test", "test", "5").setSource("foo", "d"),
client().prepareIndex("test", "test", "6").setSource("foo", "e"),
client().prepareIndex("test", "test", "7").setSource("foo", "f")
);
assertHitCount(client().prepareSearch("test").setTypes("test").setSize(0).get(), 7);
// Deletes the two docs that match "foo:a"
assertThat(deleteByQuery().source("test").filter(termQuery("foo", "a")).refresh(true).get(), matcher().deleted(2));
assertHitCount(client().prepareSearch("test").setTypes("test").setSize(0).get(), 5);
// Deletes the first two docs (in sort order) by limiting the request size to 2
DeleteByQueryRequestBuilder request = deleteByQuery().source("test").size(2).refresh(true);
request.source().addSort("foo.keyword", SortOrder.ASC);
assertThat(request.get(), matcher().deleted(2));
assertHitCount(client().prepareSearch("test").setTypes("test").setSize(0).get(), 3);
// Delete request that matches no docs
assertThat(deleteByQuery().source("test").filter(termQuery("foo", "no_match")).refresh(true).get(), matcher().deleted(0));
assertHitCount(client().prepareSearch("test").setTypes("test").setSize(0).get(), 3);
// Deletes all remaining docs
assertThat(deleteByQuery().source("test").refresh(true).get(), matcher().deleted(3));
assertHitCount(client().prepareSearch("test").setTypes("test").setSize(0).get(), 0);
}
public void testDeleteByQueryWithOneIndex() throws Exception {
final long docs = randomIntBetween(1, 50);
List<IndexRequestBuilder> builders = new ArrayList<>();
for (int i = 0; i < docs; i++) {
builders.add(client().prepareIndex("test", "doc", String.valueOf(i)).setSource("fields1", 1));
}
indexRandom(true, true, true, builders);
assertThat(deleteByQuery().source("t*").refresh(true).get(), matcher().deleted(docs));
assertHitCount(client().prepareSearch("test").setSize(0).get(), 0);
}
public void testDeleteByQueryWithMultipleIndices() throws Exception {
final int indices = randomIntBetween(2, 5);
final int docs = randomIntBetween(2, 10) * 2;
long[] candidates = new long[indices];
// total number of expected deletions
long deletions = 0;
List<IndexRequestBuilder> builders = new ArrayList<>();
for (int i = 0; i < indices; i++) {
// number of documents to be deleted with the upcoming delete-by-query
// (this number differs for each index)
candidates[i] = randomIntBetween(1, docs);
deletions = deletions + candidates[i];
for (int j = 0; j < docs; j++) {
boolean candidate = (j < candidates[i]);
builders.add(client().prepareIndex("test-" + i, "doc", String.valueOf(j)).setSource("candidate", candidate));
}
}
indexRandom(true, true, true, builders);
// Deletes all the documents with candidate=true
assertThat(deleteByQuery().source("test-*").filter(termQuery("candidate", true)).refresh(true).get(),
matcher().deleted(deletions));
for (int i = 0; i < indices; i++) {
long remaining = docs - candidates[i];
assertHitCount(client().prepareSearch("test-" + i).setSize(0).get(), remaining);
}
assertHitCount(client().prepareSearch().setSize(0).get(), (indices * docs) - deletions);
}
public void testDeleteByQueryWithMissingIndex() throws Exception {
indexRandom(true, client().prepareIndex("test", "test", "1").setSource("foo", "a"));
assertHitCount(client().prepareSearch().setSize(0).get(), 1);
try {
deleteByQuery().source("missing").get();
fail("should have thrown an exception because of a missing index");
} catch (IndexNotFoundException e) {
// Ok
}
}
public void testDeleteByQueryWithRouting() throws Exception {
assertAcked(prepareCreate("test").setSettings("number_of_shards", 2));
ensureGreen("test");
final int docs = randomIntBetween(2, 10);
logger.info("--> indexing [{}] documents with routing", docs);
List<IndexRequestBuilder> builders = new ArrayList<>();
for (int i = 0; i < docs; i++) {
builders.add(client().prepareIndex("test", "test", String.valueOf(i)).setRouting(String.valueOf(i)).setSource("field1", 1));
}
indexRandom(true, true, true, builders);
logger.info("--> counting documents with no routing, should be equal to [{}]", docs);
assertHitCount(client().prepareSearch().setSize(0).get(), docs);
String routing = String.valueOf(randomIntBetween(2, docs));
logger.info("--> counting documents with routing [{}]", routing);
long expected = client().prepareSearch().setSize(0).setRouting(routing).get().getHits().totalHits();
logger.info("--> delete all documents with routing [{}] with a delete-by-query", routing);
DeleteByQueryRequestBuilder delete = deleteByQuery().source("test");
delete.source().setRouting(routing);
assertThat(delete.refresh(true).get(), matcher().deleted(expected));
assertHitCount(client().prepareSearch().setSize(0).get(), docs - expected);
}
public void testDeleteByMatchQuery() throws Exception {
assertAcked(prepareCreate("test").addAlias(new Alias("alias")));
final int docs = scaledRandomIntBetween(10, 100);
List<IndexRequestBuilder> builders = new ArrayList<>();
for (int i = 0; i < docs; i++) {
builders.add(client().prepareIndex("test", "test", Integer.toString(i))
.setRouting(randomAsciiOfLengthBetween(1, 5))
.setSource("foo", "bar"));
}
indexRandom(true, true, true, builders);
int n = between(0, docs - 1);
assertHitCount(client().prepareSearch("test").setSize(0).setQuery(matchQuery("_id", Integer.toString(n))).get(), 1);
assertHitCount(client().prepareSearch("test").setSize(0).setQuery(QueryBuilders.matchAllQuery()).get(), docs);
DeleteByQueryRequestBuilder delete = deleteByQuery().source("alias").filter(matchQuery("_id", Integer.toString(n)));
assertThat(delete.refresh(true).get(), matcher().deleted(1L));
assertHitCount(client().prepareSearch("test").setSize(0).setQuery(QueryBuilders.matchAllQuery()).get(), docs - 1);
}
public void testDeleteByQueryWithDateMath() throws Exception {
indexRandom(true, client().prepareIndex("test", "type", "1").setSource("d", "2013-01-01"));
DeleteByQueryRequestBuilder delete = deleteByQuery().source("test").filter(rangeQuery("d").to("now-1h"));
assertThat(delete.refresh(true).get(), matcher().deleted(1L));
assertHitCount(client().prepareSearch("test").setSize(0).get(), 0);
}
public void testDeleteByQueryOnReadOnlyIndex() throws Exception {
createIndex("test");
final int docs = randomIntBetween(1, 50);
List<IndexRequestBuilder> builders = new ArrayList<>();
for (int i = 0; i < docs; i++) {
builders.add(client().prepareIndex("test", "test", Integer.toString(i)).setSource("field", 1));
}
indexRandom(true, true, true, builders);
try {
enableIndexBlock("test", IndexMetaData.SETTING_READ_ONLY);
assertThat(deleteByQuery().source("test").refresh(true).get(), matcher().deleted(0).failures(docs));
} finally {
disableIndexBlock("test", IndexMetaData.SETTING_READ_ONLY);
}
assertHitCount(client().prepareSearch("test").setSize(0).get(), docs);
}
}

View File

@ -0,0 +1,184 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.reindex;
import org.elasticsearch.action.ListenableActionFuture;
import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksRequest;
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse;
import org.elasticsearch.action.admin.cluster.node.tasks.list.TaskInfo;
import org.elasticsearch.common.util.concurrent.CountDown;
import org.elasticsearch.index.IndexModule;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.shard.IndexingOperationListener;
import org.elasticsearch.plugins.Plugin;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
/**
* Tests that you can actually cancel a delete-by-query request and all the plumbing works. Doesn't test all of the different cancellation
* places - that is the responsibility of {@link AsyncBulkByScrollActionTests}, which has more precise control to simulate failures but does
* not exercise important portions of the stack like transport and task management.
*/
public class DeleteByQueryCancelTests extends ReindexTestCase {
private static final String INDEX = "test-delete-by-query";
private static final String TYPE = "test";
private static final int MAX_DELETIONS = 10;
private static final CyclicBarrier barrier = new CyclicBarrier(2);
@Override
protected int numberOfShards() {
// Only 1 shard and no replica so that test execution
// can be easily controlled within a {@link IndexingOperationListener#preDelete}
return 1;
}
@Override
protected int numberOfReplicas() {
// Only 1 shard and no replica so that test execution
// can be easily controlled within a {@link IndexingOperationListener#preDelete}
return 0;
}
@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
Collection<Class<? extends Plugin>> plugins = new ArrayList<>(super.nodePlugins());
plugins.add(DeleteByQueryCancellationPlugin.class);
return plugins;
}
public void testCancel() throws Exception {
createIndex(INDEX);
int totalNumShards = getNumShards(INDEX).totalNumShards;
// Number of documents to be deleted in this test
final int nbDocsToDelete = totalNumShards * MAX_DELETIONS;
// Total number of documents that will be created in this test
final int nbDocs = nbDocsToDelete * randomIntBetween(1, 5);
for (int i = 0; i < nbDocs; i++) {
indexRandom(false, client().prepareIndex(INDEX, TYPE, String.valueOf(i)).setSource("n", i));
}
refresh(INDEX);
assertHitCount(client().prepareSearch(INDEX).setSize(0).get(), nbDocs);
// Executes the delete by query; each shard will block after MAX_DELETIONS
DeleteByQueryRequestBuilder deleteByQuery = deleteByQuery().source("_all");
deleteByQuery.source().setSize(1);
ListenableActionFuture<BulkIndexByScrollResponse> future = deleteByQuery.execute();
// Waits for the indexing operation listener to block
barrier.await(30, TimeUnit.SECONDS);
// Status should show running
ListTasksResponse tasksList = client().admin().cluster().prepareListTasks()
.setActions(DeleteByQueryAction.NAME).setDetailed(true).get();
assertThat(tasksList.getNodeFailures(), empty());
assertThat(tasksList.getTaskFailures(), empty());
assertThat(tasksList.getTasks(), hasSize(1));
BulkByScrollTask.Status status = (BulkByScrollTask.Status) tasksList.getTasks().get(0).getStatus();
assertNull(status.getReasonCancelled());
// Cancel the request while the deletions are blocked. This will prevent further deletion requests from being sent.
List<TaskInfo> cancelledTasks = client().admin().cluster().prepareCancelTasks()
.setActions(DeleteByQueryAction.NAME).get().getTasks();
assertThat(cancelledTasks, hasSize(1));
// The status should now show canceled. The request will still be in the list because the listener is still blocked.
tasksList = client().admin().cluster().prepareListTasks().setActions(DeleteByQueryAction.NAME).setDetailed(true).get();
assertThat(tasksList.getNodeFailures(), empty());
assertThat(tasksList.getTaskFailures(), empty());
assertThat(tasksList.getTasks(), hasSize(1));
status = (BulkByScrollTask.Status) tasksList.getTasks().get(0).getStatus();
assertEquals(CancelTasksRequest.DEFAULT_REASON, status.getReasonCancelled());
// Now unblock the listener so that it can proceed
barrier.await();
// And check the status of the response
BulkIndexByScrollResponse response = future.get();
assertThat(response, matcher()
.deleted(lessThanOrEqualTo((long) MAX_DELETIONS)).batches(MAX_DELETIONS).reasonCancelled(equalTo("by user request")));
}
public static class DeleteByQueryCancellationPlugin extends Plugin {
@Override
public String name() {
return "delete-by-query-cancellation";
}
@Override
public String description() {
return "See " + DeleteByQueryCancellationPlugin.class.getName();
}
@Override
public void onIndexModule(IndexModule indexModule) {
indexModule.addIndexOperationListener(new BlockingDeleteListener());
}
}
/**
* An {@link IndexingOperationListener} that allows a given number of documents to be deleted
* and then blocks until it is notified to proceed.
*/
public static class BlockingDeleteListener implements IndexingOperationListener {
private final CountDown blockAfter = new CountDown(MAX_DELETIONS);
@Override
public Engine.Delete preDelete(Engine.Delete delete) {
if (blockAfter.isCountedDown() || (TYPE.equals(delete.type()) == false)) {
return delete;
}
if (blockAfter.countDown()) {
try {
// Tell the test we've deleted enough documents.
barrier.await(30, TimeUnit.SECONDS);
// Wait for the test to tell us to proceed.
barrier.await(30, TimeUnit.SECONDS);
} catch (InterruptedException | BrokenBarrierException | TimeoutException e) {
throw new RuntimeException(e);
}
}
return delete;
}
}
}

View File

@ -0,0 +1,119 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.reindex;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.index.query.MatchQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicLong;
import static org.elasticsearch.index.query.QueryBuilders.matchQuery;
import static org.elasticsearch.index.query.QueryBuilders.termQuery;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
import static org.hamcrest.Matchers.equalTo;
public class DeleteByQueryConcurrentTests extends ReindexTestCase {
public void testConcurrentDeleteByQueriesOnDifferentDocs() throws Throwable {
final Thread[] threads = new Thread[scaledRandomIntBetween(2, 5)];
final long docs = randomIntBetween(1, 50);
List<IndexRequestBuilder> builders = new ArrayList<>();
for (int i = 0; i < docs; i++) {
for (int t = 0; t < threads.length; t++) {
builders.add(client().prepareIndex("test", "doc").setSource("field", t));
}
}
indexRandom(true, true, true, builders);
final CountDownLatch start = new CountDownLatch(1);
for (int t = 0; t < threads.length; t++) {
final int threadNum = t;
assertHitCount(client().prepareSearch("test").setSize(0).setQuery(QueryBuilders.termQuery("field", threadNum)).get(), docs);
Runnable r = () -> {
try {
start.await();
assertThat(deleteByQuery().source("_all").filter(termQuery("field", threadNum)).refresh(true).get(),
matcher().deleted(docs));
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
};
threads[t] = new Thread(r);
threads[t].start();
}
start.countDown();
for (Thread thread : threads) {
thread.join();
}
for (int t = 0; t < threads.length; t++) {
assertHitCount(client().prepareSearch("test").setSize(0).setQuery(QueryBuilders.termQuery("field", t)).get(), 0);
}
}
public void testConcurrentDeleteByQueriesOnSameDocs() throws Throwable {
final long docs = randomIntBetween(50, 100);
List<IndexRequestBuilder> builders = new ArrayList<>();
for (int i = 0; i < docs; i++) {
builders.add(client().prepareIndex("test", "doc", String.valueOf(i)).setSource("foo", "bar"));
}
indexRandom(true, true, true, builders);
final Thread[] threads = new Thread[scaledRandomIntBetween(2, 9)];
final CountDownLatch start = new CountDownLatch(1);
final MatchQueryBuilder query = matchQuery("foo", "bar");
final AtomicLong deleted = new AtomicLong(0);
for (int t = 0; t < threads.length; t++) {
Runnable r = () -> {
try {
start.await();
BulkIndexByScrollResponse response = deleteByQuery().source("test").filter(query).refresh(true).get();
// Some deletions might fail due to version conflict, but
// what matters here is the total number of successful deletions
deleted.addAndGet(response.getDeleted());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
};
threads[t] = new Thread(r);
threads[t].start();
}
start.countDown();
for (Thread thread : threads) {
thread.join();
}
assertHitCount(client().prepareSearch("test").setSize(0).get(), 0L);
assertThat(deleted.get(), equalTo(docs));
}
}

View File

@ -29,7 +29,7 @@ import org.elasticsearch.index.mapper.internal.RoutingFieldMapper;
public class ReindexMetadataTests extends AbstractAsyncBulkIndexbyScrollActionMetadataTestCase<ReindexRequest, BulkIndexByScrollResponse> {
public void testRoutingCopiedByDefault() throws Exception {
IndexRequest index = new IndexRequest();
action().copyMetadata(index, doc(RoutingFieldMapper.NAME, "foo"));
action().copyMetadata(AbstractAsyncBulkIndexByScrollAction.wrap(index), doc(RoutingFieldMapper.NAME, "foo"));
assertEquals("foo", index.routing());
}
@ -37,7 +37,7 @@ public class ReindexMetadataTests extends AbstractAsyncBulkIndexbyScrollActionMe
TransportReindexAction.AsyncIndexBySearchAction action = action();
action.mainRequest.getDestination().routing("keep");
IndexRequest index = new IndexRequest();
action.copyMetadata(index, doc(RoutingFieldMapper.NAME, "foo"));
action.copyMetadata(AbstractAsyncBulkIndexByScrollAction.wrap(index), doc(RoutingFieldMapper.NAME, "foo"));
assertEquals("foo", index.routing());
}
@ -45,7 +45,7 @@ public class ReindexMetadataTests extends AbstractAsyncBulkIndexbyScrollActionMe
TransportReindexAction.AsyncIndexBySearchAction action = action();
action.mainRequest.getDestination().routing("discard");
IndexRequest index = new IndexRequest();
action.copyMetadata(index, doc(RoutingFieldMapper.NAME, "foo"));
action.copyMetadata(AbstractAsyncBulkIndexByScrollAction.wrap(index), doc(RoutingFieldMapper.NAME, "foo"));
assertEquals(null, index.routing());
}
@ -53,7 +53,7 @@ public class ReindexMetadataTests extends AbstractAsyncBulkIndexbyScrollActionMe
TransportReindexAction.AsyncIndexBySearchAction action = action();
action.mainRequest.getDestination().routing("=cat");
IndexRequest index = new IndexRequest();
action.copyMetadata(index, doc(RoutingFieldMapper.NAME, "foo"));
action.copyMetadata(AbstractAsyncBulkIndexByScrollAction.wrap(index), doc(RoutingFieldMapper.NAME, "foo"));
assertEquals("cat", index.routing());
}
@ -61,13 +61,13 @@ public class ReindexMetadataTests extends AbstractAsyncBulkIndexbyScrollActionMe
TransportReindexAction.AsyncIndexBySearchAction action = action();
action.mainRequest.getDestination().routing("==]");
IndexRequest index = new IndexRequest();
action.copyMetadata(index, doc(RoutingFieldMapper.NAME, "foo"));
action.copyMetadata(AbstractAsyncBulkIndexByScrollAction.wrap(index), doc(RoutingFieldMapper.NAME, "foo"));
assertEquals("=]", index.routing());
}
@Override
protected TransportReindexAction.AsyncIndexBySearchAction action() {
return new TransportReindexAction.AsyncIndexBySearchAction(task, logger, null, null, null, threadPool, request(), listener());
return new TransportReindexAction.AsyncIndexBySearchAction(task, logger, null, threadPool, request(), listener(), null, null);
}
@Override

View File

@ -20,7 +20,10 @@
package org.elasticsearch.index.reindex;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.common.lucene.uid.Versions;
import org.elasticsearch.script.ExecutableScript;
import org.elasticsearch.script.ScriptService;
import java.util.Map;
@ -31,6 +34,7 @@ import static org.hamcrest.Matchers.containsString;
* Tests index-by-search with a script modifying the documents.
*/
public class ReindexScriptTests extends AbstractAsyncBulkIndexByScrollActionScriptTestCase<ReindexRequest, BulkIndexByScrollResponse> {
public void testSetIndex() throws Exception {
Object dest = randomFrom(new Object[] {234, 234L, "pancake"});
IndexRequest index = applyScript((Map<String, Object> ctx) -> ctx.put("_index", dest));
@ -129,11 +133,12 @@ public class ReindexScriptTests extends AbstractAsyncBulkIndexByScrollActionScri
@Override
protected ReindexRequest request() {
return new ReindexRequest();
return new ReindexRequest(new SearchRequest(), new IndexRequest());
}
@Override
protected AbstractAsyncBulkIndexByScrollAction<ReindexRequest> action() {
return new TransportReindexAction.AsyncIndexBySearchAction(task, logger, null, null, null, threadPool, request(), listener());
protected AbstractAsyncBulkIndexByScrollAction<ReindexRequest> action(ScriptService scriptService, ReindexRequest request) {
return new TransportReindexAction.AsyncIndexBySearchAction(task, logger, null, threadPool, request, listener(), scriptService,
null);
}
}

View File

@ -43,6 +43,10 @@ public abstract class ReindexTestCase extends ESIntegTestCase {
return UpdateByQueryAction.INSTANCE.newRequestBuilder(client());
}
protected DeleteByQueryRequestBuilder deleteByQuery() {
return DeleteByQueryAction.INSTANCE.newRequestBuilder(client());
}
protected RethrottleRequestBuilder rethrottle() {
return RethrottleAction.INSTANCE.newRequestBuilder(client());
}

View File

@ -27,14 +27,13 @@ public class UpdateByQueryMetadataTests
extends AbstractAsyncBulkIndexbyScrollActionMetadataTestCase<UpdateByQueryRequest, BulkIndexByScrollResponse> {
public void testRoutingIsCopied() throws Exception {
IndexRequest index = new IndexRequest();
action().copyMetadata(index, doc(RoutingFieldMapper.NAME, "foo"));
action().copyMetadata(AbstractAsyncBulkIndexByScrollAction.wrap(index), doc(RoutingFieldMapper.NAME, "foo"));
assertEquals("foo", index.routing());
}
@Override
protected TransportUpdateByQueryAction.AsyncIndexBySearchAction action() {
return new TransportUpdateByQueryAction.AsyncIndexBySearchAction(task, logger, null, null, threadPool, null, request(),
listener());
return new TransportUpdateByQueryAction.AsyncIndexBySearchAction(task, logger, null, threadPool, request(), listener(), null, null);
}
@Override

View File

@ -19,6 +19,8 @@
package org.elasticsearch.index.reindex;
import org.elasticsearch.script.ScriptService;
import java.util.Date;
import java.util.Map;
@ -26,6 +28,7 @@ import static org.hamcrest.Matchers.containsString;
public class UpdateByQueryWithScriptTests
extends AbstractAsyncBulkIndexByScrollActionScriptTestCase<UpdateByQueryRequest, BulkIndexByScrollResponse> {
public void testModifyingCtxNotAllowed() {
/*
* It's important that none of these actually match any of the fields.
@ -49,7 +52,8 @@ public class UpdateByQueryWithScriptTests
}
@Override
protected AbstractAsyncBulkIndexByScrollAction<UpdateByQueryRequest> action() {
return new TransportUpdateByQueryAction.AsyncIndexBySearchAction(task, logger, null, null, threadPool, null, request(), listener());
protected AbstractAsyncBulkIndexByScrollAction<UpdateByQueryRequest> action(ScriptService scriptService, UpdateByQueryRequest request) {
return new TransportUpdateByQueryAction.AsyncIndexBySearchAction(task, logger, null, threadPool, request, listener(),
scriptService, null);
}
}

View File

@ -0,0 +1,304 @@
---
"Basic response":
- do:
index:
index: test
type: foo
id: 1
body: { "text": "test" }
- do:
indices.refresh: {}
- do:
delete_by_query:
index: test
body:
query:
match_all: {}
- is_false: timed_out
- match: {deleted: 1}
- is_false: created
- is_false: updated
- match: {version_conflicts: 0}
- match: {batches: 1}
- match: {failures: []}
- match: {noops: 0}
- match: {throttled_millis: 0}
- gte: { took: 0 }
- is_false: task
- do:
indices.refresh: {}
- do:
count:
index: test
- match: {count: 0}
---
"wait_for_completion=false":
- do:
index:
index: test
type: foo
id: 1
body: { "text": "test" }
- do:
indices.refresh: {}
- do:
delete_by_query:
wait_for_completion: false
index: test
body:
query:
match_all: {}
- match: {task: '/.+:\d+/'}
- set: {task: task}
- is_false: version_conflicts
- is_false: batches
- is_false: failures
- is_false: noops
- is_false: took
- is_false: throttled_millis
- is_false: created
- is_false: updated
- is_false: deleted
- do:
tasks.list:
wait_for_completion: true
task_id: $task
- is_false: node_failures
---
"Response for version conflict":
- do:
indices.create:
index: test
body:
settings:
index.refresh_interval: -1
- do:
index:
index: test
type: foo
id: 1
body: { "text": "test" }
- do:
indices.refresh: {}
# Creates a newer version of the document that the delete_by_query search snapshot will not see, so the delete hits a version conflict.
- do:
index:
index: test
type: foo
id: 1
body: { "text": "test2" }
- do:
catch: conflict
delete_by_query:
index: test
body:
query:
match_all: {}
- match: {deleted: 0}
- match: {version_conflicts: 1}
- match: {batches: 1}
- match: {failures.0.index: test}
- match: {failures.0.type: foo}
- match: {failures.0.id: "1"}
- match: {failures.0.status: 409}
- match: {failures.0.cause.type: version_conflict_engine_exception}
# Use a regex so we don't mind if the current version isn't always 1. Sometimes it comes out 2.
- match: {failures.0.cause.reason: "/\\[foo\\]\\[1\\]:.version.conflict,.current.version.\\[\\d+\\].is.different.than.the.one.provided.\\[\\d+\\]/"}
- match: {failures.0.cause.shard: /\d+/}
- match: {failures.0.cause.index: test}
- gte: { took: 0 }
- do:
indices.refresh: {}
- do:
count:
index: test
- match: {count: 1}
---
"Response for version conflict with conflicts=proceed":
- do:
indices.create:
index: test
body:
settings:
index.refresh_interval: -1
- do:
index:
index: test
type: foo
id: 1
body: { "text": "test" }
- do:
indices.refresh: {}
# Creates a newer version of the document that the delete_by_query search snapshot will not see, so the delete hits a version conflict.
- do:
index:
index: test
type: foo
id: 1
body: { "text": "test2" }
- do:
delete_by_query:
index: test
conflicts: proceed
body:
query:
match_all: {}
- match: {deleted: 0}
- match: {version_conflicts: 1}
- match: {batches: 1}
- match: {noops: 0}
- match: {failures: []}
- match: {throttled_millis: 0}
- gte: { took: 0 }
- do:
indices.refresh: {}
- do:
count:
index: test
- match: {count: 1}
---
"Limit by query":
- do:
index:
index: twitter
type: tweet
id: 1
body: { "user": "kimchy" }
- do:
index:
index: twitter
type: tweet
id: 2
body: { "user": "junk" }
- do:
indices.refresh: {}
- do:
delete_by_query:
index: twitter
refresh: true
body:
query:
match:
user: kimchy
- match: {deleted: 1}
- match: {version_conflicts: 0}
- match: {batches: 1}
- match: {failures: []}
- gte: { took: 0 }
- do:
count:
index: twitter
- match: {count: 1}
---
"Limit by size":
- do:
index:
index: twitter
type: tweet
id: 1
body: { "user": "kimchy" }
- do:
index:
index: twitter
type: tweet
id: 2
body: { "user": "kimchy" }
- do:
indices.refresh: {}
- do:
delete_by_query:
index: twitter
size: 1
body:
query:
match_all: {}
- match: {deleted: 1}
- match: {version_conflicts: 0}
- match: {batches: 1}
- match: {failures: []}
- match: {throttled_millis: 0}
- gte: { took: 0 }
- do:
indices.refresh: {}
- do:
count:
index: twitter
- match: {count: 1}
---
"Can override scroll_size":
- do:
indices.create:
index: test
body:
settings:
number_of_shards: 1
- do:
cluster.health:
wait_for_status: yellow
- do:
index:
index: test
type: foo
body: { "text": "test" }
- do:
index:
index: test
type: foo
body: { "text": "test" }
- do:
index:
index: test
type: foo
body: { "text": "test" }
- do:
indices.refresh: {}
- do:
delete_by_query:
index: test
refresh: true
scroll_size: 1
body:
query:
match_all: {}
- match: {batches: 3}
- match: {deleted: 3}
- do:
count:
index: test
- match: {count: 0}

View File

@ -0,0 +1,99 @@
---
"no body fails":
- do:
catch: param
delete_by_query:
index: _all
---
"invalid conflicts fails":
- do:
index:
index: test
type: test
id: 1
body: { "text": "test" }
- do:
catch: /conflicts may only be .* but was \[cat\]/
delete_by_query:
index: test
conflicts: cat
body:
query:
match_all: {}
---
"invalid size fails":
- do:
index:
index: test
type: test
id: 1
body: { "text": "test" }
- do:
catch: /size should be greater than 0 if the request is limited to some number of documents or -1 if it isn't but it was \[-4\]/
delete_by_query:
index: test
size: -4
body:
query:
match_all: {}
---
"invalid scroll_size fails":
- do:
index:
index: test
type: test
id: 1
body: { "text": "test" }
- do:
catch: /Failed to parse int parameter \[scroll_size\] with value \[asdf\]/
delete_by_query:
index: test
scroll_size: asdf
body:
query:
match_all: {}
---
"source fields may not be modified":
- do:
catch: /fields is not supported in this context/
delete_by_query:
index: test
body:
fields: [_id]
---
"requests_per_second cannot be an empty string":
- do:
catch: /\[requests_per_second\] must be a float greater than 0. Use "unlimited" to disable throttling./
delete_by_query:
requests_per_second: ""
index: test
body:
query:
match_all: {}
---
"requests_per_second cannot be negative":
- do:
catch: /\[requests_per_second\] must be a float greater than 0. Use "unlimited" to disable throttling./
delete_by_query:
requests_per_second: -12
index: test
body:
query:
match_all: {}
---
"requests_per_second cannot be zero":
- do:
catch: /\[requests_per_second\] must be a float greater than 0. Use "unlimited" to disable throttling./
delete_by_query:
requests_per_second: 0
index: test
body:
query:
match_all: {}

View File

@ -0,0 +1,72 @@
---
"Delete by type":
- do:
index:
index: test
type: t1
id: 1
body: { foo: bar }
- do:
index:
index: test
type: t1
id: 2
body: { foo: bar }
- do:
index:
index: test
type: t2
id: 1
body: { foo: bar }
- do:
index:
index: test
type: t2
id: 2
body: { foo: bar }
- do:
index:
index: test
type: t2
id: 3
body: { foo: baz }
- do:
indices.refresh: {}
- do:
count:
index: test
type: t2
- match: {count: 3}
- do:
delete_by_query:
index: test
type: t2
body:
query:
match:
foo: bar
- is_false: timed_out
- match: {deleted: 2}
- is_false: created
- is_false: updated
- match: {version_conflicts: 0}
- match: {batches: 1}
- match: {failures: []}
- match: {noops: 0}
- match: {throttled_millis: 0}
- gte: { took: 0 }
- is_false: task
- do:
indices.refresh: {}
- do:
count:
index: test
type: t2
- match: {count: 1}

View File

@ -0,0 +1,62 @@
---
"can override consistency":
- do:
indices.create:
index: test
body:
settings:
number_of_replicas: 5
- do:
cluster.health:
wait_for_status: yellow
- do:
index:
index: test
type: test
id: 1
body: {"text": "test"}
consistency: one
- do:
indices.refresh: {}
- do:
catch: unavailable
delete_by_query:
index: test
timeout: 1s
body:
query:
match_all: {}
- match:
failures.0.cause.reason: /Not.enough.active.copies.to.meet.write.consistency.of.\[QUORUM\].\(have.1,.needed.4\)..Timeout\:.\[1s\],.request:.\[BulkShardRequest.to.\[test\].containing.\[1\].requests\]/
- do:
indices.refresh: {}
- do:
count:
index: test
- match: {count: 1}
- do:
delete_by_query:
index: test
consistency: one
body:
query:
match_all: {}
- match: {failures: []}
- match: {deleted: 1}
- match: {version_conflicts: 0}
- do:
indices.refresh: {}
- do:
count:
index: test
- match: {count: 0}

View File

@ -0,0 +1,202 @@
"Throttle the request":
# Throttling happens between each scroll batch so we need to control the size of the batch by using a single shard
# and a small batch size on the request
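# With three documents, scroll_size: 1 and requests_per_second: 1 the request should pause for roughly one
# second after each batch, so throttled_millis is expected to land between the asserted 1s and 4s bounds.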
- do:
indices.create:
index: test
body:
settings:
number_of_shards: 1
- do:
cluster.health:
wait_for_status: yellow
- do:
index:
index: test
type: foo
body: { "text": "test" }
- do:
index:
index: test
type: foo
body: { "text": "test" }
- do:
index:
index: test
type: foo
body: { "text": "test" }
- do:
indices.refresh: {}
- do:
delete_by_query:
index: test
scroll_size: 1
requests_per_second: 1
body:
query:
match_all: {}
- match: {batches: 3}
- match: {deleted: 3}
- gt: {throttled_millis: 1000}
- lt: {throttled_millis: 4000}
---
"requests_per_second supports unlimited which turns off throttling":
- do:
indices.create:
index: test
body:
settings:
number_of_shards: 1
- do:
cluster.health:
wait_for_status: yellow
- do:
index:
index: test
type: foo
body: { "text": "test" }
- do:
index:
index: test
type: foo
body: { "text": "test" }
- do:
index:
index: test
type: foo
body: { "text": "test" }
- do:
indices.refresh: {}
- do:
delete_by_query:
index: test
scroll_size: 1
requests_per_second: unlimited
body:
query:
match_all: {}
- match: {batches: 3}
- match: {deleted: 3}
- match: {throttled_millis: 0}
---
"Rethrottle":
# Throttling happens between each scroll batch so we need to control the size of the batch by using a single shard
# and a small batch size on the request
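# The request starts with a throttle so low it would effectively never finish; rethrottling the task to
# unlimited should let the remaining batches complete before the final count is checked.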
- do:
indices.create:
index: test
body:
settings:
number_of_shards: 1
- do:
cluster.health:
wait_for_status: yellow
- do:
index:
index: test
type: foo
body: { "text": "test" }
- do:
index:
index: test
type: foo
body: { "text": "test" }
- do:
index:
index: test
type: foo
body: { "text": "test" }
- do:
indices.refresh: {}
- do:
delete_by_query:
requests_per_second: .00000001 # About 9.5 years to complete the request
wait_for_completion: false
index: test
scroll_size: 1
body:
query:
match_all: {}
- match: {task: '/.+:\d+/'}
- set: {task: task}
- do:
reindex.rethrottle:
requests_per_second: unlimited
task_id: $task
- do:
tasks.list:
wait_for_completion: true
task_id: $task
- do:
indices.refresh: {}
- do:
count:
index: test
- match: {count: 0}
---
"Rethrottle but not unlimited":
# Throttling happens between each scroll batch so we need to control the size of the batch by using a single shard
# and a small batch size on the request
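# Same shape as the previous test, but the task is rethrottled to a finite rate (1 request per second),
# which should still be fast enough for the remaining batches to finish while tasks.list waits for completion.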
- do:
indices.create:
index: test
body:
settings:
number_of_shards: 1
- do:
cluster.health:
wait_for_status: yellow
- do:
index:
index: test
type: foo
body: { "text": "test" }
- do:
index:
index: test
type: foo
body: { "text": "test" }
- do:
index:
index: test
type: foo
body: { "text": "test" }
- do:
indices.refresh: {}
- do:
delete_by_query:
requests_per_second: .00000001 # About 9.5 years to complete the request
wait_for_completion: false
index: test
scroll_size: 1
body:
query:
match_all: {}
- match: {task: '/.+:\d+/'}
- set: {task: task}
- do:
reindex.rethrottle:
requests_per_second: 1
task_id: $task
- do:
tasks.list:
wait_for_completion: true
task_id: $task

View File

@ -24,6 +24,7 @@
- match: {throttled_millis: 0}
- gte: { took: 0 }
- is_false: task
- is_false: deleted
---
"Response format for updated":
@ -57,6 +58,7 @@
- match: {throttled_millis: 0}
- gte: { took: 0 }
- is_false: task
- is_false: deleted
---
"wait_for_completion=false":
@ -88,6 +90,7 @@
- is_false: took
- is_false: throttled_millis
- is_false: created
- is_false: deleted
- do:
tasks.list:

View File

@ -59,7 +59,7 @@
---
"search size fails if not a number":
- do:
catch: '/NumberFormatException: For input string: "cat"/'
catch: '/number_format_exception.*For input string: \"cat\"/'
reindex:
body:
source:

View File

@ -23,6 +23,7 @@
# Update by query can't create
- is_false: created
- is_false: task
- is_false: deleted
---
"wait_for_completion=false":
@ -49,6 +50,7 @@
- is_false: took
- is_false: throttled_millis
- is_false: created
- is_false: deleted
- do:
tasks.list:

View File

@ -102,7 +102,7 @@
- match: {batches: 1}
---
"Setting bogus ctx is an error":
"Setting bogus context is an error":
- do:
index:
index: twitter
@ -113,7 +113,7 @@
indices.refresh: {}
- do:
catch: /Invalid fields added to ctx \[junk\]/
catch: /Invalid fields added to context \[junk\]/
update_by_query:
index: twitter
body:

View File

@ -0,0 +1,207 @@
{
"delete_by_query": {
"documentation": "https://www.elastic.co/guide/en/elasticsearch/plugins/master/plugins-reindex.html",
"methods": ["POST"],
"url": {
"path": "/{index}/_delete_by_query",
"paths": ["/{index}/_delete_by_query", "/{index}/{type}/_delete_by_query"],
"comment": "most things below this are just copied from search.json",
"parts": {
"index": {
"required" : true,
"type" : "list",
"description" : "A comma-separated list of index names to search; use `_all` or empty string to perform the operation on all indices"
},
"type": {
"type" : "list",
"description" : "A comma-separated list of document types to search; leave empty to perform the operation on all types"
}
},
"params": {
"analyzer": {
"type" : "string",
"description" : "The analyzer to use for the query string"
},
"analyze_wildcard": {
"type" : "boolean",
"description" : "Specify whether wildcard and prefix queries should be analyzed (default: false)"
},
"default_operator": {
"type" : "enum",
"options" : ["AND","OR"],
"default" : "OR",
"description" : "The default operator for query string query (AND or OR)"
},
"df": {
"type" : "string",
"description" : "The field to use as default where no field prefix is given in the query string"
},
"explain": {
"type" : "boolean",
"description" : "Specify whether to return detailed information about score computation as part of a hit"
},
"fields": {
"type" : "list",
"description" : "A comma-separated list of fields to return as part of a hit"
},
"fielddata_fields": {
"type" : "list",
"description" : "A comma-separated list of fields to return as the field data representation of a field for each hit"
},
"from": {
"type" : "number",
"description" : "Starting offset (default: 0)"
},
"ignore_unavailable": {
"type" : "boolean",
"description" : "Whether specified concrete indices should be ignored when unavailable (missing or closed)"
},
"allow_no_indices": {
"type" : "boolean",
"description" : "Whether to ignore if a wildcard indices expression resolves into no concrete indices. (This includes `_all` string or when no indices have been specified)"
},
"conflicts": {
"note": "This is not copied from search",
"type" : "enum",
"options": ["abort", "proceed"],
"default": "abort",
"description" : "What to do when the delete-by-query hits version conflicts?"
},
"expand_wildcards": {
"type" : "enum",
"options" : ["open","closed","none","all"],
"default" : "open",
"description" : "Whether to expand wildcard expression to concrete indices that are open, closed or both."
},
"lenient": {
"type" : "boolean",
"description" : "Specify whether format-based query failures (such as providing text to a numeric field) should be ignored"
},
"lowercase_expanded_terms": {
"type" : "boolean",
"description" : "Specify whether query terms should be lowercased"
},
"preference": {
"type" : "string",
"description" : "Specify the node or shard the operation should be performed on (default: random)"
},
"q": {
"type" : "string",
"description" : "Query in the Lucene query string syntax"
},
"routing": {
"type" : "list",
"description" : "A comma-separated list of specific routing values"
},
"scroll": {
"type" : "duration",
"description" : "Specify how long a consistent view of the index should be maintained for scrolled search"
},
"search_type": {
"type" : "enum",
"options" : ["query_then_fetch", "dfs_query_then_fetch"],
"description" : "Search operation type"
},
"search_timeout": {
"type" : "time",
"description" : "Explicit timeout for each search request. Defaults to no timeout."
},
"size": {
"type" : "number",
"description" : "Number of hits to return (default: 10)"
},
"sort": {
"type" : "list",
"description" : "A comma-separated list of <field>:<direction> pairs"
},
"_source": {
"type" : "list",
"description" : "True or false to return the _source field or not, or a list of fields to return"
},
"_source_exclude": {
"type" : "list",
"description" : "A list of fields to exclude from the returned _source field"
},
"_source_include": {
"type" : "list",
"description" : "A list of fields to extract and return from the _source field"
},
"terminate_after": {
"type" : "number",
"description" : "The maximum number of documents to collect for each shard, upon reaching which the query execution will terminate early."
},
"stats": {
"type" : "list",
"description" : "Specific 'tag' of the request for logging and statistical purposes"
},
"suggest_field": {
"type" : "string",
"description" : "Specify which field to use for suggestions"
},
"suggest_mode": {
"type" : "enum",
"options" : ["missing", "popular", "always"],
"default" : "missing",
"description" : "Specify suggest mode"
},
"suggest_size": {
"type" : "number",
"description" : "How many suggestions to return in response"
},
"suggest_text": {
"type" : "text",
"description" : "The source text for which the suggestions should be returned"
},
"timeout": {
"type" : "time",
"description" : "Explicit operation timeout"
},
"track_scores": {
"type" : "boolean",
"description": "Whether to calculate and return scores even if they are not used for sorting"
},
"version": {
"type" : "boolean",
"description" : "Specify whether to return document version as part of a hit"
},
"request_cache": {
"type" : "boolean",
"description" : "Specify if request cache should be used for this request or not, defaults to index level setting"
},
"refresh": {
"type" : "boolean",
"description" : "Should the effected indexes be refreshed?"
},
"timeout": {
"type" : "time",
"default": "1m",
"description" : "Time each individual bulk request should wait for shards that are unavailable."
},
"consistency": {
"type" : "enum",
"options" : ["one", "quorum", "all"],
"description" : "Explicit write consistency setting for the operation"
},
"scroll_size": {
"type": "integer",
"defaut_value": 100,
"description": "Size on the scroll request powering the update_by_query"
},
"wait_for_completion": {
"type" : "boolean",
"default": false,
"description" : "Should the request should block until the delete-by-query is complete."
},
"requests_per_second": {
"type": "float",
"default": 0,
"description": "The throttle for this request in sub-requests per second. 0 means set no throttle."
}
}
},
"body": {
"description": "The search definition using the Query DSL",
"required": true
}
}
}

View File

@ -4,7 +4,7 @@
"methods": ["POST"],
"url": {
"path": "/_reindex/{task_id}/_rethrottle",
"paths": ["/_reindex/{task_id}/_rethrottle", "/_update_by_query/{task_id}/_rethrottle"],
"paths": ["/_reindex/{task_id}/_rethrottle", "/_update_by_query/{task_id}/_rethrottle", "/_delete_by_query/{task_id}/_rethrottle"],
"parts": {
"task_id": {
"type": "string",