mirror of
https://github.com/honeymoose/OpenSearch.git
synced 2025-02-22 12:56:53 +00:00
parent
d76f87155a
commit
1f011f9dea
@ -1225,13 +1225,6 @@
|
||||
<suppress files="plugins[/\\]analysis-phonetic[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]index[/\\]analysis[/\\]PhoneticTokenFilterFactory.java" checks="LineLength" />
|
||||
<suppress files="plugins[/\\]analysis-smartcn[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]index[/\\]analysis[/\\]SimpleSmartChineseAnalysisTests.java" checks="LineLength" />
|
||||
<suppress files="plugins[/\\]analysis-stempel[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]index[/\\]analysis[/\\]PolishAnalysisTests.java" checks="LineLength" />
|
||||
<suppress files="plugins[/\\]delete-by-query[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]deletebyquery[/\\]DeleteByQueryRequest.java" checks="LineLength" />
|
||||
<suppress files="plugins[/\\]delete-by-query[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]deletebyquery[/\\]DeleteByQueryRequestBuilder.java" checks="LineLength" />
|
||||
<suppress files="plugins[/\\]delete-by-query[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]deletebyquery[/\\]DeleteByQueryResponse.java" checks="LineLength" />
|
||||
<suppress files="plugins[/\\]delete-by-query[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]deletebyquery[/\\]TransportDeleteByQueryAction.java" checks="LineLength" />
|
||||
<suppress files="plugins[/\\]delete-by-query[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]deletebyquery[/\\]IndexDeleteByQueryResponseTests.java" checks="LineLength" />
|
||||
<suppress files="plugins[/\\]delete-by-query[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]deletebyquery[/\\]TransportDeleteByQueryActionTests.java" checks="LineLength" />
|
||||
<suppress files="plugins[/\\]delete-by-query[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]plugin[/\\]deletebyquery[/\\]DeleteByQueryTests.java" checks="LineLength" />
|
||||
<suppress files="plugins[/\\]discovery-azure[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]cloud[/\\]azure[/\\]AbstractAzureTestCase.java" checks="LineLength" />
|
||||
<suppress files="plugins[/\\]discovery-azure[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]discovery[/\\]azure[/\\]AzureMinimumMasterNodesTests.java" checks="LineLength" />
|
||||
<suppress files="plugins[/\\]discovery-azure[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]discovery[/\\]azure[/\\]AzureSimpleTests.java" checks="LineLength" />
|
||||
|
@ -118,7 +118,6 @@ class InstallPluginCommand extends SettingCommand {
|
||||
"analysis-phonetic",
|
||||
"analysis-smartcn",
|
||||
"analysis-stempel",
|
||||
"delete-by-query",
|
||||
"discovery-azure",
|
||||
"discovery-ec2",
|
||||
"discovery-gce",
|
||||
|
@ -63,7 +63,6 @@ DEFAULT_PLUGINS = ["analysis-icu",
|
||||
"analysis-phonetic",
|
||||
"analysis-smartcn",
|
||||
"analysis-stempel",
|
||||
"delete-by-query",
|
||||
"discovery-azure",
|
||||
"discovery-ec2",
|
||||
"discovery-gce",
|
||||
|
@ -3,18 +3,6 @@
|
||||
|
||||
API extension plugins add new functionality to Elasticsearch by adding new APIs or features, usually to do with search or mapping.
|
||||
|
||||
[float]
|
||||
=== Core API extension plugins
|
||||
|
||||
The core API extension plugins are:
|
||||
|
||||
<<plugins-delete-by-query,Delete by Query>>::
|
||||
|
||||
The delete by query plugin adds support for deleting all of the documents
|
||||
(from one or more indices) which match the specified query. It is a
|
||||
replacement for the problematic _delete-by-query_ functionality which has been
|
||||
removed from Elasticsearch core.
|
||||
|
||||
[float]
|
||||
=== Community contributed API extension plugins
|
||||
|
||||
@ -46,4 +34,3 @@ A number of plugins have been contributed by our community:
|
||||
http://mahout.apache.org/[Mahout] Collaboration filtering (by hadashiA)
|
||||
* https://github.com/jurgc11/es-change-feed-plugin[WebSocket Change Feed Plugin] (by ForgeRock/Chris Clifton)
|
||||
|
||||
include::delete-by-query.asciidoc[]
|
||||
|
@ -1,270 +0,0 @@
|
||||
[[plugins-delete-by-query]]
|
||||
=== Delete By Query Plugin
|
||||
|
||||
The delete-by-query plugin adds support for deleting all of the documents
|
||||
(from one or more indices) which match the specified query. It is a
|
||||
replacement for the problematic _delete-by-query_ functionality which has been
|
||||
removed from Elasticsearch core.
|
||||
|
||||
Internally, it uses {ref}/search-request-scroll.html[Scroll]
|
||||
and {ref}/docs-bulk.html[Bulk] APIs to delete documents in an efficient and
|
||||
safe manner. It is slower than the old _delete-by-query_ functionality, but
|
||||
fixes the problems with the previous implementation.
|
||||
|
||||
To understand more about why we removed delete-by-query from core and about
|
||||
the semantics of the new implementation, see
|
||||
<<delete-by-query-plugin-reason>>.
|
||||
|
||||
[TIP]
|
||||
============================================
|
||||
Queries which match large numbers of documents may run for a long time,
|
||||
as every document has to be deleted individually. Don't use _delete-by-query_
|
||||
to clean out all or most documents in an index. Rather create a new index and
|
||||
perhaps reindex the documents you want to keep.
|
||||
============================================
|
||||
|
||||
[float]
|
||||
==== Installation
|
||||
|
||||
This plugin can be installed using the plugin manager:
|
||||
|
||||
[source,sh]
|
||||
----------------------------------------------------------------
|
||||
sudo bin/elasticsearch-plugin install delete-by-query
|
||||
----------------------------------------------------------------
|
||||
|
||||
The plugin must be installed on every node in the cluster, and each node must
|
||||
be restarted after installation.
|
||||
|
||||
[float]
|
||||
==== Removal
|
||||
|
||||
The plugin can be removed with the following command:
|
||||
|
||||
[source,sh]
|
||||
----------------------------------------------------------------
|
||||
sudo bin/elasticsearch-plugin remove delete-by-query
|
||||
----------------------------------------------------------------
|
||||
|
||||
The node must be stopped before removing the plugin.
|
||||
|
||||
[[delete-by-query-usage]]
|
||||
==== Using Delete-by-Query
|
||||
|
||||
The query can either be provided using a simple query string as
|
||||
a parameter:
|
||||
|
||||
[source,shell]
|
||||
--------------------------------------------------
|
||||
DELETE /twitter/tweet/_query?q=user:kimchy
|
||||
--------------------------------------------------
|
||||
// CONSOLE
|
||||
|
||||
or using the {ref}/query-dsl.html[Query DSL] defined within the request body:
|
||||
|
||||
[source,js]
|
||||
--------------------------------------------------
|
||||
DELETE /twitter/tweet/_query
|
||||
{
|
||||
"query": { <1>
|
||||
"term": {
|
||||
"user": "kimchy"
|
||||
}
|
||||
}
|
||||
}
|
||||
--------------------------------------------------
|
||||
// CONSOLE
|
||||
|
||||
<1> The query must be passed as a value to the `query` key, in the same way as
|
||||
the {ref}/search-search.html[search api].
|
||||
|
||||
Both of the above examples end up doing the same thing, which is to delete all
|
||||
tweets from the twitter index for the user `kimchy`.
|
||||
|
||||
Delete-by-query supports deletion across
|
||||
{ref}/search-search.html#search-multi-index-type[multiple indices and multiple types].
|
||||
|
||||
[float]
|
||||
=== Query-string parameters
|
||||
|
||||
The following query string parameters are supported:
|
||||
|
||||
`q`::
|
||||
|
||||
Instead of using the {ref}/query-dsl.html[Query DSL] to pass a `query` in the request
|
||||
body, you can use the `q` query string parameter to specify a query using
|
||||
{ref}/query-dsl-query-string-query.html#query-string-syntax[`query_string` syntax].
|
||||
In this case, the following additional parameters are supported: `df`,
|
||||
`analyzer`, `default_operator`, `lowercase_expanded_terms`,
|
||||
`analyze_wildcard` and `lenient`.
|
||||
See {ref}/search-uri-request.html[URI search request] for details.
|
||||
|
||||
`size`::
|
||||
|
||||
The number of hits returned by the {ref}/search-request-scroll.html[scroll]
|
||||
request. Defaults to 10. May also be specified in the request body.
|
||||
|
||||
`timeout`::
|
||||
|
||||
The maximum execution time of the delete by query process. Once expired, no
|
||||
more documents will be deleted.
|
||||
|
||||
`routing`::
|
||||
|
||||
A comma separated list of routing values to control which shards the delete by
|
||||
query request should be executed on.
|
||||
|
||||
When using the `q` parameter, the following additional parameters are
|
||||
supported (as explained in {ref}/search-uri-request.html[URI search request]): `df`, `analyzer`,
|
||||
`default_operator`.
|
||||
|
||||
|
||||
[float]
|
||||
=== Response body
|
||||
|
||||
The JSON response looks like this:
|
||||
|
||||
[source,js]
|
||||
--------------------------------------------------
|
||||
{
|
||||
"took" : 639,
|
||||
"timed_out" : false,
|
||||
"_indices" : {
|
||||
"_all" : {
|
||||
"found" : 5901,
|
||||
"deleted" : 5901,
|
||||
"missing" : 0,
|
||||
"failed" : 0
|
||||
},
|
||||
"twitter" : {
|
||||
"found" : 5901,
|
||||
"deleted" : 5901,
|
||||
"missing" : 0,
|
||||
"failed" : 0
|
||||
}
|
||||
},
|
||||
"failures" : [ ]
|
||||
}
|
||||
--------------------------------------------------
|
||||
|
||||
Internally, the query is used to execute an initial
|
||||
{ref}/search-request-scroll.html[scroll] request. As hits are
|
||||
pulled from the scroll API, they are passed to the {ref}/docs-bulk.html[Bulk
|
||||
API] for deletion.
|
||||
|
||||
IMPORTANT: Delete by query will only delete the version of the document that
|
||||
was visible to search at the time the request was executed. Any documents
|
||||
that have been reindexed or updated during execution will not be deleted.
|
||||
|
||||
Since documents can be updated or deleted by external operations during the
|
||||
_scroll-bulk_ process, the plugin keeps track of different counters for
|
||||
each index, with the totals displayed under the `_all` index. The counters
|
||||
are as follows:
|
||||
|
||||
`found`::
|
||||
|
||||
The number of documents matching the query for the given index.
|
||||
|
||||
`deleted`::
|
||||
|
||||
The number of documents successfully deleted for the given index.
|
||||
|
||||
`missing`::
|
||||
|
||||
The number of documents that were missing when the plugin tried to delete
|
||||
them. Missing documents were present when the original query was run, but have
|
||||
already been deleted by another process.
|
||||
|
||||
`failed`::
|
||||
|
||||
The number of documents that failed to be deleted for the given index. A
|
||||
document may fail to be deleted if it has been updated to a new version by
|
||||
another process, or if the shard containing the document has gone missing due
|
||||
to hardware failure, for example.
|
||||
|
||||
[[delete-by-query-plugin-reason]]
|
||||
==== Why Delete-By-Query is a plugin
|
||||
|
||||
The old delete-by-query API in Elasticsearch 1.x was fast but problematic. We
|
||||
decided to remove the feature from Elasticsearch for these reasons:
|
||||
|
||||
Forward compatibility::
|
||||
|
||||
The old implementation wrote a delete-by-query request, including the
|
||||
query, to the transaction log. This meant that, when upgrading to a new
|
||||
version, old unsupported queries which cannot be executed might exist in
|
||||
the translog, thus causing data corruption.
|
||||
|
||||
Consistency and correctness::
|
||||
|
||||
The old implementation executed the query and deleted all matching docs on
|
||||
the primary first. It then repeated this procedure on each replica shard.
|
||||
There was no guarantee that the queries on the primary and the replicas
|
||||
matched the same document, so it was quite possible to end up with
|
||||
different documents on each shard copy.
|
||||
|
||||
Resiliency::
|
||||
|
||||
The old implementation could cause out-of-memory exceptions, merge storms,
|
||||
and dramatic slow downs if used incorrectly.
|
||||
|
||||
[float]
|
||||
=== New delete-by-query implementation
|
||||
|
||||
The new implementation, provided by this plugin, is built internally
|
||||
using {ref}/search-request-scroll.html[scroll] to return
|
||||
the document IDs and versions of all the documents that need to be deleted.
|
||||
It then uses the {ref}/docs-bulk.html[`bulk` API] to do the actual deletion.
|
||||
|
||||
This can have performance as well as visibility implications. Delete-by-query
|
||||
now has the following semantics:
|
||||
|
||||
non-atomic::
|
||||
|
||||
A delete-by-query may fail at any time while some documents matching the
|
||||
query have already been deleted.
|
||||
|
||||
try-once::
|
||||
|
||||
A delete-by-query may fail at any time and will not retry it's execution.
|
||||
All retry logic is left to the user.
|
||||
|
||||
syntactic sugar::
|
||||
|
||||
A delete-by-query is equivalent to a scroll search ordered by `_doc` and
|
||||
corresponding bulk-deletes by ID.
|
||||
|
||||
point-in-time::
|
||||
|
||||
A delete-by-query will only delete the documents that are visible at the
|
||||
point in time the delete-by-query was started, equivalent to the
|
||||
scan/scroll API.
|
||||
|
||||
consistent::
|
||||
|
||||
A delete-by-query will yield consistent results across all replicas of a
|
||||
shard.
|
||||
|
||||
forward-compatible::
|
||||
|
||||
A delete-by-query will only send IDs to the shards as deletes such that no
|
||||
queries are stored in the transaction logs that might not be supported in
|
||||
the future.
|
||||
|
||||
visibility::
|
||||
|
||||
The effect of a delete-by-query request will not be visible to search
|
||||
until the user refreshes the index, or the index is refreshed
|
||||
automatically.
|
||||
|
||||
The new implementation suffers from two issues, which is why we decided to
|
||||
move the functionality to a plugin instead of replacing the feautre in core:
|
||||
|
||||
* It is not as fast as the previous implementation. For most use cases, this
|
||||
difference should not be noticeable but users running delete-by-query on
|
||||
many matching documents may be affected.
|
||||
|
||||
* There is currently no way to monitor or cancel a running delete-by-query
|
||||
request, except for the `timeout` parameter.
|
||||
|
||||
We have plans to solve both of these issues in a later version of Elasticsearch.
|
@ -33,6 +33,13 @@ The `cloud-azure` plugin has been split into two separate plugins:
|
||||
|
||||
The `cloud-gce` plugin has been renamed to <<discovery-gce>> (`discovery-gce`).
|
||||
|
||||
[role="exclude",id="plugins-delete-by-query"]
|
||||
=== Delete-By-Query plugin removed
|
||||
|
||||
The Delete-By-Query plugin has been removed in favor of a new {ref}/docs-delete-by-query.html[Delete By Query API]
|
||||
implementation in core.
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
@ -70,8 +70,8 @@ setting the routing parameter.
|
||||
|
||||
Note that deleting a parent document does not automatically delete its
|
||||
children. One way of deleting all child documents given a parent's id is
|
||||
to use the `delete-by-query` plugin to perform a delete on the child
|
||||
index with the automatically generated (and indexed)
|
||||
to use the <<docs-delete-by-query,Delete By Query API>> to perform a
|
||||
index with the automatically generated (and indexed)
|
||||
field _parent, which is in the format parent_type#parent_id.
|
||||
|
||||
[float]
|
||||
|
@ -95,6 +95,11 @@ cloud:
|
||||
|
||||
Cloud GCE plugin has been renamed to {plugins}/discovery-gce.html[Discovery GCE plugin].
|
||||
|
||||
==== Delete-By-Query plugin removed
|
||||
|
||||
The Delete-By-Query plugin has been removed in favor of a new <<docs-delete-by-query,Delete By Query API>>
|
||||
implementation in core. It now supports throttling, retries and cancellation but no longer supports timeouts.
|
||||
Instead use the <<docs-delete-by-query-cancel-task-api,cancel API>> to cancel deletes that run too long.
|
||||
|
||||
==== Mapper Attachments plugin deprecated
|
||||
|
||||
|
@ -57,3 +57,8 @@ removed in Elasticsearch 6.0.0.
|
||||
|
||||
The deprecated `filters`/`token_filters`/`char_filters` parameter has been
|
||||
renamed `filter`/`token_filter`/`char_filter`.
|
||||
|
||||
==== `DELETE /_query` endpoint removed
|
||||
|
||||
The `DELETE /_query` endpoint provided by the Delete-By-Query plugin has been
|
||||
removed and replaced by the <<docs-delete-by-query,Delete By Query API>>.
|
||||
|
@ -1,24 +0,0 @@
|
||||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
esplugin {
|
||||
description 'The Delete By Query plugin allows to delete documents in Elasticsearch with a single query.'
|
||||
classname 'org.elasticsearch.plugin.deletebyquery.DeleteByQueryPlugin'
|
||||
}
|
||||
|
@ -1,43 +0,0 @@
|
||||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.action.deletebyquery;
|
||||
|
||||
import org.elasticsearch.action.Action;
|
||||
import org.elasticsearch.client.ElasticsearchClient;
|
||||
|
||||
public class DeleteByQueryAction extends Action<DeleteByQueryRequest, DeleteByQueryResponse, DeleteByQueryRequestBuilder> {
|
||||
|
||||
public static final DeleteByQueryAction INSTANCE = new DeleteByQueryAction();
|
||||
public static final String NAME = "indices:data/write/delete/by_query";
|
||||
|
||||
private DeleteByQueryAction() {
|
||||
super(NAME);
|
||||
}
|
||||
|
||||
@Override
|
||||
public DeleteByQueryResponse newResponse() {
|
||||
return new DeleteByQueryResponse();
|
||||
}
|
||||
|
||||
@Override
|
||||
public DeleteByQueryRequestBuilder newRequestBuilder(ElasticsearchClient client) {
|
||||
return new DeleteByQueryRequestBuilder(client, this);
|
||||
}
|
||||
}
|
@ -1,244 +0,0 @@
|
||||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.action.deletebyquery;
|
||||
|
||||
import org.elasticsearch.action.ActionRequest;
|
||||
import org.elasticsearch.action.ActionRequestValidationException;
|
||||
import org.elasticsearch.action.IndicesRequest;
|
||||
import org.elasticsearch.action.support.IndicesOptions;
|
||||
import org.elasticsearch.common.Strings;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.index.query.QueryBuilder;
|
||||
import org.elasticsearch.search.Scroll;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Arrays;
|
||||
|
||||
import static org.elasticsearch.action.ValidateActions.addValidationError;
|
||||
import static org.elasticsearch.search.Scroll.readScroll;
|
||||
|
||||
/**
|
||||
* Creates a new {@link DeleteByQueryRequest}. Delete-by-query is since elasticsearch 2.0.0 moved into a plugin
|
||||
* and is not part of elasticsearch core. In contrast to the previous, in-core, implementation delete-by-query now
|
||||
* uses scan/scroll and the returned IDs do delete all documents matching the query. This can have performance
|
||||
* as well as visibility implications. Delete-by-query now has the following semantics:
|
||||
* <ul>
|
||||
* <li>it's <tt>non-actomic</tt>, a delete-by-query may fail at any time while some documents matching the query have already been deleted</li>
|
||||
* <li>it's <tt>try-once</tt>, a delete-by-query may fail at any time and will not retry it's execution. All retry logic is left to the user</li>
|
||||
* <li>it's <tt>syntactic sugar</tt>, a delete-by-query is equivalent to a scan/scroll search and corresponding bulk-deletes by ID</li>
|
||||
* <li>it's executed on a <tt>point-in-time</tt> snapshot, a delete-by-query will only delete the documents that are visible at the point in time the delete-by-query was started, equivalent to the scan/scroll API</li>
|
||||
* <li>it's <tt>consistent</tt>, a delete-by-query will yield consistent results across all replicas of a shard</li>
|
||||
* <li>it's <tt>forward-compativle</tt>, a delete-by-query will only send IDs to the shards as deletes such that no queries are stored in the transaction logs that might not be supported in the future.</li>
|
||||
* <li>it's results won't be visible until the user refreshes the index.</li>
|
||||
* </ul>
|
||||
*
|
||||
* The main reason why delete-by-query is now extracted as a plugin are:
|
||||
* <ul>
|
||||
* <li><tt>forward-compatibility</tt>, the previous implementation was prone to store unsupported queries in the transaction logs which is equvalent to data-loss</li>
|
||||
* <li><tt>consistency & correctness</tt>, the previous implementation was prone to produce different results on a shards replica which can essentially result in a corrupted index</li>
|
||||
* <li><tt>resiliency</tt>, the previous implementation could cause OOM errors, merge-storms and dramatic slowdowns if used incorrectly</li>
|
||||
* </ul>
|
||||
*
|
||||
* While delete-by-query is a very useful feature, it's implementation is very tricky in system that is based on per-document modifications. The move towards
|
||||
* a plugin based solution was mainly done to minimize the risk of cluster failures or corrupted indices which where easily possible wiht the previous implementation.
|
||||
* Users that rely delete by query should install the plugin in oder to use this functionality.
|
||||
*/
|
||||
public class DeleteByQueryRequest extends ActionRequest<DeleteByQueryRequest> implements IndicesRequest.Replaceable {
|
||||
|
||||
private String[] indices = Strings.EMPTY_ARRAY;
|
||||
private IndicesOptions indicesOptions = IndicesOptions.fromOptions(false, false, true, false);
|
||||
|
||||
private String[] types = Strings.EMPTY_ARRAY;
|
||||
|
||||
private QueryBuilder query;
|
||||
|
||||
private String routing;
|
||||
|
||||
private int size = 0;
|
||||
|
||||
private Scroll scroll = new Scroll(TimeValue.timeValueMinutes(10));
|
||||
|
||||
private TimeValue timeout;
|
||||
|
||||
public DeleteByQueryRequest() {
|
||||
}
|
||||
|
||||
/**
|
||||
* Constructs a new delete by query request to run against the provided indices. No indices means
|
||||
* it will run against all indices.
|
||||
*/
|
||||
public DeleteByQueryRequest(String... indices) {
|
||||
this.indices = indices;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ActionRequestValidationException validate() {
|
||||
ActionRequestValidationException validationException = null;
|
||||
if (query == null) {
|
||||
validationException = addValidationError("source is missing", validationException);
|
||||
}
|
||||
return validationException;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String[] indices() {
|
||||
return this.indices;
|
||||
}
|
||||
|
||||
@Override
|
||||
public DeleteByQueryRequest indices(String... indices) {
|
||||
this.indices = indices;
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public IndicesOptions indicesOptions() {
|
||||
return indicesOptions;
|
||||
}
|
||||
|
||||
public DeleteByQueryRequest indicesOptions(IndicesOptions indicesOptions) {
|
||||
if (indicesOptions == null) {
|
||||
throw new IllegalArgumentException("IndicesOptions must not be null");
|
||||
}
|
||||
this.indicesOptions = indicesOptions;
|
||||
return this;
|
||||
}
|
||||
|
||||
public String[] types() {
|
||||
return this.types;
|
||||
}
|
||||
|
||||
public DeleteByQueryRequest types(String... types) {
|
||||
this.types = types;
|
||||
return this;
|
||||
}
|
||||
|
||||
public QueryBuilder query() {
|
||||
return query;
|
||||
}
|
||||
|
||||
public DeleteByQueryRequest query(QueryBuilder queryBuilder) {
|
||||
this.query = queryBuilder;
|
||||
return this;
|
||||
}
|
||||
|
||||
public String routing() {
|
||||
return this.routing;
|
||||
}
|
||||
|
||||
public DeleteByQueryRequest routing(String routing) {
|
||||
this.routing = routing;
|
||||
return this;
|
||||
}
|
||||
|
||||
public DeleteByQueryRequest routing(String... routings) {
|
||||
this.routing = Strings.arrayToCommaDelimitedString(routings);
|
||||
return this;
|
||||
}
|
||||
|
||||
public DeleteByQueryRequest size(int size) {
|
||||
if (size < 0) {
|
||||
throw new IllegalArgumentException("size must be greater than zero");
|
||||
}
|
||||
this.size = size;
|
||||
return this;
|
||||
}
|
||||
|
||||
public int size() {
|
||||
return size;
|
||||
}
|
||||
|
||||
|
||||
public Scroll scroll() {
|
||||
return scroll;
|
||||
}
|
||||
|
||||
public DeleteByQueryRequest scroll(Scroll scroll) {
|
||||
this.scroll = scroll;
|
||||
return this;
|
||||
}
|
||||
|
||||
public DeleteByQueryRequest scroll(TimeValue keepAlive) {
|
||||
return scroll(new Scroll(keepAlive));
|
||||
}
|
||||
|
||||
public DeleteByQueryRequest scroll(String keepAlive) {
|
||||
return scroll(new Scroll(TimeValue.parseTimeValue(keepAlive, null, getClass().getSimpleName() + ".keepAlive")));
|
||||
}
|
||||
|
||||
public TimeValue timeout() {
|
||||
return timeout;
|
||||
}
|
||||
|
||||
public DeleteByQueryRequest timeout(TimeValue timeout) {
|
||||
if (timeout == null) {
|
||||
throw new IllegalArgumentException("timeout must not be null");
|
||||
}
|
||||
this.timeout = timeout;
|
||||
return this;
|
||||
}
|
||||
|
||||
public DeleteByQueryRequest timeout(String timeout) {
|
||||
timeout(TimeValue.parseTimeValue(timeout, null, getClass().getSimpleName() + ".timeout"));
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void readFrom(StreamInput in) throws IOException {
|
||||
super.readFrom(in);
|
||||
indices = in.readStringArray();
|
||||
indicesOptions = IndicesOptions.readIndicesOptions(in);
|
||||
types = in.readStringArray();
|
||||
query = in.readNamedWriteable(QueryBuilder.class);
|
||||
routing = in.readOptionalString();
|
||||
size = in.readVInt();
|
||||
if (in.readBoolean()) {
|
||||
scroll = readScroll(in);
|
||||
}
|
||||
if (in.readBoolean()) {
|
||||
timeout = TimeValue.readTimeValue(in);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeTo(StreamOutput out) throws IOException {
|
||||
super.writeTo(out);
|
||||
out.writeStringArray(indices);
|
||||
indicesOptions.writeIndicesOptions(out);
|
||||
out.writeStringArray(types);
|
||||
out.writeNamedWriteable(query);
|
||||
out.writeOptionalString(routing);
|
||||
out.writeVInt(size);
|
||||
out.writeOptionalStreamable(scroll);
|
||||
out.writeOptionalStreamable(timeout);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "delete-by-query indices:" + Arrays.toString(indices) +
|
||||
", types:" + Arrays.toString(types) +
|
||||
", size:" + size +
|
||||
", timeout:" + timeout +
|
||||
", routing:" + routing +
|
||||
", query:" + query;
|
||||
}
|
||||
}
|
@ -1,103 +0,0 @@
|
||||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.action.deletebyquery;
|
||||
|
||||
import org.elasticsearch.action.ActionRequestBuilder;
|
||||
import org.elasticsearch.action.support.IndicesOptions;
|
||||
import org.elasticsearch.client.ElasticsearchClient;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.index.query.QueryBuilder;
|
||||
|
||||
/**
|
||||
* Creates a new {@link DeleteByQueryRequestBuilder}
|
||||
* @see DeleteByQueryRequest
|
||||
*/
|
||||
public class DeleteByQueryRequestBuilder extends ActionRequestBuilder<DeleteByQueryRequest, DeleteByQueryResponse, DeleteByQueryRequestBuilder> {
|
||||
|
||||
public DeleteByQueryRequestBuilder(ElasticsearchClient client, DeleteByQueryAction action) {
|
||||
super(client, action, new DeleteByQueryRequest());
|
||||
}
|
||||
|
||||
public DeleteByQueryRequestBuilder setIndices(String... indices) {
|
||||
request.indices(indices);
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Specifies what type of requested indices to ignore and wildcard indices expressions.
|
||||
* <p>
|
||||
* For example indices that don't exist.
|
||||
*/
|
||||
public DeleteByQueryRequestBuilder setIndicesOptions(IndicesOptions options) {
|
||||
request.indicesOptions(options);
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* The query used to delete documents.
|
||||
*
|
||||
* @see org.elasticsearch.index.query.QueryBuilders
|
||||
*/
|
||||
public DeleteByQueryRequestBuilder setQuery(QueryBuilder queryBuilder) {
|
||||
request.query(queryBuilder);
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* A comma separated list of routing values to control the shards the action will be executed on.
|
||||
*/
|
||||
public DeleteByQueryRequestBuilder setRouting(String routing) {
|
||||
request.routing(routing);
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* The routing values to control the shards that the action will be executed on.
|
||||
*/
|
||||
public DeleteByQueryRequestBuilder setRouting(String... routing) {
|
||||
request.routing(routing);
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* An optional timeout to control how long the delete by query is allowed to take.
|
||||
*/
|
||||
public DeleteByQueryRequestBuilder setTimeout(TimeValue timeout) {
|
||||
request.timeout(timeout);
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* An optional timeout to control how long the delete by query is allowed to take.
|
||||
*/
|
||||
public DeleteByQueryRequestBuilder setTimeout(String timeout) {
|
||||
request.timeout(timeout);
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* The types of documents the query will run against. Defaults to all types.
|
||||
*/
|
||||
public DeleteByQueryRequestBuilder setTypes(String... types) {
|
||||
request.types(types);
|
||||
return this;
|
||||
}
|
||||
|
||||
}
|
@ -1,201 +0,0 @@
|
||||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.action.deletebyquery;
|
||||
|
||||
import org.elasticsearch.action.ActionResponse;
|
||||
import org.elasticsearch.action.ShardOperationFailedException;
|
||||
import org.elasticsearch.action.search.ShardSearchFailure;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.common.xcontent.ToXContent;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import static org.elasticsearch.action.search.ShardSearchFailure.readShardSearchFailure;
|
||||
|
||||
/**
|
||||
* Delete by query response
|
||||
* @see DeleteByQueryRequest
|
||||
*/
|
||||
public class DeleteByQueryResponse extends ActionResponse implements ToXContent {
|
||||
|
||||
private long tookInMillis;
|
||||
private boolean timedOut = false;
|
||||
|
||||
private long found;
|
||||
private long deleted;
|
||||
private long missing;
|
||||
private long failed;
|
||||
|
||||
private IndexDeleteByQueryResponse[] indices = IndexDeleteByQueryResponse.EMPTY_ARRAY;
|
||||
private ShardOperationFailedException[] shardFailures = ShardSearchFailure.EMPTY_ARRAY;
|
||||
|
||||
DeleteByQueryResponse() {
|
||||
}
|
||||
|
||||
DeleteByQueryResponse(long tookInMillis, boolean timedOut, long found, long deleted, long missing, long failed, IndexDeleteByQueryResponse[] indices, ShardOperationFailedException[] shardFailures) {
|
||||
this.tookInMillis = tookInMillis;
|
||||
this.timedOut = timedOut;
|
||||
this.found = found;
|
||||
this.deleted = deleted;
|
||||
this.missing = missing;
|
||||
this.failed = failed;
|
||||
this.indices = indices;
|
||||
this.shardFailures = shardFailures;
|
||||
}
|
||||
|
||||
/**
|
||||
* The responses from all the different indices.
|
||||
*/
|
||||
public IndexDeleteByQueryResponse[] getIndices() {
|
||||
return indices;
|
||||
}
|
||||
|
||||
/**
|
||||
* The response of a specific index.
|
||||
*/
|
||||
public IndexDeleteByQueryResponse getIndex(String index) {
|
||||
if (index == null) {
|
||||
return null;
|
||||
}
|
||||
for (IndexDeleteByQueryResponse i : indices) {
|
||||
if (index.equals(i.getIndex())) {
|
||||
return i;
|
||||
}
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
public TimeValue getTook() {
|
||||
return new TimeValue(tookInMillis);
|
||||
}
|
||||
|
||||
public long getTookInMillis() {
|
||||
return tookInMillis;
|
||||
}
|
||||
|
||||
public boolean isTimedOut() {
|
||||
return this.timedOut;
|
||||
}
|
||||
|
||||
public long getTotalFound() {
|
||||
return found;
|
||||
}
|
||||
|
||||
public long getTotalDeleted() {
|
||||
return deleted;
|
||||
}
|
||||
|
||||
public long getTotalMissing() {
|
||||
return missing;
|
||||
}
|
||||
|
||||
public long getTotalFailed() {
|
||||
return failed;
|
||||
}
|
||||
|
||||
public ShardOperationFailedException[] getShardFailures() {
|
||||
return shardFailures;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void readFrom(StreamInput in) throws IOException {
|
||||
super.readFrom(in);
|
||||
tookInMillis = in.readVLong();
|
||||
timedOut = in.readBoolean();
|
||||
found = in.readVLong();
|
||||
deleted = in.readVLong();
|
||||
missing = in.readVLong();
|
||||
failed = in.readVLong();
|
||||
|
||||
int size = in.readVInt();
|
||||
indices = new IndexDeleteByQueryResponse[size];
|
||||
for (int i = 0; i < size; i++) {
|
||||
IndexDeleteByQueryResponse index = new IndexDeleteByQueryResponse();
|
||||
index.readFrom(in);
|
||||
indices[i] = index;
|
||||
}
|
||||
|
||||
size = in.readVInt();
|
||||
if (size == 0) {
|
||||
shardFailures = ShardSearchFailure.EMPTY_ARRAY;
|
||||
} else {
|
||||
shardFailures = new ShardSearchFailure[size];
|
||||
for (int i = 0; i < shardFailures.length; i++) {
|
||||
shardFailures[i] = readShardSearchFailure(in);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeTo(StreamOutput out) throws IOException {
|
||||
super.writeTo(out);
|
||||
out.writeVLong(tookInMillis);
|
||||
out.writeBoolean(timedOut);
|
||||
out.writeVLong(found);
|
||||
out.writeVLong(deleted);
|
||||
out.writeVLong(missing);
|
||||
out.writeVLong(failed);
|
||||
|
||||
out.writeVInt(indices.length);
|
||||
for (IndexDeleteByQueryResponse indexResponse : indices) {
|
||||
indexResponse.writeTo(out);
|
||||
}
|
||||
|
||||
out.writeVInt(shardFailures.length);
|
||||
for (ShardOperationFailedException shardSearchFailure : shardFailures) {
|
||||
shardSearchFailure.writeTo(out);
|
||||
}
|
||||
}
|
||||
|
||||
static final class Fields {
|
||||
static final String TOOK = "took";
|
||||
static final String TIMED_OUT = "timed_out";
|
||||
static final String INDICES = "_indices";
|
||||
static final String FAILURES = "failures";
|
||||
}
|
||||
|
||||
@Override
|
||||
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
|
||||
builder.field(Fields.TOOK, tookInMillis);
|
||||
builder.field(Fields.TIMED_OUT, timedOut);
|
||||
|
||||
builder.startObject(Fields.INDICES);
|
||||
IndexDeleteByQueryResponse all = new IndexDeleteByQueryResponse("_all", found, deleted, missing, failed);
|
||||
all.toXContent(builder, params);
|
||||
for (IndexDeleteByQueryResponse indexResponse : indices) {
|
||||
indexResponse.toXContent(builder, params);
|
||||
}
|
||||
builder.endObject();
|
||||
|
||||
builder.startArray(Fields.FAILURES);
|
||||
if (shardFailures != null) {
|
||||
for (ShardOperationFailedException shardFailure : shardFailures) {
|
||||
builder.startObject();
|
||||
shardFailure.toXContent(builder, params);
|
||||
builder.endObject();
|
||||
}
|
||||
}
|
||||
builder.endArray();
|
||||
return builder;
|
||||
}
|
||||
}
|
@ -1,155 +0,0 @@
|
||||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.action.deletebyquery;
|
||||
|
||||
import org.elasticsearch.action.ActionResponse;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.common.xcontent.ToXContent;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
/**
|
||||
* Delete by query response executed on a specific index.
|
||||
*/
|
||||
public class IndexDeleteByQueryResponse extends ActionResponse implements ToXContent {
|
||||
|
||||
public static final IndexDeleteByQueryResponse[] EMPTY_ARRAY = new IndexDeleteByQueryResponse[0];
|
||||
|
||||
private String index;
|
||||
|
||||
private long found = 0L;
|
||||
private long deleted = 0L;
|
||||
private long missing = 0L;
|
||||
private long failed = 0L;
|
||||
|
||||
IndexDeleteByQueryResponse() {
|
||||
}
|
||||
|
||||
IndexDeleteByQueryResponse(String index) {
|
||||
this.index = index;
|
||||
}
|
||||
|
||||
/**
|
||||
* Instantiates an IndexDeleteByQueryResponse with given values for counters. Counters should not be negative.
|
||||
*/
|
||||
public IndexDeleteByQueryResponse(String index, long found, long deleted, long missing, long failed) {
|
||||
this(index);
|
||||
incrementFound(found);
|
||||
incrementDeleted(deleted);
|
||||
incrementMissing(missing);
|
||||
incrementFailed(failed);
|
||||
}
|
||||
|
||||
public String getIndex() {
|
||||
return this.index;
|
||||
}
|
||||
|
||||
public long getFound() {
|
||||
return found;
|
||||
}
|
||||
|
||||
public void incrementFound() {
|
||||
incrementFound(1L);
|
||||
}
|
||||
|
||||
public void incrementFound(long delta) {
|
||||
assert (found + delta >= 0) : "counter 'found' cannot be negative";
|
||||
this.found = found + delta;
|
||||
}
|
||||
|
||||
public long getDeleted() {
|
||||
return deleted;
|
||||
}
|
||||
|
||||
public void incrementDeleted() {
|
||||
incrementDeleted(1L);
|
||||
}
|
||||
|
||||
public void incrementDeleted(long delta) {
|
||||
assert (deleted + delta >= 0) : "counter 'deleted' cannot be negative";
|
||||
this.deleted = deleted + delta;
|
||||
}
|
||||
|
||||
public long getMissing() {
|
||||
return missing;
|
||||
}
|
||||
|
||||
public void incrementMissing() {
|
||||
incrementMissing(1L);
|
||||
}
|
||||
|
||||
public void incrementMissing(long delta) {
|
||||
assert (missing + delta >= 0) : "counter 'missing' cannot be negative";
|
||||
this.missing = missing + delta;
|
||||
}
|
||||
|
||||
public long getFailed() {
|
||||
return failed;
|
||||
}
|
||||
|
||||
public void incrementFailed() {
|
||||
incrementFailed(1L);
|
||||
}
|
||||
|
||||
public void incrementFailed(long delta) {
|
||||
assert (failed + delta >= 0) : "counter 'failed' cannot be negative";
|
||||
this.failed = failed + delta;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void readFrom(StreamInput in) throws IOException {
|
||||
super.readFrom(in);
|
||||
index = in.readString();
|
||||
found = in.readVLong();
|
||||
deleted = in.readVLong();
|
||||
missing = in.readVLong();
|
||||
failed = in.readVLong();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeTo(StreamOutput out) throws IOException {
|
||||
super.writeTo(out);
|
||||
out.writeString(index);
|
||||
out.writeVLong(found);
|
||||
out.writeVLong(deleted);
|
||||
out.writeVLong(missing);
|
||||
out.writeVLong(failed);
|
||||
}
|
||||
|
||||
static final class Fields {
|
||||
static final String FOUND = "found";
|
||||
static final String DELETED = "deleted";
|
||||
static final String MISSING = "missing";
|
||||
static final String FAILED = "failed";
|
||||
}
|
||||
|
||||
@Override
|
||||
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
|
||||
builder.startObject(index);
|
||||
builder.field(Fields.FOUND, found);
|
||||
builder.field(Fields.DELETED, deleted);
|
||||
builder.field(Fields.MISSING, missing);
|
||||
builder.field(Fields.FAILED, failed);
|
||||
builder.endObject();
|
||||
return builder;
|
||||
}
|
||||
}
|
@ -1,347 +0,0 @@
|
||||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.action.deletebyquery;
|
||||
|
||||
import org.elasticsearch.ExceptionsHelper;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.ShardOperationFailedException;
|
||||
import org.elasticsearch.action.bulk.BulkItemResponse;
|
||||
import org.elasticsearch.action.bulk.BulkRequest;
|
||||
import org.elasticsearch.action.bulk.BulkResponse;
|
||||
import org.elasticsearch.action.delete.DeleteRequest;
|
||||
import org.elasticsearch.action.delete.DeleteResponse;
|
||||
import org.elasticsearch.action.search.ClearScrollRequest;
|
||||
import org.elasticsearch.action.search.ClearScrollResponse;
|
||||
import org.elasticsearch.action.search.SearchRequest;
|
||||
import org.elasticsearch.action.search.SearchResponse;
|
||||
import org.elasticsearch.action.search.SearchScrollRequest;
|
||||
import org.elasticsearch.action.search.ShardSearchFailure;
|
||||
import org.elasticsearch.action.search.TransportSearchAction;
|
||||
import org.elasticsearch.action.search.TransportSearchScrollAction;
|
||||
import org.elasticsearch.action.support.ActionFilters;
|
||||
import org.elasticsearch.action.support.HandledTransportAction;
|
||||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
|
||||
import org.elasticsearch.common.Strings;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.util.CollectionUtils;
|
||||
import org.elasticsearch.search.SearchHit;
|
||||
import org.elasticsearch.search.SearchHitField;
|
||||
import org.elasticsearch.search.builder.SearchSourceBuilder;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
|
||||
/**
|
||||
* Delete-By-Query implementation that uses efficient scrolling and bulks deletions to delete large set of documents.
|
||||
*/
|
||||
public class TransportDeleteByQueryAction extends HandledTransportAction<DeleteByQueryRequest, DeleteByQueryResponse> {
|
||||
|
||||
private final TransportSearchAction searchAction;
|
||||
private final TransportSearchScrollAction scrollAction;
|
||||
private final Client client;
|
||||
|
||||
@Inject
|
||||
public TransportDeleteByQueryAction(Settings settings, ThreadPool threadPool, Client client,
|
||||
TransportSearchAction transportSearchAction,
|
||||
TransportSearchScrollAction transportSearchScrollAction,
|
||||
TransportService transportService, ActionFilters actionFilters,
|
||||
IndexNameExpressionResolver indexNameExpressionResolver) {
|
||||
super(settings, DeleteByQueryAction.NAME, threadPool, transportService, actionFilters, indexNameExpressionResolver, DeleteByQueryRequest::new);
|
||||
this.searchAction = transportSearchAction;
|
||||
this.scrollAction = transportSearchScrollAction;
|
||||
this.client = client;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doExecute(DeleteByQueryRequest request, ActionListener<DeleteByQueryResponse> listener) {
|
||||
new AsyncDeleteByQueryAction(request, listener).start();
|
||||
}
|
||||
|
||||
class AsyncDeleteByQueryAction {
|
||||
|
||||
private final DeleteByQueryRequest request;
|
||||
private final ActionListener<DeleteByQueryResponse> listener;
|
||||
|
||||
private final long startTime;
|
||||
private final AtomicBoolean timedOut;
|
||||
private final AtomicLong total;
|
||||
|
||||
private volatile ShardOperationFailedException[] shardFailures;
|
||||
private final Map<String, IndexDeleteByQueryResponse> results;
|
||||
|
||||
AsyncDeleteByQueryAction(DeleteByQueryRequest request, ActionListener<DeleteByQueryResponse> listener) {
|
||||
this.request = request;
|
||||
this.listener = listener;
|
||||
this.startTime = threadPool.estimatedTimeInMillis();
|
||||
this.timedOut = new AtomicBoolean(false);
|
||||
this.total = new AtomicLong(0L);
|
||||
this.shardFailures = ShardSearchFailure.EMPTY_ARRAY;
|
||||
this.results = new HashMap<>();
|
||||
}
|
||||
|
||||
public void start() {
|
||||
executeScan();
|
||||
}
|
||||
|
||||
void executeScan() {
|
||||
try {
|
||||
final SearchRequest scanRequest = new SearchRequest()
|
||||
.indices(request.indices())
|
||||
.types(request.types())
|
||||
.indicesOptions(request.indicesOptions())
|
||||
.scroll(request.scroll());
|
||||
if (request.routing() != null) {
|
||||
scanRequest.routing(request.routing());
|
||||
}
|
||||
|
||||
List<String> fields = new ArrayList<>();
|
||||
fields.add("_routing");
|
||||
fields.add("_parent");
|
||||
SearchSourceBuilder source = new SearchSourceBuilder()
|
||||
.query(request.query())
|
||||
.fields(fields)
|
||||
.sort("_doc") // important for performance
|
||||
.fetchSource(false)
|
||||
.version(true);
|
||||
if (request.size() > 0) {
|
||||
source.size(request.size());
|
||||
}
|
||||
if (request.timeout() != null) {
|
||||
source.timeout(request.timeout());
|
||||
}
|
||||
scanRequest.source(source);
|
||||
|
||||
logger.trace("executing scan request");
|
||||
searchAction.execute(scanRequest, new ActionListener<SearchResponse>() {
|
||||
@Override
|
||||
public void onResponse(SearchResponse searchResponse) {
|
||||
long hits = searchResponse.getHits().getTotalHits();
|
||||
logger.trace("first request executed: found [{}] document(s) to delete", hits);
|
||||
total.set(hits);
|
||||
deleteHits(null, searchResponse);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Throwable e) {
|
||||
listener.onFailure(e);
|
||||
}
|
||||
});
|
||||
} catch (Throwable t) {
|
||||
logger.error("unable to execute the initial scan request of delete by query", t);
|
||||
listener.onFailure(t);
|
||||
}
|
||||
}
|
||||
|
||||
void executeScroll(final String scrollId) {
|
||||
try {
|
||||
logger.trace("executing scroll request [{}]", scrollId);
|
||||
scrollAction.execute(new SearchScrollRequest().scrollId(scrollId).scroll(request.scroll()), new ActionListener<SearchResponse>() {
|
||||
@Override
|
||||
public void onResponse(SearchResponse scrollResponse) {
|
||||
deleteHits(scrollId, scrollResponse);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Throwable e) {
|
||||
logger.error("scroll request [{}] failed, scrolling document(s) is stopped", e, scrollId);
|
||||
finishHim(scrollId, hasTimedOut(), e);
|
||||
}
|
||||
});
|
||||
} catch (Throwable t) {
|
||||
logger.error("unable to execute scroll request [{}]", t, scrollId);
|
||||
finishHim(scrollId, false, t);
|
||||
}
|
||||
}
|
||||
|
||||
void deleteHits(String scrollId, SearchResponse scrollResponse) {
|
||||
final SearchHit[] docs = scrollResponse.getHits().getHits();
|
||||
final String nextScrollId = scrollResponse.getScrollId();
|
||||
addShardFailures(scrollResponse.getShardFailures());
|
||||
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("scroll request [{}] executed: [{}] document(s) returned", scrollId, docs.length);
|
||||
}
|
||||
|
||||
if ((docs.length == 0) || (nextScrollId == null)) {
|
||||
logger.trace("scrolling documents terminated");
|
||||
// if scrollId is null we are on the first request - just pass the nextScrollId which sill be non-null if the query matched no docs
|
||||
finishHim(scrollId == null ? nextScrollId : scrollId, false, null);
|
||||
return;
|
||||
}
|
||||
|
||||
if (hasTimedOut()) {
|
||||
logger.trace("scrolling documents timed out");
|
||||
// if scrollId is null we are on the first request - just pass the nextScrollId which sill be non-null if the query matched no docs
|
||||
finishHim(scrollId == null ? nextScrollId : scrollId, true, null);
|
||||
return;
|
||||
}
|
||||
|
||||
// Delete the scrolled documents using the Bulk API
|
||||
BulkRequest bulkRequest = new BulkRequest();
|
||||
for (SearchHit doc : docs) {
|
||||
DeleteRequest delete = new DeleteRequest().index(doc.index()).type(doc.type()).id(doc.id()).version(doc.version());
|
||||
SearchHitField routing = doc.field("_routing");
|
||||
if (routing != null) {
|
||||
delete.routing((String) routing.value());
|
||||
}
|
||||
SearchHitField parent = doc.field("_parent");
|
||||
if (parent != null) {
|
||||
delete.parent((String) parent.value());
|
||||
}
|
||||
bulkRequest.add(delete);
|
||||
}
|
||||
|
||||
logger.trace("executing bulk request with [{}] deletions", bulkRequest.numberOfActions());
|
||||
client.bulk(bulkRequest, new ActionListener<BulkResponse>() {
|
||||
@Override
|
||||
public void onResponse(BulkResponse bulkResponse) {
|
||||
onBulkResponse(nextScrollId, bulkResponse);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Throwable e) {
|
||||
onBulkFailure(nextScrollId, docs, e);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
void onBulkResponse(String scrollId, BulkResponse bulkResponse) {
|
||||
try {
|
||||
for (BulkItemResponse item : bulkResponse.getItems()) {
|
||||
IndexDeleteByQueryResponse indexCounter = results.get(item.getIndex());
|
||||
if (indexCounter == null) {
|
||||
indexCounter = new IndexDeleteByQueryResponse(item.getIndex());
|
||||
}
|
||||
indexCounter.incrementFound();
|
||||
if (item.isFailed()) {
|
||||
indexCounter.incrementFailed();
|
||||
} else {
|
||||
DeleteResponse delete = item.getResponse();
|
||||
if (delete.isFound()) {
|
||||
indexCounter.incrementDeleted();
|
||||
} else {
|
||||
indexCounter.incrementMissing();
|
||||
}
|
||||
}
|
||||
results.put(item.getIndex(), indexCounter);
|
||||
}
|
||||
|
||||
logger.trace("scrolling next batch of document(s) with scroll id [{}]", scrollId);
|
||||
executeScroll(scrollId);
|
||||
} catch (Throwable t) {
|
||||
logger.error("unable to process bulk response", t);
|
||||
finishHim(scrollId, false, t);
|
||||
}
|
||||
}
|
||||
|
||||
void onBulkFailure(String scrollId, SearchHit[] docs, Throwable failure) {
|
||||
try {
|
||||
logger.trace("execution of scroll request failed: {}", failure.getMessage());
|
||||
for (SearchHit doc : docs) {
|
||||
IndexDeleteByQueryResponse indexCounter = results.get(doc.index());
|
||||
if (indexCounter == null) {
|
||||
indexCounter = new IndexDeleteByQueryResponse(doc.index());
|
||||
}
|
||||
indexCounter.incrementFound();
|
||||
indexCounter.incrementFailed();
|
||||
results.put(doc.getIndex(), indexCounter);
|
||||
}
|
||||
|
||||
logger.trace("scrolling document terminated due to scroll request failure [{}]", scrollId);
|
||||
finishHim(scrollId, hasTimedOut(), failure);
|
||||
} catch (Throwable t) {
|
||||
logger.error("unable to process bulk failure", t);
|
||||
finishHim(scrollId, false, t);
|
||||
}
|
||||
}
|
||||
|
||||
void finishHim(final String scrollId, boolean scrollTimedOut, Throwable failure) {
|
||||
try {
|
||||
if (scrollTimedOut) {
|
||||
logger.trace("delete-by-query response marked as timed out");
|
||||
timedOut.set(true);
|
||||
}
|
||||
|
||||
if (Strings.hasText(scrollId)) {
|
||||
ClearScrollRequest clearScrollRequest = new ClearScrollRequest();
|
||||
clearScrollRequest.addScrollId(scrollId);
|
||||
client.clearScroll(clearScrollRequest, new ActionListener<ClearScrollResponse>() {
|
||||
@Override
|
||||
public void onResponse(ClearScrollResponse clearScrollResponse) {
|
||||
logger.trace("scroll id [{}] cleared", scrollId);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Throwable e) {
|
||||
logger.warn("unable to clear scroll id [{}]: {}", scrollId, e.getMessage());
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
if (failure != null) {
|
||||
logger.trace("scrolling document(s) terminated with failures: {}", failure.getMessage());
|
||||
listener.onFailure(failure);
|
||||
} else {
|
||||
logger.trace("scrolling document(s) terminated with success");
|
||||
listener.onResponse(buildResponse());
|
||||
}
|
||||
} catch (Throwable t) {
|
||||
listener.onFailure(t);
|
||||
}
|
||||
}
|
||||
|
||||
boolean hasTimedOut() {
|
||||
return request.timeout() != null && (threadPool.estimatedTimeInMillis() >= (startTime + request.timeout().millis()));
|
||||
}
|
||||
|
||||
void addShardFailures(ShardOperationFailedException[] failures) {
|
||||
if (!CollectionUtils.isEmpty(failures)) {
|
||||
ShardOperationFailedException[] duplicates = new ShardOperationFailedException[shardFailures.length + failures.length];
|
||||
System.arraycopy(shardFailures, 0, duplicates, 0, shardFailures.length);
|
||||
System.arraycopy(failures, 0, duplicates, shardFailures.length, failures.length);
|
||||
shardFailures = ExceptionsHelper.groupBy(duplicates);
|
||||
}
|
||||
}
|
||||
|
||||
protected DeleteByQueryResponse buildResponse() {
|
||||
long took = threadPool.estimatedTimeInMillis() - startTime;
|
||||
long deleted = 0;
|
||||
long missing = 0;
|
||||
long failed = 0;
|
||||
|
||||
// Calculates the total number deleted/failed/missing documents
|
||||
for (IndexDeleteByQueryResponse result : results.values()) {
|
||||
deleted = deleted + result.getDeleted();
|
||||
missing = missing + result.getMissing();
|
||||
failed = failed + result.getFailed();
|
||||
}
|
||||
IndexDeleteByQueryResponse[] indices = results.values().toArray(new IndexDeleteByQueryResponse[results.size()]);
|
||||
return new DeleteByQueryResponse(took, timedOut.get(), total.get(), deleted, missing, failed, indices, shardFailures);
|
||||
}
|
||||
}
|
||||
}
|
@ -1,51 +0,0 @@
|
||||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.plugin.deletebyquery;
|
||||
|
||||
import org.elasticsearch.action.ActionModule;
|
||||
import org.elasticsearch.action.deletebyquery.DeleteByQueryAction;
|
||||
import org.elasticsearch.action.deletebyquery.TransportDeleteByQueryAction;
|
||||
import org.elasticsearch.common.network.NetworkModule;
|
||||
import org.elasticsearch.plugins.Plugin;
|
||||
import org.elasticsearch.rest.action.deletebyquery.RestDeleteByQueryAction;
|
||||
|
||||
public class DeleteByQueryPlugin extends Plugin {
|
||||
|
||||
public static final String NAME = "delete-by-query";
|
||||
|
||||
@Override
|
||||
public String name() {
|
||||
return NAME;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String description() {
|
||||
return "Elasticsearch Delete-By-Query Plugin";
|
||||
}
|
||||
|
||||
public void onModule(ActionModule actionModule) {
|
||||
actionModule.registerAction(DeleteByQueryAction.INSTANCE, TransportDeleteByQueryAction.class);
|
||||
}
|
||||
|
||||
public void onModule(NetworkModule module) {
|
||||
module.registerRestHandler(RestDeleteByQueryAction.class);
|
||||
}
|
||||
|
||||
}
|
@ -1,77 +0,0 @@
|
||||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.rest.action.deletebyquery;
|
||||
|
||||
import org.elasticsearch.action.deletebyquery.DeleteByQueryRequest;
|
||||
import org.elasticsearch.action.support.IndicesOptions;
|
||||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.common.Strings;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.index.query.QueryBuilder;
|
||||
import org.elasticsearch.indices.query.IndicesQueriesRegistry;
|
||||
import org.elasticsearch.rest.BaseRestHandler;
|
||||
import org.elasticsearch.rest.RestChannel;
|
||||
import org.elasticsearch.rest.RestController;
|
||||
import org.elasticsearch.rest.RestRequest;
|
||||
import org.elasticsearch.rest.action.support.RestActions;
|
||||
import org.elasticsearch.rest.action.support.RestToXContentListener;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import static org.elasticsearch.action.deletebyquery.DeleteByQueryAction.INSTANCE;
|
||||
import static org.elasticsearch.rest.RestRequest.Method.DELETE;
|
||||
|
||||
/**
|
||||
* @see DeleteByQueryRequest
|
||||
*/
|
||||
public class RestDeleteByQueryAction extends BaseRestHandler {
|
||||
|
||||
private IndicesQueriesRegistry indicesQueriesRegistry;
|
||||
|
||||
@Inject
|
||||
public RestDeleteByQueryAction(Settings settings, RestController controller, Client client,
|
||||
IndicesQueriesRegistry indicesQueriesRegistry) {
|
||||
super(settings, client);
|
||||
this.indicesQueriesRegistry = indicesQueriesRegistry;
|
||||
controller.registerHandler(DELETE, "/{index}/_query", this);
|
||||
controller.registerHandler(DELETE, "/{index}/{type}/_query", this);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void handleRequest(final RestRequest request, final RestChannel channel, final Client client) throws IOException {
|
||||
DeleteByQueryRequest delete = new DeleteByQueryRequest(Strings.splitStringByCommaToArray(request.param("index")));
|
||||
delete.indicesOptions(IndicesOptions.fromRequest(request, delete.indicesOptions()));
|
||||
delete.routing(request.param("routing"));
|
||||
if (request.hasParam("timeout")) {
|
||||
delete.timeout(request.paramAsTime("timeout", null));
|
||||
}
|
||||
if (RestActions.hasBodyContent(request)) {
|
||||
delete.query(RestActions.getQueryContent(RestActions.getRestContent(request), indicesQueriesRegistry, parseFieldMatcher));
|
||||
} else {
|
||||
QueryBuilder queryBuilder = RestActions.urlParamsToQueryBuilder(request);
|
||||
if (queryBuilder != null) {
|
||||
delete.query(queryBuilder);
|
||||
}
|
||||
}
|
||||
delete.types(Strings.splitStringByCommaToArray(request.param("type")));
|
||||
client.execute(INSTANCE, delete, new RestToXContentListener<>(channel));
|
||||
}
|
||||
}
|
@ -1,162 +0,0 @@
|
||||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.action.deletebyquery;
|
||||
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.common.io.stream.BytesStreamOutput;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.elasticsearch.test.VersionUtils;
|
||||
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
|
||||
public class IndexDeleteByQueryResponseTests extends ESTestCase {
|
||||
public void testIncrements() {
|
||||
String indexName = randomAsciiOfLength(5);
|
||||
|
||||
// Use randomInt to prevent range overflow
|
||||
long found = Math.abs(randomInt());
|
||||
long deleted = Math.abs(randomInt());
|
||||
long missing = Math.abs(randomInt());
|
||||
long failed = Math.abs(randomInt());
|
||||
|
||||
IndexDeleteByQueryResponse response = new IndexDeleteByQueryResponse(indexName, found, deleted, missing, failed);
|
||||
assertThat(response.getIndex(), equalTo(indexName));
|
||||
assertThat(response.getFound(), equalTo(found));
|
||||
assertThat(response.getDeleted(), equalTo(deleted));
|
||||
assertThat(response.getMissing(), equalTo(missing));
|
||||
assertThat(response.getFailed(), equalTo(failed));
|
||||
|
||||
response.incrementFound();
|
||||
response.incrementDeleted();
|
||||
response.incrementMissing();
|
||||
response.incrementFailed();
|
||||
|
||||
assertThat(response.getFound(), equalTo(found + 1));
|
||||
assertThat(response.getDeleted(), equalTo(deleted + 1));
|
||||
assertThat(response.getMissing(), equalTo(missing + 1));
|
||||
assertThat(response.getFailed(), equalTo(failed + 1));
|
||||
|
||||
// Use randomInt to prevent range overflow
|
||||
long inc = randomIntBetween(0, 1000);
|
||||
response.incrementFound(inc);
|
||||
response.incrementDeleted(inc);
|
||||
response.incrementMissing(inc);
|
||||
response.incrementFailed(inc);
|
||||
|
||||
assertThat(response.getFound(), equalTo(found + 1 + inc));
|
||||
assertThat(response.getDeleted(), equalTo(deleted + 1 + inc));
|
||||
assertThat(response.getMissing(), equalTo(missing + 1 + inc));
|
||||
assertThat(response.getFailed(), equalTo(failed + 1 + inc));
|
||||
}
|
||||
|
||||
public void testNegativeCounters() {
|
||||
assumeTrue("assertions must be enable for this test to pass", assertionsEnabled());
|
||||
try {
|
||||
new IndexDeleteByQueryResponse("index", -1L, 0L, 0L, 0L);
|
||||
fail("should have thrown an assertion error concerning the negative counter");
|
||||
} catch (AssertionError e) {
|
||||
assertThat("message contains error about a negative counter: " + e.getMessage(),
|
||||
e.getMessage().contains("counter 'found' cannot be negative"), equalTo(true));
|
||||
}
|
||||
|
||||
try {
|
||||
new IndexDeleteByQueryResponse("index", 0L, -1L, 0L, 0L);
|
||||
fail("should have thrown an assertion error concerning the negative counter");
|
||||
} catch (AssertionError e) {
|
||||
assertThat("message contains error about a negative counter: " + e.getMessage(),
|
||||
e.getMessage().contains("counter 'deleted' cannot be negative"), equalTo(true));
|
||||
}
|
||||
|
||||
try {
|
||||
new IndexDeleteByQueryResponse("index", 0L, 0L, -1L, 0L);
|
||||
fail("should have thrown an assertion error concerning the negative counter");
|
||||
} catch (AssertionError e) {
|
||||
assertThat("message contains error about a negative counter: " + e.getMessage(),
|
||||
e.getMessage().contains("counter 'missing' cannot be negative"), equalTo(true));
|
||||
}
|
||||
|
||||
try {
|
||||
new IndexDeleteByQueryResponse("index", 0L, 0L, 0L, -1L);
|
||||
fail("should have thrown an assertion error concerning the negative counter");
|
||||
} catch (AssertionError e) {
|
||||
assertThat("message contains error about a negative counter: " + e.getMessage(),
|
||||
e.getMessage().contains("counter 'failed' cannot be negative"), equalTo(true));
|
||||
}
|
||||
}
|
||||
|
||||
public void testNegativeIncrements() {
|
||||
assumeTrue("assertions must be enable for this test to pass", assertionsEnabled());
|
||||
try {
|
||||
IndexDeleteByQueryResponse response = new IndexDeleteByQueryResponse();
|
||||
response.incrementFound(-10L);
|
||||
fail("should have thrown an assertion error concerning the negative counter");
|
||||
} catch (AssertionError e) {
|
||||
assertThat("message contains error about a negative counter: " + e.getMessage(),
|
||||
e.getMessage().contains("counter 'found' cannot be negative"), equalTo(true));
|
||||
}
|
||||
|
||||
try {
|
||||
IndexDeleteByQueryResponse response = new IndexDeleteByQueryResponse();
|
||||
response.incrementDeleted(-10L);
|
||||
fail("should have thrown an assertion error concerning the negative counter");
|
||||
} catch (AssertionError e) {
|
||||
assertThat("message contains error about a negative counter: " + e.getMessage(),
|
||||
e.getMessage().contains("counter 'deleted' cannot be negative"), equalTo(true));
|
||||
}
|
||||
|
||||
try {
|
||||
IndexDeleteByQueryResponse response = new IndexDeleteByQueryResponse();
|
||||
response.incrementMissing(-10L);
|
||||
fail("should have thrown an assertion error concerning the negative counter");
|
||||
} catch (AssertionError e) {
|
||||
assertThat("message contains error about a negative counter: " + e.getMessage(),
|
||||
e.getMessage().contains("counter 'missing' cannot be negative"), equalTo(true));
|
||||
}
|
||||
|
||||
try {
|
||||
IndexDeleteByQueryResponse response = new IndexDeleteByQueryResponse();
|
||||
response.incrementFailed(-1L);
|
||||
fail("should have thrown an assertion error concerning the negative counter");
|
||||
} catch (AssertionError e) {
|
||||
assertThat("message contains error about a negative counter: " + e.getMessage(),
|
||||
e.getMessage().contains("counter 'failed' cannot be negative"), equalTo(true));
|
||||
}
|
||||
}
|
||||
|
||||
public void testSerialization() throws Exception {
|
||||
IndexDeleteByQueryResponse response = new IndexDeleteByQueryResponse(randomAsciiOfLength(5), Math.abs(randomLong()), Math.abs(randomLong()), Math.abs(randomLong()), Math.abs(randomLong()));
|
||||
Version testVersion = VersionUtils.randomVersionBetween(random(), Version.CURRENT.minimumCompatibilityVersion(), Version.CURRENT);
|
||||
BytesStreamOutput output = new BytesStreamOutput();
|
||||
output.setVersion(testVersion);
|
||||
response.writeTo(output);
|
||||
|
||||
StreamInput streamInput = StreamInput.wrap(output.bytes());
|
||||
streamInput.setVersion(testVersion);
|
||||
IndexDeleteByQueryResponse deserializedResponse = new IndexDeleteByQueryResponse();
|
||||
deserializedResponse.readFrom(streamInput);
|
||||
|
||||
assertThat(deserializedResponse.getIndex(), equalTo(response.getIndex()));
|
||||
assertThat(deserializedResponse.getFound(), equalTo(response.getFound()));
|
||||
assertThat(deserializedResponse.getDeleted(), equalTo(response.getDeleted()));
|
||||
assertThat(deserializedResponse.getMissing(), equalTo(response.getMissing()));
|
||||
assertThat(deserializedResponse.getFailed(), equalTo(response.getFailed()));
|
||||
}
|
||||
}
|
@ -1,458 +0,0 @@
|
||||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.action.deletebyquery;
|
||||
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.admin.cluster.node.stats.NodeStats;
|
||||
import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse;
|
||||
import org.elasticsearch.action.bulk.BulkItemResponse;
|
||||
import org.elasticsearch.action.bulk.BulkResponse;
|
||||
import org.elasticsearch.action.delete.DeleteResponse;
|
||||
import org.elasticsearch.action.search.ClearScrollResponse;
|
||||
import org.elasticsearch.action.search.SearchResponse;
|
||||
import org.elasticsearch.common.Strings;
|
||||
import org.elasticsearch.common.text.Text;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.common.util.CollectionUtils;
|
||||
import org.elasticsearch.common.util.concurrent.CountDown;
|
||||
import org.elasticsearch.index.Index;
|
||||
import org.elasticsearch.index.shard.ShardId;
|
||||
import org.elasticsearch.search.SearchHit;
|
||||
import org.elasticsearch.search.SearchShardTarget;
|
||||
import org.elasticsearch.search.internal.InternalSearchHit;
|
||||
import org.elasticsearch.test.ESSingleNodeTestCase;
|
||||
|
||||
import static org.elasticsearch.index.query.QueryBuilders.boolQuery;
|
||||
import static org.elasticsearch.index.query.QueryBuilders.rangeQuery;
|
||||
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
|
||||
import static org.hamcrest.Matchers.containsString;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
|
||||
public class TransportDeleteByQueryActionTests extends ESSingleNodeTestCase {
|
||||
public void testExecuteScanFailsOnMissingIndex() {
|
||||
DeleteByQueryRequest delete = new DeleteByQueryRequest().indices(new String[]{"none"});
|
||||
TestActionListener listener = new TestActionListener();
|
||||
|
||||
newAsyncAction(delete, listener).executeScan();
|
||||
waitForCompletion("scan request should fail on missing index", listener);
|
||||
|
||||
assertFailure(listener, "no such index");
|
||||
assertSearchContextsClosed();
|
||||
}
|
||||
|
||||
public void testExecuteScan() {
|
||||
createIndex("test");
|
||||
final int numDocs = randomIntBetween(1, 200);
|
||||
for (int i = 1; i <= numDocs; i++) {
|
||||
client().prepareIndex("test", "type").setSource("num", i).get();
|
||||
}
|
||||
client().admin().indices().prepareRefresh("test").get();
|
||||
assertHitCount(client().prepareSearch("test").setSize(0).get(), numDocs);
|
||||
|
||||
final long limit = randomIntBetween(0, numDocs);
|
||||
DeleteByQueryRequest delete = new DeleteByQueryRequest().indices(new String[]{"test"}).query(boolQuery().must(rangeQuery("num").lte(limit)));
|
||||
TestActionListener listener = new TestActionListener();
|
||||
|
||||
newAsyncAction(delete, listener).executeScan();
|
||||
waitForCompletion("scan request should return the exact number of documents", listener);
|
||||
|
||||
assertNoFailures(listener);
|
||||
DeleteByQueryResponse response = listener.getResponse();
|
||||
assertNotNull(response);
|
||||
assertThat(response.getTotalFound(), equalTo(limit));
|
||||
assertThat(response.getTotalDeleted(), equalTo(limit));
|
||||
assertSearchContextsClosed();
|
||||
}
|
||||
|
||||
public void testExecuteScrollFailsOnMissingScrollId() {
|
||||
DeleteByQueryRequest delete = new DeleteByQueryRequest();
|
||||
TestActionListener listener = new TestActionListener();
|
||||
|
||||
newAsyncAction(delete, listener).executeScroll(null);
|
||||
waitForCompletion("scroll request should fail on missing scroll id", listener);
|
||||
|
||||
assertFailure(listener, "scrollId is missing");
|
||||
assertSearchContextsClosed();
|
||||
}
|
||||
|
||||
public void testExecuteScrollFailsOnMalformedScrollId() {
|
||||
DeleteByQueryRequest delete = new DeleteByQueryRequest();
|
||||
TestActionListener listener = new TestActionListener();
|
||||
|
||||
newAsyncAction(delete, listener).executeScroll("123");
|
||||
waitForCompletion("scroll request should fail on malformed scroll id", listener);
|
||||
|
||||
assertFailure(listener, "Cannot parse scroll id");
|
||||
assertSearchContextsClosed();
|
||||
}
|
||||
|
||||
public void testExecuteScrollFailsOnExpiredScrollId() {
|
||||
final long numDocs = randomIntBetween(1, 100);
|
||||
for (int i = 1; i <= numDocs; i++) {
|
||||
client().prepareIndex("test", "type").setSource("num", i).get();
|
||||
}
|
||||
client().admin().indices().prepareRefresh("test").get();
|
||||
assertHitCount(client().prepareSearch("test").setSize(0).get(), numDocs);
|
||||
|
||||
SearchResponse searchResponse = client().prepareSearch("test").setScroll(TimeValue.timeValueSeconds(10)).get();
|
||||
assertThat(searchResponse.getHits().getTotalHits(), equalTo(numDocs));
|
||||
|
||||
String scrollId = searchResponse.getScrollId();
|
||||
assertTrue(Strings.hasText(scrollId));
|
||||
|
||||
ClearScrollResponse clearScrollResponse = client().prepareClearScroll().addScrollId(scrollId).get();
|
||||
assertTrue(clearScrollResponse.isSucceeded());
|
||||
|
||||
DeleteByQueryRequest delete = new DeleteByQueryRequest().indices(new String[]{"test"});
|
||||
TestActionListener listener = new TestActionListener();
|
||||
|
||||
newAsyncAction(delete, listener).executeScroll(searchResponse.getScrollId());
|
||||
waitForCompletion("scroll request returns zero documents on expired scroll id", listener);
|
||||
|
||||
assertNotNull(listener.getError());
|
||||
assertThrowableContains(listener.getError(), "No search context found");
|
||||
assertSearchContextsClosed();
|
||||
}
|
||||
|
||||
public void testExecuteScrollTimedOut() throws InterruptedException {
|
||||
client().prepareIndex("test", "type", "1").setSource("num", "1").get();
|
||||
client().prepareIndex("test", "type", "2").setSource("num", "1").get();
|
||||
client().admin().indices().prepareRefresh("test").get();
|
||||
|
||||
SearchResponse searchResponse = client().prepareSearch("test").setSize(1).setScroll(TimeValue.timeValueSeconds(10)).get();
|
||||
String scrollId = searchResponse.getScrollId();
|
||||
assertTrue(Strings.hasText(scrollId));
|
||||
|
||||
DeleteByQueryRequest delete = new DeleteByQueryRequest().indices(new String[]{"test"}).timeout(TimeValue.timeValueSeconds(1));
|
||||
TestActionListener listener = new TestActionListener();
|
||||
|
||||
final TransportDeleteByQueryAction.AsyncDeleteByQueryAction async = newAsyncAction(delete, listener);
|
||||
// Wait until the action timed out
|
||||
awaitBusy(() -> async.hasTimedOut());
|
||||
|
||||
async.executeScroll(searchResponse.getScrollId());
|
||||
waitForCompletion("scroll request returns zero documents on expired scroll id", listener);
|
||||
|
||||
assertNull(listener.getError());
|
||||
assertTrue(listener.getResponse().isTimedOut());
|
||||
assertThat(listener.getResponse().getTotalDeleted(), equalTo(0L));
|
||||
assertSearchContextsClosed();
|
||||
}
|
||||
|
||||
public void testExecuteScrollNoDocuments() {
|
||||
createIndex("test");
|
||||
SearchResponse searchResponse = client().prepareSearch("test").setScroll(TimeValue.timeValueSeconds(10)).get();
|
||||
String scrollId = searchResponse.getScrollId();
|
||||
assertTrue(Strings.hasText(scrollId));
|
||||
|
||||
DeleteByQueryRequest delete = new DeleteByQueryRequest().indices(new String[]{"test"});
|
||||
TestActionListener listener = new TestActionListener();
|
||||
|
||||
newAsyncAction(delete, listener).executeScroll(searchResponse.getScrollId());
|
||||
waitForCompletion("scroll request returns zero documents", listener);
|
||||
|
||||
assertNull(listener.getError());
|
||||
assertFalse(listener.getResponse().isTimedOut());
|
||||
assertThat(listener.getResponse().getTotalFound(), equalTo(0L));
|
||||
assertThat(listener.getResponse().getTotalDeleted(), equalTo(0L));
|
||||
assertSearchContextsClosed();
|
||||
}
|
||||
|
||||
public void testExecuteScroll() {
|
||||
final int numDocs = randomIntBetween(1, 100);
|
||||
for (int i = 1; i <= numDocs; i++) {
|
||||
client().prepareIndex("test", "type").setSource("num", i).get();
|
||||
}
|
||||
client().admin().indices().prepareRefresh("test").get();
|
||||
assertHitCount(client().prepareSearch("test").setSize(0).get(), numDocs);
|
||||
|
||||
final long limit = randomIntBetween(0, numDocs);
|
||||
|
||||
SearchResponse searchResponse = client().prepareSearch("test")
|
||||
.setScroll(TimeValue.timeValueSeconds(10))
|
||||
.setQuery(boolQuery().must(rangeQuery("num").lte(limit)))
|
||||
.fields("_routing", "_parent")
|
||||
.setFetchSource(false)
|
||||
.setVersion(true)
|
||||
.get();
|
||||
|
||||
String scrollId = searchResponse.getScrollId();
|
||||
assertTrue(Strings.hasText(scrollId));
|
||||
assertThat(searchResponse.getHits().getTotalHits(), equalTo(limit));
|
||||
|
||||
DeleteByQueryRequest delete = new DeleteByQueryRequest().indices(new String[]{"test"}).size(100).query(boolQuery().must(rangeQuery("num").lte(limit)));
|
||||
TestActionListener listener = new TestActionListener();
|
||||
|
||||
newAsyncAction(delete, listener).executeScroll(searchResponse.getScrollId());
|
||||
waitForCompletion("scroll request should return all documents", listener);
|
||||
|
||||
assertNull(listener.getError());
|
||||
assertFalse(listener.getResponse().isTimedOut());
|
||||
// docs that have been returned on the 1st page have been skipped
|
||||
final long expectedDeleted = Math.max(0, limit - searchResponse.getHits().hits().length);
|
||||
assertThat(listener.getResponse().getTotalDeleted(), equalTo(expectedDeleted));
|
||||
assertSearchContextsClosed();
|
||||
}
|
||||
|
||||
public void testOnBulkResponse() {
|
||||
final int nbItems = randomIntBetween(0, 20);
|
||||
long deleted = 0;
|
||||
long missing = 0;
|
||||
long failed = 0;
|
||||
|
||||
BulkItemResponse[] items = new BulkItemResponse[nbItems];
|
||||
for (int i = 0; i < nbItems; i++) {
|
||||
if (randomBoolean()) {
|
||||
boolean delete = true;
|
||||
if (rarely()) {
|
||||
delete = false;
|
||||
missing++;
|
||||
} else {
|
||||
deleted++;
|
||||
}
|
||||
items[i] = new BulkItemResponse(i, "delete", new DeleteResponse(new ShardId("test", "_na_", 0), "type", String.valueOf(i), 1, delete));
|
||||
} else {
|
||||
items[i] = new BulkItemResponse(i, "delete", new BulkItemResponse.Failure("test", "type", String.valueOf(i), new Throwable("item failed")));
|
||||
failed++;
|
||||
}
|
||||
}
|
||||
|
||||
// We just need a valid scroll id
|
||||
createIndex("test");
|
||||
SearchResponse searchResponse = client().prepareSearch().setScroll(TimeValue.timeValueSeconds(10)).get();
|
||||
String scrollId = searchResponse.getScrollId();
|
||||
assertTrue(Strings.hasText(scrollId));
|
||||
|
||||
try {
|
||||
DeleteByQueryRequest delete = new DeleteByQueryRequest();
|
||||
TestActionListener listener = new TestActionListener();
|
||||
|
||||
newAsyncAction(delete, listener).onBulkResponse(scrollId, new BulkResponse(items, 0L));
|
||||
waitForCompletion("waiting for bulk response to complete", listener);
|
||||
|
||||
assertNoFailures(listener);
|
||||
assertThat(listener.getResponse().getTotalDeleted(), equalTo(deleted));
|
||||
assertThat(listener.getResponse().getTotalFailed(), equalTo(failed));
|
||||
assertThat(listener.getResponse().getTotalMissing(), equalTo(missing));
|
||||
} finally {
|
||||
client().prepareClearScroll().addScrollId(scrollId).get();
|
||||
}
|
||||
}
|
||||
|
||||
public void testOnBulkResponseMultipleIndices() {
|
||||
final int nbIndices = randomIntBetween(2, 5);
|
||||
|
||||
// Holds counters for the total + all indices
|
||||
final long[] found = new long[1 + nbIndices];
|
||||
final long[] deleted = new long[1 + nbIndices];
|
||||
final long[] missing = new long[1 + nbIndices];
|
||||
final long[] failed = new long[1 + nbIndices];
|
||||
|
||||
final int nbItems = randomIntBetween(0, 100);
|
||||
found[0] = nbItems;
|
||||
|
||||
BulkItemResponse[] items = new BulkItemResponse[nbItems];
|
||||
for (int i = 0; i < nbItems; i++) {
|
||||
int index = randomIntBetween(1, nbIndices);
|
||||
found[index] = found[index] + 1;
|
||||
|
||||
if (randomBoolean()) {
|
||||
boolean delete = true;
|
||||
if (rarely()) {
|
||||
delete = false;
|
||||
missing[0] = missing[0] + 1;
|
||||
missing[index] = missing[index] + 1;
|
||||
} else {
|
||||
deleted[0] = deleted[0] + 1;
|
||||
deleted[index] = deleted[index] + 1;
|
||||
}
|
||||
items[i] = new BulkItemResponse(i, "delete", new DeleteResponse(new ShardId("test-" + index, "_na_", 0), "type", String.valueOf(i), 1, delete));
|
||||
} else {
|
||||
items[i] = new BulkItemResponse(i, "delete", new BulkItemResponse.Failure("test-" + index, "type", String.valueOf(i), new Throwable("item failed")));
|
||||
failed[0] = failed[0] + 1;
|
||||
failed[index] = failed[index] + 1;
|
||||
}
|
||||
}
|
||||
|
||||
// We just need a valid scroll id
|
||||
createIndex("test");
|
||||
SearchResponse searchResponse = client().prepareSearch().setScroll(TimeValue.timeValueSeconds(10)).get();
|
||||
String scrollId = searchResponse.getScrollId();
|
||||
assertTrue(Strings.hasText(scrollId));
|
||||
|
||||
try {
|
||||
DeleteByQueryRequest delete = new DeleteByQueryRequest();
|
||||
TestActionListener listener = new TestActionListener();
|
||||
|
||||
newAsyncAction(delete, listener).onBulkResponse(scrollId, new BulkResponse(items, 0L));
|
||||
waitForCompletion("waiting for bulk response to complete", listener);
|
||||
|
||||
assertNoFailures(listener);
|
||||
assertThat(listener.getResponse().getTotalDeleted(), equalTo(deleted[0]));
|
||||
assertThat(listener.getResponse().getTotalFailed(), equalTo(failed[0]));
|
||||
assertThat(listener.getResponse().getTotalMissing(), equalTo(missing[0]));
|
||||
|
||||
for (int i = 1; i <= nbIndices; i++) {
|
||||
IndexDeleteByQueryResponse indexResponse = listener.getResponse().getIndex("test-" + i);
|
||||
if (found[i] >= 1) {
|
||||
assertNotNull(indexResponse);
|
||||
assertThat(indexResponse.getFound(), equalTo(found[i]));
|
||||
assertThat(indexResponse.getDeleted(), equalTo(deleted[i]));
|
||||
assertThat(indexResponse.getFailed(), equalTo(failed[i]));
|
||||
assertThat(indexResponse.getMissing(), equalTo(missing[i]));
|
||||
} else {
|
||||
assertNull(indexResponse);
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
client().prepareClearScroll().addScrollId(scrollId).get();
|
||||
}
|
||||
}
|
||||
|
||||
public void testOnBulkFailureNoDocuments() {
|
||||
DeleteByQueryRequest delete = new DeleteByQueryRequest();
|
||||
TestActionListener listener = new TestActionListener();
|
||||
|
||||
newAsyncAction(delete, listener).onBulkFailure(null, new SearchHit[0], new Throwable("This is a bulk failure"));
|
||||
waitForCompletion("waiting for bulk failure to complete", listener);
|
||||
|
||||
assertFailure(listener, "This is a bulk failure");
|
||||
}
|
||||
|
||||
public void testOnBulkFailure() {
|
||||
final int nbDocs = randomIntBetween(0, 20);
|
||||
SearchHit[] docs = new SearchHit[nbDocs];
|
||||
for (int i = 0; i < nbDocs; i++) {
|
||||
InternalSearchHit doc = new InternalSearchHit(randomInt(), String.valueOf(i), new Text("type"), null);
|
||||
doc.shard(new SearchShardTarget("node", new Index("test", "_na_"), randomInt()));
|
||||
docs[i] = doc;
|
||||
}
|
||||
|
||||
DeleteByQueryRequest delete = new DeleteByQueryRequest();
|
||||
TestActionListener listener = new TestActionListener();
|
||||
|
||||
TransportDeleteByQueryAction.AsyncDeleteByQueryAction async = newAsyncAction(delete, listener);
|
||||
async.onBulkFailure(null, docs, new Throwable("This is a bulk failure"));
|
||||
waitForCompletion("waiting for bulk failure to complete", listener);
|
||||
assertFailure(listener, "This is a bulk failure");
|
||||
|
||||
DeleteByQueryResponse response = async.buildResponse();
|
||||
assertThat(response.getTotalFailed(), equalTo((long) nbDocs));
|
||||
assertThat(response.getTotalDeleted(), equalTo(0L));
|
||||
}
|
||||
|
||||
public void testFinishHim() {
|
||||
TestActionListener listener = new TestActionListener();
|
||||
newAsyncAction(new DeleteByQueryRequest(), listener).finishHim(null, false, null);
|
||||
waitForCompletion("waiting for finishHim to complete with success", listener);
|
||||
assertNoFailures(listener);
|
||||
assertNotNull(listener.getResponse());
|
||||
assertFalse(listener.getResponse().isTimedOut());
|
||||
|
||||
listener = new TestActionListener();
|
||||
newAsyncAction(new DeleteByQueryRequest(), listener).finishHim(null, true, null);
|
||||
waitForCompletion("waiting for finishHim to complete with timed out = true", listener);
|
||||
assertNoFailures(listener);
|
||||
assertNotNull(listener.getResponse());
|
||||
assertTrue(listener.getResponse().isTimedOut());
|
||||
|
||||
listener = new TestActionListener();
|
||||
newAsyncAction(new DeleteByQueryRequest(), listener).finishHim(null, false, new Throwable("Fake error"));
|
||||
waitForCompletion("waiting for finishHim to complete with error", listener);
|
||||
assertFailure(listener, "Fake error");
|
||||
assertNull(listener.getResponse());
|
||||
}
|
||||
|
||||
private TransportDeleteByQueryAction.AsyncDeleteByQueryAction newAsyncAction(DeleteByQueryRequest request, TestActionListener listener) {
|
||||
TransportDeleteByQueryAction action = getInstanceFromNode(TransportDeleteByQueryAction.class);
|
||||
assertNotNull(action);
|
||||
return action.new AsyncDeleteByQueryAction(request, listener);
|
||||
}
|
||||
|
||||
private void waitForCompletion(String testName, final TestActionListener listener) {
|
||||
logger.info(" --> waiting for delete-by-query [{}] to complete", testName);
|
||||
try {
|
||||
awaitBusy(() -> listener.isTerminated());
|
||||
} catch (InterruptedException e) {
|
||||
fail("exception when waiting for delete-by-query [" + testName + "] to complete: " + e.getMessage());
|
||||
logger.error("exception when waiting for delete-by-query [{}] to complete", e, testName);
|
||||
}
|
||||
}
|
||||
|
||||
private void assertFailure(TestActionListener listener, String expectedMessage) {
|
||||
Throwable t = listener.getError();
|
||||
assertNotNull(t);
|
||||
assertTrue(Strings.hasText(expectedMessage));
|
||||
assertTrue("error message should contain [" + expectedMessage + "] but got [" + t.getMessage() + "]", t.getMessage().contains(expectedMessage));
|
||||
}
|
||||
|
||||
private void assertNoFailures(TestActionListener listener) {
|
||||
assertNull(listener.getError());
|
||||
assertTrue(CollectionUtils.isEmpty(listener.getResponse().getShardFailures()));
|
||||
}
|
||||
|
||||
private void assertSearchContextsClosed() {
|
||||
NodesStatsResponse nodesStats = client().admin().cluster().prepareNodesStats().setIndices(true).get();
|
||||
for (NodeStats nodeStat : nodesStats.getNodes()){
|
||||
assertThat(nodeStat.getIndices().getSearch().getOpenContexts(), equalTo(0L));
|
||||
}
|
||||
}
|
||||
|
||||
private void assertThrowableContains(Throwable t, String expectedFailure) {
|
||||
assertThat(t.toString(), containsString(expectedFailure));
|
||||
}
|
||||
|
||||
private class TestActionListener implements ActionListener<DeleteByQueryResponse> {
|
||||
private final CountDown count = new CountDown(1);
|
||||
|
||||
private DeleteByQueryResponse response;
|
||||
private Throwable error;
|
||||
|
||||
@Override
|
||||
public void onResponse(DeleteByQueryResponse response) {
|
||||
try {
|
||||
this.response = response;
|
||||
} finally {
|
||||
count.countDown();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Throwable e) {
|
||||
try {
|
||||
this.error = e;
|
||||
} finally {
|
||||
count.countDown();
|
||||
}
|
||||
}
|
||||
|
||||
public boolean isTerminated() {
|
||||
return count.isCountedDown();
|
||||
}
|
||||
|
||||
public DeleteByQueryResponse getResponse() {
|
||||
return response;
|
||||
}
|
||||
|
||||
public Throwable getError() {
|
||||
return error;
|
||||
}
|
||||
}
|
||||
}
|
@ -1,446 +0,0 @@
|
||||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.plugin.deletebyquery;
|
||||
|
||||
import org.elasticsearch.action.ActionRequestValidationException;
|
||||
import org.elasticsearch.action.admin.cluster.node.stats.NodeStats;
|
||||
import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse;
|
||||
import org.elasticsearch.action.admin.indices.alias.Alias;
|
||||
import org.elasticsearch.action.deletebyquery.DeleteByQueryAction;
|
||||
import org.elasticsearch.action.deletebyquery.DeleteByQueryRequestBuilder;
|
||||
import org.elasticsearch.action.deletebyquery.DeleteByQueryResponse;
|
||||
import org.elasticsearch.action.deletebyquery.IndexDeleteByQueryResponse;
|
||||
import org.elasticsearch.action.index.IndexRequestBuilder;
|
||||
import org.elasticsearch.action.search.SearchResponse;
|
||||
import org.elasticsearch.action.support.IndicesOptions;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.index.IndexNotFoundException;
|
||||
import org.elasticsearch.index.query.MatchQueryBuilder;
|
||||
import org.elasticsearch.index.query.QueryBuilders;
|
||||
import org.elasticsearch.plugins.Plugin;
|
||||
import org.elasticsearch.test.ESIntegTestCase;
|
||||
import org.elasticsearch.test.ESIntegTestCase.ClusterScope;
|
||||
|
||||
import java.util.Collection;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
|
||||
import static org.elasticsearch.test.ESIntegTestCase.Scope.SUITE;
|
||||
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
|
||||
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
|
||||
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures;
|
||||
import static org.hamcrest.Matchers.containsString;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.hamcrest.Matchers.nullValue;
|
||||
|
||||
@ClusterScope(scope = SUITE, transportClientRatio = 0)
|
||||
public class DeleteByQueryTests extends ESIntegTestCase {
|
||||
@Override
|
||||
protected Collection<Class<? extends Plugin>> nodePlugins() {
|
||||
return pluginList(DeleteByQueryPlugin.class);
|
||||
}
|
||||
|
||||
public void testDeleteByQueryWithNoSource() {
|
||||
try {
|
||||
newDeleteByQuery().get();
|
||||
fail("should have thrown a validation exception because of the missing source");
|
||||
} catch (ActionRequestValidationException e) {
|
||||
assertThat(e.getMessage(), containsString("source is missing"));
|
||||
}
|
||||
}
|
||||
|
||||
public void testDeleteByQueryWithNoIndices() throws Exception {
|
||||
DeleteByQueryRequestBuilder delete = newDeleteByQuery().setQuery(QueryBuilders.matchAllQuery());
|
||||
delete.setIndicesOptions(IndicesOptions.fromOptions(false, true, true, false));
|
||||
assertDBQResponse(delete.get(), 0L, 0L, 0L, 0L);
|
||||
assertSearchContextsClosed();
|
||||
}
|
||||
|
||||
public void testDeleteByQueryWithOneIndex() throws Exception {
|
||||
final long docs = randomIntBetween(1, 50);
|
||||
for (int i = 0; i < docs; i++) {
|
||||
index("test", "test", String.valueOf(i), "fields1", 1);
|
||||
}
|
||||
refresh();
|
||||
assertHitCount(client().prepareSearch("test").setSize(0).get(), docs);
|
||||
|
||||
DeleteByQueryRequestBuilder delete = newDeleteByQuery().setIndices("t*").setQuery(QueryBuilders.matchAllQuery());
|
||||
assertDBQResponse(delete.get(), docs, docs, 0L, 0L);
|
||||
refresh();
|
||||
assertHitCount(client().prepareSearch("test").setSize(0).get(), 0);
|
||||
assertSearchContextsClosed();
|
||||
}
|
||||
|
||||
public void testDeleteByQueryWithMultipleIndices() throws Exception {
|
||||
final int indices = randomIntBetween(2, 5);
|
||||
final int docs = randomIntBetween(2, 10) * 2;
|
||||
long[] candidates = new long[indices];
|
||||
|
||||
for (int i = 0; i < indices; i++) {
|
||||
// number of documents to be deleted with the upcoming delete-by-query
|
||||
// (this number differs for each index)
|
||||
candidates[i] = randomIntBetween(1, docs);
|
||||
|
||||
for (int j = 0; j < docs; j++) {
|
||||
boolean candidate = (j < candidates[i]);
|
||||
index("test-" + i, "test", String.valueOf(j), "candidate", candidate);
|
||||
}
|
||||
}
|
||||
|
||||
// total number of expected deletions
|
||||
long deletions = 0;
|
||||
for (long i : candidates) {
|
||||
deletions = deletions + i;
|
||||
}
|
||||
refresh();
|
||||
|
||||
assertHitCount(client().prepareSearch().setSize(0).get(), docs * indices);
|
||||
for (int i = 0; i < indices; i++) {
|
||||
assertHitCount(client().prepareSearch("test-" + i).setSize(0).get(), docs);
|
||||
}
|
||||
|
||||
// Deletes all the documents with candidate=true
|
||||
DeleteByQueryResponse response = newDeleteByQuery().setIndices("test-*").setQuery(QueryBuilders.termQuery("candidate", true)).get();
|
||||
refresh();
|
||||
|
||||
// Checks that the DBQ response returns the expected number of deletions
|
||||
assertDBQResponse(response, deletions, deletions, 0L, 0L);
|
||||
assertNotNull(response.getIndices());
|
||||
assertThat(response.getIndices().length, equalTo(indices));
|
||||
|
||||
for (int i = 0; i < indices; i++) {
|
||||
String indexName = "test-" + i;
|
||||
IndexDeleteByQueryResponse indexResponse = response.getIndex(indexName);
|
||||
assertThat(indexResponse.getFound(), equalTo(candidates[i]));
|
||||
assertThat(indexResponse.getDeleted(), equalTo(candidates[i]));
|
||||
assertThat(indexResponse.getFailed(), equalTo(0L));
|
||||
assertThat(indexResponse.getMissing(), equalTo(0L));
|
||||
assertThat(indexResponse.getIndex(), equalTo(indexName));
|
||||
long remaining = docs - candidates[i];
|
||||
assertHitCount(client().prepareSearch(indexName).setSize(0).get(), remaining);
|
||||
}
|
||||
|
||||
assertHitCount(client().prepareSearch().setSize(0).get(), (indices * docs) - deletions);
|
||||
assertSearchContextsClosed();
|
||||
}
|
||||
|
||||
public void testDeleteByQueryWithMissingIndex() throws Exception {
|
||||
client().prepareIndex("test", "test")
|
||||
.setSource(jsonBuilder().startObject().field("field1", 1).endObject())
|
||||
.setRefresh(true)
|
||||
.get();
|
||||
assertHitCount(client().prepareSearch().setSize(0).get(), 1);
|
||||
|
||||
DeleteByQueryRequestBuilder delete = newDeleteByQuery().setIndices("test", "missing").setQuery(QueryBuilders.matchAllQuery());
|
||||
try {
|
||||
delete.get();
|
||||
fail("should have thrown an exception because of a missing index");
|
||||
} catch (IndexNotFoundException e) {
|
||||
// Ok
|
||||
}
|
||||
|
||||
delete.setIndicesOptions(IndicesOptions.lenientExpandOpen());
|
||||
assertDBQResponse(delete.get(), 1L, 1L, 0L, 0L);
|
||||
refresh();
|
||||
assertHitCount(client().prepareSearch("test").setSize(0).get(), 0);
|
||||
assertSearchContextsClosed();
|
||||
}
|
||||
|
||||
public void testDeleteByQueryWithTypes() throws Exception {
|
||||
final long docs = randomIntBetween(1, 50);
|
||||
for (int i = 0; i < docs; i++) {
|
||||
index(randomFrom("test1", "test2", "test3"), "type1", String.valueOf(i), "foo", "bar");
|
||||
index(randomFrom("test1", "test2", "test3"), "type2", String.valueOf(i), "foo", "bar");
|
||||
}
|
||||
refresh();
|
||||
assertHitCount(client().prepareSearch().setSize(0).get(), docs * 2);
|
||||
assertHitCount(client().prepareSearch().setSize(0).setTypes("type1").get(), docs);
|
||||
assertHitCount(client().prepareSearch().setSize(0).setTypes("type2").get(), docs);
|
||||
|
||||
DeleteByQueryRequestBuilder delete = newDeleteByQuery().setTypes("type1").setQuery(QueryBuilders.matchAllQuery());
|
||||
assertDBQResponse(delete.get(), docs, docs, 0L, 0L);
|
||||
refresh();
|
||||
|
||||
assertHitCount(client().prepareSearch().setSize(0).get(), docs);
|
||||
assertHitCount(client().prepareSearch().setSize(0).setTypes("type1").get(), 0);
|
||||
assertHitCount(client().prepareSearch().setSize(0).setTypes("type2").get(), docs);
|
||||
assertSearchContextsClosed();
|
||||
}
|
||||
|
||||
public void testDeleteByQueryWithRouting() throws Exception {
|
||||
assertAcked(prepareCreate("test").setSettings("number_of_shards", 2));
|
||||
ensureGreen("test");
|
||||
|
||||
final int docs = randomIntBetween(2, 10);
|
||||
logger.info("--> indexing [{}] documents with routing", docs);
|
||||
for (int i = 0; i < docs; i++) {
|
||||
client().prepareIndex("test", "test", String.valueOf(i)).setRouting(String.valueOf(i)).setSource("field1", 1).get();
|
||||
}
|
||||
refresh();
|
||||
|
||||
logger.info("--> counting documents with no routing, should be equal to [{}]", docs);
|
||||
assertHitCount(client().prepareSearch().setSize(0).get(), docs);
|
||||
|
||||
String routing = String.valueOf(randomIntBetween(2, docs));
|
||||
|
||||
logger.info("--> counting documents with routing [{}]", routing);
|
||||
long expected = client().prepareSearch().setSize(0).setRouting(routing).get().getHits().totalHits();
|
||||
|
||||
logger.info("--> delete all documents with routing [{}] with a delete-by-query", routing);
|
||||
DeleteByQueryRequestBuilder delete = newDeleteByQuery().setRouting(routing).setQuery(QueryBuilders.matchAllQuery());
|
||||
assertDBQResponse(delete.get(), expected, expected, 0L, 0L);
|
||||
refresh();
|
||||
|
||||
assertHitCount(client().prepareSearch().setSize(0).get(), docs - expected);
|
||||
assertSearchContextsClosed();
|
||||
}
|
||||
|
||||
public void testDeleteByFieldQuery() throws Exception {
|
||||
assertAcked(prepareCreate("test").addAlias(new Alias("alias")));
|
||||
|
||||
int numDocs = scaledRandomIntBetween(10, 100);
|
||||
for (int i = 0; i < numDocs; i++) {
|
||||
client().prepareIndex("test", "test", Integer.toString(i))
|
||||
.setRouting(randomAsciiOfLengthBetween(1, 5))
|
||||
.setSource("foo", "bar").get();
|
||||
}
|
||||
refresh();
|
||||
|
||||
int n = between(0, numDocs - 1);
|
||||
assertHitCount(client().prepareSearch("test").setSize(0).setQuery(QueryBuilders.matchQuery("_id", Integer.toString(n))).get(), 1);
|
||||
assertHitCount(client().prepareSearch("test").setSize(0).setQuery(QueryBuilders.matchAllQuery()).get(), numDocs);
|
||||
|
||||
DeleteByQueryRequestBuilder delete = newDeleteByQuery().setIndices("alias").setQuery(QueryBuilders.matchQuery("_id", Integer.toString(n)));
|
||||
assertDBQResponse(delete.get(), 1L, 1L, 0L, 0L);
|
||||
refresh();
|
||||
assertHitCount(client().prepareSearch("test").setSize(0).setQuery(QueryBuilders.matchAllQuery()).get(), numDocs - 1);
|
||||
assertSearchContextsClosed();
|
||||
}
|
||||
|
||||
public void testDeleteByQueryWithDateMath() throws Exception {
|
||||
index("test", "type", "1", "d", "2013-01-01");
|
||||
ensureGreen();
|
||||
refresh();
|
||||
assertHitCount(client().prepareSearch("test").setSize(0).get(), 1);
|
||||
|
||||
DeleteByQueryRequestBuilder delete = newDeleteByQuery().setIndices("test").setQuery(QueryBuilders.rangeQuery("d").to("now-1h"));
|
||||
assertDBQResponse(delete.get(), 1L, 1L, 0L, 0L);
|
||||
refresh();
|
||||
assertHitCount(client().prepareSearch("test").setSize(0).get(), 0);
|
||||
assertSearchContextsClosed();
|
||||
}
|
||||
|
||||
public void testDeleteByTermQuery() throws Exception {
|
||||
createIndex("test");
|
||||
ensureGreen();
|
||||
|
||||
int numDocs = scaledRandomIntBetween(10, 50);
|
||||
IndexRequestBuilder[] indexRequestBuilders = new IndexRequestBuilder[numDocs + 1];
|
||||
for (int i = 0; i < numDocs; i++) {
|
||||
indexRequestBuilders[i] = client().prepareIndex("test", "test", Integer.toString(i)).setSource("field", "value");
|
||||
}
|
||||
indexRequestBuilders[numDocs] = client().prepareIndex("test", "test", Integer.toString(numDocs)).setSource("field", "other_value");
|
||||
indexRandom(true, indexRequestBuilders);
|
||||
|
||||
SearchResponse searchResponse = client().prepareSearch("test").get();
|
||||
assertNoFailures(searchResponse);
|
||||
assertThat(searchResponse.getHits().totalHits(), equalTo((long) numDocs + 1));
|
||||
|
||||
DeleteByQueryResponse delete = newDeleteByQuery().setIndices("test").setQuery(QueryBuilders.termQuery("field", "value")).get();
|
||||
assertDBQResponse(delete, numDocs, numDocs, 0L, 0L);
|
||||
|
||||
refresh();
|
||||
searchResponse = client().prepareSearch("test").get();
|
||||
assertNoFailures(searchResponse);
|
||||
assertThat(searchResponse.getHits().totalHits(), equalTo(1L));
|
||||
assertSearchContextsClosed();
|
||||
}
|
||||
|
||||
public void testConcurrentDeleteByQueriesOnDifferentDocs() throws Throwable {
|
||||
createIndex("test");
|
||||
ensureGreen();
|
||||
|
||||
final Thread[] threads = new Thread[scaledRandomIntBetween(2, 5)];
|
||||
final long docs = randomIntBetween(1, 50);
|
||||
for (int i = 0; i < docs; i++) {
|
||||
for (int j = 0; j < threads.length; j++) {
|
||||
index("test", "test", String.valueOf(i * 10 + j), "field", j);
|
||||
}
|
||||
}
|
||||
refresh();
|
||||
assertHitCount(client().prepareSearch("test").setSize(0).get(), docs * threads.length);
|
||||
|
||||
final CountDownLatch start = new CountDownLatch(1);
|
||||
final AtomicReference<Throwable> exceptionHolder = new AtomicReference<>();
|
||||
|
||||
for (int i = 0; i < threads.length; i++) {
|
||||
final int threadNum = i;
|
||||
assertHitCount(client().prepareSearch("test").setSize(0).setQuery(QueryBuilders.termQuery("field", threadNum)).get(), docs);
|
||||
|
||||
Runnable r = new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
start.await();
|
||||
|
||||
DeleteByQueryResponse rsp = newDeleteByQuery().setQuery(QueryBuilders.termQuery("field", threadNum)).get();
|
||||
assertDBQResponse(rsp, docs, docs, 0L, 0L);
|
||||
} catch (InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
} catch (Throwable e) {
|
||||
exceptionHolder.set(e);
|
||||
Thread.currentThread().interrupt();
|
||||
}
|
||||
}
|
||||
};
|
||||
threads[i] = new Thread(r);
|
||||
threads[i].start();
|
||||
}
|
||||
|
||||
start.countDown();
|
||||
for (Thread thread : threads) {
|
||||
thread.join();
|
||||
}
|
||||
|
||||
Throwable assertionError = exceptionHolder.get();
|
||||
if (assertionError != null) {
|
||||
throw assertionError;
|
||||
}
|
||||
|
||||
refresh();
|
||||
for (int i = 0; i < threads.length; i++) {
|
||||
assertHitCount(client().prepareSearch("test").setSize(0).setQuery(QueryBuilders.termQuery("field", i)).get(), 0);
|
||||
}
|
||||
assertSearchContextsClosed();
|
||||
}
|
||||
|
||||
public void testConcurrentDeleteByQueriesOnSameDocs() throws Throwable {
|
||||
assertAcked(prepareCreate("test").setSettings(Settings.builder().put("index.refresh_interval", -1)));
|
||||
ensureGreen();
|
||||
|
||||
final long docs = randomIntBetween(50, 100);
|
||||
for (int i = 0; i < docs; i++) {
|
||||
index("test", "test", String.valueOf(i), "foo", "bar");
|
||||
}
|
||||
refresh();
|
||||
assertHitCount(client().prepareSearch("test").setSize(0).get(), docs);
|
||||
|
||||
final Thread[] threads = new Thread[scaledRandomIntBetween(2, 9)];
|
||||
|
||||
final CountDownLatch start = new CountDownLatch(1);
|
||||
final AtomicReference<Throwable> exceptionHolder = new AtomicReference<>();
|
||||
|
||||
final MatchQueryBuilder query = QueryBuilders.matchQuery("foo", "bar");
|
||||
final AtomicLong deleted = new AtomicLong(0);
|
||||
|
||||
for (int i = 0; i < threads.length; i++) {
|
||||
assertHitCount(client().prepareSearch("test").setSize(0).setQuery(query).get(), docs);
|
||||
|
||||
Runnable r = new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
start.await();
|
||||
DeleteByQueryResponse rsp = newDeleteByQuery().setQuery(query).get();
|
||||
deleted.addAndGet(rsp.getTotalDeleted());
|
||||
|
||||
assertThat(rsp.getTotalFound(), equalTo(docs));
|
||||
} catch (InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
} catch (Throwable e) {
|
||||
exceptionHolder.set(e);
|
||||
Thread.currentThread().interrupt();
|
||||
}
|
||||
}
|
||||
};
|
||||
threads[i] = new Thread(r);
|
||||
threads[i].start();
|
||||
}
|
||||
|
||||
start.countDown();
|
||||
for (Thread thread : threads) {
|
||||
thread.join();
|
||||
}
|
||||
refresh();
|
||||
|
||||
Throwable assertionError = exceptionHolder.get();
|
||||
if (assertionError != null) {
|
||||
throw assertionError;
|
||||
}
|
||||
assertHitCount(client().prepareSearch("test").setSize(0).get(), 0L);
|
||||
assertThat(deleted.get(), equalTo(docs));
|
||||
assertSearchContextsClosed();
|
||||
}
|
||||
|
||||
public void testDeleteByQueryOnReadOnlyIndex() throws Exception {
|
||||
createIndex("test");
|
||||
ensureGreen();
|
||||
|
||||
final long docs = randomIntBetween(1, 50);
|
||||
for (int i = 0; i < docs; i++) {
|
||||
index("test", "test", String.valueOf(i), "field", 1);
|
||||
}
|
||||
refresh();
|
||||
assertHitCount(client().prepareSearch("test").setSize(0).get(), docs);
|
||||
|
||||
try {
|
||||
enableIndexBlock("test", IndexMetaData.SETTING_READ_ONLY);
|
||||
DeleteByQueryResponse rsp = newDeleteByQuery().setQuery(QueryBuilders.matchAllQuery()).get();
|
||||
assertDBQResponse(rsp, docs, 0L, docs, 0L);
|
||||
} finally {
|
||||
disableIndexBlock("test", IndexMetaData.SETTING_READ_ONLY);
|
||||
}
|
||||
|
||||
assertHitCount(client().prepareSearch("test").setSize(0).get(), docs);
|
||||
assertSearchContextsClosed();
|
||||
}
|
||||
|
||||
private DeleteByQueryRequestBuilder newDeleteByQuery() {
|
||||
return new DeleteByQueryRequestBuilder(client(), DeleteByQueryAction.INSTANCE);
|
||||
}
|
||||
|
||||
private void assertDBQResponse(DeleteByQueryResponse response, long found, long deleted, long failed, long missing) {
|
||||
assertNotNull(response);
|
||||
assertThat(response.isTimedOut(), equalTo(false));
|
||||
assertThat(response.getShardFailures().length, equalTo(0));
|
||||
assertThat(response.getTotalFound(), equalTo(found));
|
||||
assertThat(response.getTotalDeleted(), equalTo(deleted));
|
||||
assertThat(response.getTotalFailed(), equalTo(failed));
|
||||
assertThat(response.getTotalMissing(), equalTo(missing));
|
||||
}
|
||||
|
||||
private void assertSearchContextsClosed() throws Exception {
|
||||
// The scroll id (and thus the underlying search context) is cleared in
|
||||
// an async manner in TransportDeleteByQueryAction. so we need to use
|
||||
// assertBusy() here to wait for the search context to be released.
|
||||
assertBusy(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
NodesStatsResponse nodesStats = client().admin().cluster().prepareNodesStats().setIndices(true).get();
|
||||
for (NodeStats nodeStat : nodesStats.getNodes()){
|
||||
assertThat(nodeStat.getIndices().getSearch().getOpenContexts(), equalTo(0L));
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
@ -1,41 +0,0 @@
|
||||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.plugin.deletebyquery.test.rest;
|
||||
|
||||
import com.carrotsearch.randomizedtesting.annotations.Name;
|
||||
import com.carrotsearch.randomizedtesting.annotations.ParametersFactory;
|
||||
import org.elasticsearch.test.rest.ESRestTestCase;
|
||||
import org.elasticsearch.test.rest.RestTestCandidate;
|
||||
import org.elasticsearch.test.rest.parser.RestTestParseException;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
public class DeleteByQueryRestIT extends ESRestTestCase {
|
||||
|
||||
public DeleteByQueryRestIT(@Name("yaml") RestTestCandidate testCandidate) {
|
||||
super(testCandidate);
|
||||
}
|
||||
|
||||
@ParametersFactory
|
||||
public static Iterable<Object[]> parameters() throws IOException, RestTestParseException {
|
||||
return ESRestTestCase.createParameters(0, 1);
|
||||
}
|
||||
}
|
||||
|
@ -1,66 +0,0 @@
|
||||
{
|
||||
"delete_by_query": {
|
||||
"documentation": "https://www.elastic.co/guide/en/elasticsearch/plugins/master/plugins-delete-by-query.html",
|
||||
"methods": ["DELETE"],
|
||||
"url": {
|
||||
"path": "/{index}/_query",
|
||||
"paths": ["/{index}/_query", "/{index}/{type}/_query"],
|
||||
"parts": {
|
||||
"index": {
|
||||
"type" : "list",
|
||||
"required": true,
|
||||
"description" : "A comma-separated list of indices to restrict the operation; use `_all` to perform the operation on all indices"
|
||||
},
|
||||
"type": {
|
||||
"type" : "list",
|
||||
"description" : "A comma-separated list of types to restrict the operation"
|
||||
}
|
||||
},
|
||||
"params": {
|
||||
"analyzer": {
|
||||
"type" : "string",
|
||||
"description" : "The analyzer to use for the query string"
|
||||
},
|
||||
"default_operator": {
|
||||
"type" : "enum",
|
||||
"options" : ["AND","OR"],
|
||||
"default" : "OR",
|
||||
"description" : "The default operator for query string query (AND or OR)"
|
||||
},
|
||||
"df": {
|
||||
"type" : "string",
|
||||
"description" : "The field to use as default where no field prefix is given in the query string"
|
||||
},
|
||||
"ignore_unavailable": {
|
||||
"type" : "boolean",
|
||||
"description" : "Whether specified concrete indices should be ignored when unavailable (missing or closed)"
|
||||
},
|
||||
"allow_no_indices": {
|
||||
"type" : "boolean",
|
||||
"description" : "Whether to ignore if a wildcard indices expression resolves into no concrete indices. (This includes `_all` string or when no indices have been specified)"
|
||||
},
|
||||
"expand_wildcards": {
|
||||
"type" : "enum",
|
||||
"options" : ["open","closed","none","all"],
|
||||
"default" : "open",
|
||||
"description" : "Whether to expand wildcard expression to concrete indices that are open, closed or both."
|
||||
},
|
||||
"q": {
|
||||
"type" : "string",
|
||||
"description" : "Query in the Lucene query string syntax"
|
||||
},
|
||||
"routing": {
|
||||
"type" : "string",
|
||||
"description" : "Specific routing value"
|
||||
},
|
||||
"timeout": {
|
||||
"type" : "time",
|
||||
"description" : "Explicit operation timeout"
|
||||
}
|
||||
}
|
||||
},
|
||||
"body": {
|
||||
"description" : "A query to restrict the operation specified with the Query DSL"
|
||||
}
|
||||
}
|
||||
}
|
@ -1,58 +0,0 @@
|
||||
setup:
|
||||
- do:
|
||||
index:
|
||||
index: test_1
|
||||
type: test
|
||||
id: 1
|
||||
body: { foo: bar }
|
||||
|
||||
- do:
|
||||
index:
|
||||
index: test_1
|
||||
type: test
|
||||
id: 2
|
||||
body: { foo: baz }
|
||||
|
||||
- do:
|
||||
index:
|
||||
index: test_1
|
||||
type: test
|
||||
id: 3
|
||||
body: { foo: foo }
|
||||
|
||||
- do:
|
||||
indices.refresh: {}
|
||||
|
||||
---
|
||||
"Basic delete_by_query":
|
||||
- skip:
|
||||
version: all
|
||||
reason: "Test muted because of a REST test namespace conflict, see https://github.com/elastic/elasticsearch/issues/18469"
|
||||
|
||||
- do:
|
||||
delete_by_query:
|
||||
index: test_1
|
||||
body:
|
||||
query:
|
||||
match:
|
||||
foo: bar
|
||||
|
||||
- do:
|
||||
indices.refresh: {}
|
||||
|
||||
- do:
|
||||
count:
|
||||
index: test_1
|
||||
|
||||
- match: { count: 2 }
|
||||
|
||||
---
|
||||
"Delete_by_query body without query element":
|
||||
- do:
|
||||
catch: request
|
||||
delete_by_query:
|
||||
index: test_1
|
||||
body:
|
||||
match:
|
||||
foo: bar
|
||||
|
@ -103,7 +103,7 @@ fi
|
||||
echo "CONF_FILE=$CONF_FILE" >> /etc/sysconfig/elasticsearch;
|
||||
fi
|
||||
|
||||
run_elasticsearch_service 1 -Edefault.config="$CONF_FILE"
|
||||
run_elasticsearch_service 1 -Ees.default.config="$CONF_FILE"
|
||||
|
||||
# remove settings again otherwise cleaning up before next testrun will fail
|
||||
if is_dpkg ; then
|
||||
@ -208,10 +208,6 @@ fi
|
||||
install_and_check_plugin discovery gce google-api-client-*.jar
|
||||
}
|
||||
|
||||
@test "[$GROUP] install delete by query plugin" {
|
||||
install_and_check_plugin - delete-by-query
|
||||
}
|
||||
|
||||
@test "[$GROUP] install discovery-azure plugin" {
|
||||
install_and_check_plugin discovery azure azure-core-*.jar
|
||||
}
|
||||
@ -347,10 +343,6 @@ fi
|
||||
remove_plugin discovery-gce
|
||||
}
|
||||
|
||||
@test "[$GROUP] remove delete by query plugin" {
|
||||
remove_plugin delete-by-query
|
||||
}
|
||||
|
||||
@test "[$GROUP] remove discovery-azure plugin" {
|
||||
remove_plugin discovery-azure
|
||||
}
|
||||
|
@ -1,6 +1,6 @@
|
||||
{
|
||||
"delete_by_query": {
|
||||
"documentation": "https://www.elastic.co/guide/en/elasticsearch/plugins/master/plugins-delete-by-query.html",
|
||||
"documentation": "https://www.elastic.co/guide/en/elasticsearch/reference/master/docs-delete-by-query.html",
|
||||
"methods": ["POST"],
|
||||
"url": {
|
||||
"path": "/{index}/_delete_by_query",
|
||||
|
@ -26,7 +26,6 @@ List projects = [
|
||||
'plugins:analysis-phonetic',
|
||||
'plugins:analysis-smartcn',
|
||||
'plugins:analysis-stempel',
|
||||
'plugins:delete-by-query',
|
||||
'plugins:discovery-azure',
|
||||
'plugins:discovery-ec2',
|
||||
'plugins:discovery-gce',
|
||||
|
Loading…
x
Reference in New Issue
Block a user