Add delete-by-query plugin

The delete-by-query plugin adds support for deleting all documents (from one or more indices) that match a specified query. It replaces the problematic delete-by-query functionality that was removed from Elasticsearch core in 2.0. Internally, it uses the Scan/Scroll and Bulk APIs to delete documents in an efficient and safe manner. It is slower than the old delete-by-query implementation, but it avoids the problems that implementation had.

Closes #7052
Tanguy Leroux 2015-06-05 18:34:15 +02:00
parent 0434ecfb03
commit ba3540675a
19 changed files with 2651 additions and 0 deletions
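
For reference, a minimal sketch of how the new action can be invoked through the Java API added by this commit; the index name, type, field and client wiring below are illustrative:

import org.elasticsearch.action.deletebyquery.DeleteByQueryAction;
import org.elasticsearch.action.deletebyquery.DeleteByQueryRequestBuilder;
import org.elasticsearch.action.deletebyquery.DeleteByQueryResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.index.query.QueryBuilders;

public class DeleteByQueryUsageSketch {
    // Deletes every document in "test_1" whose "foo" field matches "bar"
    // and returns the number of documents that were actually deleted.
    public static long deleteMatching(Client client) {
        DeleteByQueryResponse response = new DeleteByQueryRequestBuilder(client, DeleteByQueryAction.INSTANCE)
                .setIndices("test_1")
                .setTypes("test")
                .setQuery(QueryBuilders.matchQuery("foo", "bar"))
                .get();
        return response.getTotalDeleted();
    }
}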

View File

@@ -0,0 +1,45 @@
<?xml version="1.0" encoding="UTF-8"?>
<!-- Licensed to Elasticsearch under one or more contributor
license agreements. See the NOTICE file distributed with this work for additional
information regarding copyright ownership. ElasticSearch licenses this file to you
under the Apache License, Version 2.0 (the "License"); you may not use this
file except in compliance with the License. You may obtain a copy of the
License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by
applicable law or agreed to in writing, software distributed under the License
is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the specific language
governing permissions and limitations under the License. -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.elasticsearch.plugin</groupId>
<artifactId>elasticsearch-delete-by-query</artifactId>
<packaging>jar</packaging>
<name>Elasticsearch Delete By Query plugin</name>
<description>The Delete By Query plugin allows documents to be deleted from Elasticsearch with a single query.</description>
<parent>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch-plugin</artifactId>
<version>2.0.0-SNAPSHOT</version>
</parent>
<properties>
<tests.ifNoTests>warn</tests.ifNoTests>
<tests.rest.suite>delete_by_query</tests.rest.suite>
</properties>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>

View File

@@ -0,0 +1,70 @@
{
"delete_by_query": {
"documentation": "http://www.elastic.co/guide/en/elasticsearch/reference/master/docs-delete-by-query.html",
"methods": ["DELETE"],
"url": {
"path": "/{index}/_query",
"paths": ["/{index}/_query", "/{index}/{type}/_query"],
"parts": {
"index": {
"type" : "list",
"required": true,
"description" : "A comma-separated list of indices to restrict the operation; use `_all` to perform the operation on all indices"
},
"type": {
"type" : "list",
"description" : "A comma-separated list of types to restrict the operation"
}
},
"params": {
"analyzer": {
"type" : "string",
"description" : "The analyzer to use for the query string"
},
"default_operator": {
"type" : "enum",
"options" : ["AND","OR"],
"default" : "OR",
"description" : "The default operator for query string query (AND or OR)"
},
"df": {
"type" : "string",
"description" : "The field to use as default where no field prefix is given in the query string"
},
"ignore_unavailable": {
"type" : "boolean",
"description" : "Whether specified concrete indices should be ignored when unavailable (missing or closed)"
},
"allow_no_indices": {
"type" : "boolean",
"description" : "Whether to ignore if a wildcard indices expression resolves into no concrete indices. (This includes `_all` string or when no indices have been specified)"
},
"expand_wildcards": {
"type" : "enum",
"options" : ["open","closed","none","all"],
"default" : "open",
"description" : "Whether to expand wildcard expression to concrete indices that are open, closed or both."
},
"q": {
"type" : "string",
"description" : "Query in the Lucene query string syntax"
},
"routing": {
"type" : "string",
"description" : "Specific routing value"
},
"source": {
"type" : "string",
"description" : "The URL-encoded query definition (instead of using the request body)"
},
"timeout": {
"type" : "time",
"description" : "Explicit operation timeout"
}
}
},
"body": {
"description" : "A query to restrict the operation specified with the Query DSL"
}
}
}

View File

@@ -0,0 +1,42 @@
---
"Basic delete_by_query":
- do:
index:
index: test_1
type: test
id: 1
body: { foo: bar }
- do:
index:
index: test_1
type: test
id: 2
body: { foo: baz }
- do:
index:
index: test_1
type: test
id: 3
body: { foo: foo }
- do:
indices.refresh: {}
- do:
delete_by_query:
index: test_1
body:
query:
match:
foo: bar
- do:
indices.refresh: {}
- do:
count:
index: test_1
- match: { count: 2 }

View File

@@ -0,0 +1,34 @@
<?xml version="1.0"?>
<!-- Licensed to ElasticSearch under one or more contributor
license agreements. See the NOTICE file distributed with this work for additional
information regarding copyright ownership. ElasticSearch licenses this file to you
under the Apache License, Version 2.0 (the "License"); you may not use this
file except in compliance with the License. You may obtain a copy of the
License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by
applicable law or agreed to in writing, software distributed under the License
is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the specific language
governing permissions and limitations under the License. -->
<assembly>
<id>plugin</id>
<formats>
<format>zip</format>
</formats>
<includeBaseDirectory>false</includeBaseDirectory>
<dependencySets>
<dependencySet>
<outputDirectory>/</outputDirectory>
<useProjectArtifact>true</useProjectArtifact>
<useTransitiveFiltering>true</useTransitiveFiltering>
<excludes>
<exclude>org.elasticsearch:elasticsearch</exclude>
</excludes>
</dependencySet>
<dependencySet>
<outputDirectory>/</outputDirectory>
<useProjectArtifact>true</useProjectArtifact>
<useTransitiveFiltering>true</useTransitiveFiltering>
</dependencySet>
</dependencySets>
</assembly>

View File

@@ -0,0 +1,43 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.deletebyquery;
import org.elasticsearch.action.Action;
import org.elasticsearch.client.ElasticsearchClient;
public class DeleteByQueryAction extends Action<DeleteByQueryRequest, DeleteByQueryResponse, DeleteByQueryRequestBuilder> {
public static final DeleteByQueryAction INSTANCE = new DeleteByQueryAction();
public static final String NAME = "indices:data/write/delete/by_query";
private DeleteByQueryAction() {
super(NAME);
}
@Override
public DeleteByQueryResponse newResponse() {
return new DeleteByQueryResponse();
}
@Override
public DeleteByQueryRequestBuilder newRequestBuilder(ElasticsearchClient client) {
return new DeleteByQueryRequestBuilder(client, this);
}
}

View File

@@ -0,0 +1,260 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.deletebyquery;
import org.elasticsearch.ElasticsearchGenerationException;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.IndicesRequest;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.QuerySourceBuilder;
import org.elasticsearch.client.Requests;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.search.Scroll;
import java.io.IOException;
import java.util.Arrays;
import java.util.Map;
import static org.elasticsearch.action.ValidateActions.addValidationError;
import static org.elasticsearch.search.Scroll.readScroll;
public class DeleteByQueryRequest extends ActionRequest<DeleteByQueryRequest> implements IndicesRequest.Replaceable {
private String[] indices = Strings.EMPTY_ARRAY;
private IndicesOptions indicesOptions = IndicesOptions.fromOptions(false, false, true, false);
private String[] types = Strings.EMPTY_ARRAY;
private BytesReference source;
private String routing;
private int size = 0;
private Scroll scroll = new Scroll(TimeValue.timeValueMinutes(10));
private TimeValue timeout;
public DeleteByQueryRequest() {
}
/**
* Constructs a new delete by query request to run against the provided indices. No indices means
* it will run against all indices.
*/
public DeleteByQueryRequest(String... indices) {
this.indices = indices;
}
@Override
public ActionRequestValidationException validate() {
ActionRequestValidationException validationException = null;
if (source == null) {
validationException = addValidationError("source is missing", validationException);
}
return validationException;
}
@Override
public String[] indices() {
return this.indices;
}
@Override
public DeleteByQueryRequest indices(String... indices) {
this.indices = indices;
return this;
}
@Override
public IndicesOptions indicesOptions() {
return indicesOptions;
}
public DeleteByQueryRequest indicesOptions(IndicesOptions indicesOptions) {
if (indicesOptions == null) {
throw new IllegalArgumentException("IndicesOptions must not be null");
}
this.indicesOptions = indicesOptions;
return this;
}
public String[] types() {
return this.types;
}
public DeleteByQueryRequest types(String... types) {
this.types = types;
return this;
}
public BytesReference source() {
return source;
}
public DeleteByQueryRequest source(QuerySourceBuilder sourceBuilder) {
this.source = sourceBuilder.buildAsBytes(Requests.CONTENT_TYPE);
return this;
}
public DeleteByQueryRequest source(Map querySource) {
try {
XContentBuilder builder = XContentFactory.contentBuilder(Requests.CONTENT_TYPE);
builder.map(querySource);
return source(builder);
} catch (IOException e) {
throw new ElasticsearchGenerationException("Failed to generate [" + querySource + "]", e);
}
}
public DeleteByQueryRequest source(XContentBuilder builder) {
this.source = builder.bytes();
return this;
}
public DeleteByQueryRequest source(String querySource) {
this.source = new BytesArray(querySource);
return this;
}
public DeleteByQueryRequest source(byte[] querySource) {
return source(querySource, 0, querySource.length);
}
public DeleteByQueryRequest source(byte[] querySource, int offset, int length) {
return source(new BytesArray(querySource, offset, length));
}
public DeleteByQueryRequest source(BytesReference querySource) {
this.source = querySource;
return this;
}
public String routing() {
return this.routing;
}
public DeleteByQueryRequest routing(String routing) {
this.routing = routing;
return this;
}
public DeleteByQueryRequest routing(String... routings) {
this.routing = Strings.arrayToCommaDelimitedString(routings);
return this;
}
public DeleteByQueryRequest size(int size) {
if (size < 0) {
throw new IllegalArgumentException("size must be greater than zero");
}
this.size = size;
return this;
}
public int size() {
return size;
}
public Scroll scroll() {
return scroll;
}
public DeleteByQueryRequest scroll(Scroll scroll) {
this.scroll = scroll;
return this;
}
public DeleteByQueryRequest scroll(TimeValue keepAlive) {
return scroll(new Scroll(keepAlive));
}
public DeleteByQueryRequest scroll(String keepAlive) {
return scroll(new Scroll(TimeValue.parseTimeValue(keepAlive, null, getClass().getSimpleName() + ".keepAlive")));
}
public TimeValue timeout() {
return timeout;
}
public DeleteByQueryRequest timeout(TimeValue timeout) {
if (timeout == null) {
throw new IllegalArgumentException("timeout must not be null");
}
this.timeout = timeout;
return this;
}
public DeleteByQueryRequest timeout(String timeout) {
timeout(TimeValue.parseTimeValue(timeout, null, getClass().getSimpleName() + ".timeout"));
return this;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
indices = in.readStringArray();
indicesOptions = IndicesOptions.readIndicesOptions(in);
types = in.readStringArray();
source = in.readBytesReference();
routing = in.readOptionalString();
size = in.readVInt();
if (in.readBoolean()) {
scroll = readScroll(in);
}
if (in.readBoolean()) {
timeout = TimeValue.readTimeValue(in);
}
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeStringArray(indices);
indicesOptions.writeIndicesOptions(out);
out.writeStringArray(types);
out.writeBytesReference(source);
out.writeOptionalString(routing);
out.writeVInt(size);
out.writeOptionalStreamable(scroll);
out.writeOptionalStreamable(timeout);
}
@Override
public String toString() {
String sSource = "_na_";
try {
sSource = XContentHelper.convertToJson(source, false);
} catch (Exception e) {
// ignore
}
return "delete-by-query [" + Arrays.toString(indices) + "][" + Arrays.toString(types) + "], source[" + sSource + "]";
}
}
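
As a hedged illustration of the request class above, the query source can be supplied in several equivalent forms; the index name, type, field and values below are invented for the example:

import org.elasticsearch.action.deletebyquery.DeleteByQueryRequest;
import org.elasticsearch.action.support.QuerySourceBuilder;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.query.QueryBuilders;

public class DeleteByQueryRequestSketch {
    // Raw JSON source, plus a routing value and an overall timeout.
    public static DeleteByQueryRequest fromJson() {
        return new DeleteByQueryRequest("logs-2015")
                .types("event")
                .source("{\"query\": {\"term\": {\"level\": \"debug\"}}}")
                .routing("user_1")
                .timeout(TimeValue.timeValueMinutes(2));
    }

    // The same query expressed with a QuerySourceBuilder wrapping a QueryBuilder.
    public static DeleteByQueryRequest fromBuilder() {
        return new DeleteByQueryRequest("logs-2015")
                .source(new QuerySourceBuilder().setQuery(QueryBuilders.termQuery("level", "debug")));
    }
}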

View File

@@ -0,0 +1,178 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.deletebyquery;
import org.elasticsearch.action.ActionRequestBuilder;
import org.elasticsearch.action.ListenableActionFuture;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.QuerySourceBuilder;
import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.index.query.QueryBuilder;
import java.util.Map;
public class DeleteByQueryRequestBuilder extends ActionRequestBuilder<DeleteByQueryRequest, DeleteByQueryResponse, DeleteByQueryRequestBuilder> {
private QuerySourceBuilder sourceBuilder;
public DeleteByQueryRequestBuilder(ElasticsearchClient client, DeleteByQueryAction action) {
super(client, action, new DeleteByQueryRequest());
}
public DeleteByQueryRequestBuilder setIndices(String... indices) {
request.indices(indices);
return this;
}
/**
* Specifies how to handle requested indices that are unavailable (for example, indices that
* don't exist) and how wildcard indices expressions are expanded.
*/
public DeleteByQueryRequestBuilder setIndicesOptions(IndicesOptions options) {
request.indicesOptions(options);
return this;
}
/**
* The query used to delete documents.
*
* @see org.elasticsearch.index.query.QueryBuilders
*/
public DeleteByQueryRequestBuilder setQuery(QueryBuilder queryBuilder) {
sourceBuilder().setQuery(queryBuilder);
return this;
}
/**
* The query binary used to delete documents.
*/
public DeleteByQueryRequestBuilder setQuery(BytesReference queryBinary) {
sourceBuilder().setQuery(queryBinary);
return this;
}
/**
* The query used to delete documents, provided as a raw content builder.
*/
public DeleteByQueryRequestBuilder setQuery(XContentBuilder query) {
return setQuery(query.bytes());
}
/**
* A comma-separated list of routing values to control the shards the action will be executed on.
*/
public DeleteByQueryRequestBuilder setRouting(String routing) {
request.routing(routing);
return this;
}
/**
* The routing values to control the shards that the action will be executed on.
*/
public DeleteByQueryRequestBuilder setRouting(String... routing) {
request.routing(routing);
return this;
}
/**
* The source to execute. It is preferable to use either {@link #setSource(byte[])}
* or {@link #setQuery(QueryBuilder)}.
*/
public DeleteByQueryRequestBuilder setSource(String source) {
request().source(source);
return this;
}
/**
* The source to execute in the form of a map.
*/
public DeleteByQueryRequestBuilder setSource(Map<String, Object> source) {
request().source(source);
return this;
}
/**
* The source to execute in the form of a builder.
*/
public DeleteByQueryRequestBuilder setSource(XContentBuilder builder) {
request().source(builder);
return this;
}
/**
* The source to execute.
*/
public DeleteByQueryRequestBuilder setSource(byte[] source) {
request().source(source);
return this;
}
/**
* The source to execute.
*/
public DeleteByQueryRequestBuilder setSource(BytesReference source) {
request().source(source);
return this;
}
/**
* An optional timeout to control how long the delete by query is allowed to take.
*/
public DeleteByQueryRequestBuilder setTimeout(TimeValue timeout) {
request.timeout(timeout);
return this;
}
/**
* An optional timeout to control how long the delete by query is allowed to take.
*/
public DeleteByQueryRequestBuilder setTimeout(String timeout) {
request.timeout(timeout);
return this;
}
/**
* The types of documents the query will run against. Defaults to all types.
*/
public DeleteByQueryRequestBuilder setTypes(String... types) {
request.types(types);
return this;
}
@Override
public ListenableActionFuture<DeleteByQueryResponse> execute() {
if (sourceBuilder != null) {
request.source(sourceBuilder);
}
return super.execute();
}
private QuerySourceBuilder sourceBuilder() {
if (sourceBuilder == null) {
sourceBuilder = new QuerySourceBuilder();
}
return sourceBuilder;
}
}
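
To round out the builder above, a short sketch (again with invented names) of supplying the source as a map, which the request serializes into the default content type:

import java.util.HashMap;
import java.util.Map;
import org.elasticsearch.action.deletebyquery.DeleteByQueryAction;
import org.elasticsearch.action.deletebyquery.DeleteByQueryRequestBuilder;
import org.elasticsearch.client.Client;

public class DeleteByQueryMapSourceSketch {
    // Builds the equivalent of {"query": {"term": {"level": "debug"}}} as nested maps.
    public static DeleteByQueryRequestBuilder prepare(Client client) {
        Map<String, Object> term = new HashMap<>();
        term.put("level", "debug");
        Map<String, Object> query = new HashMap<>();
        query.put("term", term);
        Map<String, Object> source = new HashMap<>();
        source.put("query", query);
        return new DeleteByQueryRequestBuilder(client, DeleteByQueryAction.INSTANCE)
                .setIndices("logs-2015")
                .setSource(source);
    }
}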

View File

@@ -0,0 +1,201 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.deletebyquery;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.ShardOperationFailedException;
import org.elasticsearch.action.search.ShardSearchFailure;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentBuilderString;
import java.io.IOException;
import static org.elasticsearch.action.search.ShardSearchFailure.readShardSearchFailure;
/**
* Delete by query response
*/
public class DeleteByQueryResponse extends ActionResponse implements ToXContent {
private long tookInMillis;
private boolean timedOut = false;
private long found;
private long deleted;
private long missing;
private long failed;
private IndexDeleteByQueryResponse[] indices = IndexDeleteByQueryResponse.EMPTY_ARRAY;
private ShardOperationFailedException[] shardFailures = ShardSearchFailure.EMPTY_ARRAY;
DeleteByQueryResponse() {
}
DeleteByQueryResponse(long tookInMillis, boolean timedOut, long found, long deleted, long missing, long failed, IndexDeleteByQueryResponse[] indices, ShardOperationFailedException[] shardFailures) {
this.tookInMillis = tookInMillis;
this.timedOut = timedOut;
this.found = found;
this.deleted = deleted;
this.missing = missing;
this.failed = failed;
this.indices = indices;
this.shardFailures = shardFailures;
}
/**
* The responses from all the different indices.
*/
public IndexDeleteByQueryResponse[] getIndices() {
return indices;
}
/**
* The response of a specific index.
*/
public IndexDeleteByQueryResponse getIndex(String index) {
if (index == null) {
return null;
}
for (IndexDeleteByQueryResponse i : indices) {
if (index.equals(i.getIndex())) {
return i;
}
}
return null;
}
public TimeValue getTook() {
return new TimeValue(tookInMillis);
}
public long getTookInMillis() {
return tookInMillis;
}
public boolean isTimedOut() {
return this.timedOut;
}
public long getTotalFound() {
return found;
}
public long getTotalDeleted() {
return deleted;
}
public long getTotalMissing() {
return missing;
}
public long getTotalFailed() {
return failed;
}
public ShardOperationFailedException[] getShardFailures() {
return shardFailures;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
tookInMillis = in.readVLong();
timedOut = in.readBoolean();
found = in.readVLong();
deleted = in.readVLong();
missing = in.readVLong();
failed = in.readVLong();
int size = in.readVInt();
indices = new IndexDeleteByQueryResponse[size];
for (int i = 0; i < size; i++) {
IndexDeleteByQueryResponse index = new IndexDeleteByQueryResponse();
index.readFrom(in);
indices[i] = index;
}
size = in.readVInt();
if (size == 0) {
shardFailures = ShardSearchFailure.EMPTY_ARRAY;
} else {
shardFailures = new ShardSearchFailure[size];
for (int i = 0; i < shardFailures.length; i++) {
shardFailures[i] = readShardSearchFailure(in);
}
}
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeVLong(tookInMillis);
out.writeBoolean(timedOut);
out.writeVLong(found);
out.writeVLong(deleted);
out.writeVLong(missing);
out.writeVLong(failed);
out.writeVInt(indices.length);
for (IndexDeleteByQueryResponse indexResponse : indices) {
indexResponse.writeTo(out);
}
out.writeVInt(shardFailures.length);
for (ShardOperationFailedException shardSearchFailure : shardFailures) {
shardSearchFailure.writeTo(out);
}
}
static final class Fields {
static final XContentBuilderString TOOK = new XContentBuilderString("took");
static final XContentBuilderString TIMED_OUT = new XContentBuilderString("timed_out");
static final XContentBuilderString INDICES = new XContentBuilderString("_indices");
static final XContentBuilderString FAILURES = new XContentBuilderString("failures");
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.field(Fields.TOOK, tookInMillis);
builder.field(Fields.TIMED_OUT, timedOut);
builder.startObject(Fields.INDICES);
IndexDeleteByQueryResponse all = new IndexDeleteByQueryResponse("_all", found, deleted, missing, failed);
all.toXContent(builder, params);
for (IndexDeleteByQueryResponse indexResponse : indices) {
indexResponse.toXContent(builder, params);
}
builder.endObject();
builder.startArray(Fields.FAILURES);
if (shardFailures != null) {
for (ShardOperationFailedException shardFailure : shardFailures) {
builder.startObject();
shardFailure.toXContent(builder, params);
builder.endObject();
}
}
builder.endArray();
return builder;
}
}
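
A short sketch of how a caller might inspect the response above; only accessor methods defined in this commit are used:

import org.elasticsearch.action.ShardOperationFailedException;
import org.elasticsearch.action.deletebyquery.DeleteByQueryResponse;
import org.elasticsearch.action.deletebyquery.IndexDeleteByQueryResponse;

public class DeleteByQueryResponseSketch {
    public static void inspect(DeleteByQueryResponse response) {
        // Global counters, aggregated over every index the query touched.
        System.out.println("took " + response.getTookInMillis() + "ms, deleted "
                + response.getTotalDeleted() + " of " + response.getTotalFound() + " matching document(s)");
        // Per-index breakdown.
        for (IndexDeleteByQueryResponse index : response.getIndices()) {
            System.out.println(index.getIndex() + ": deleted=" + index.getDeleted()
                    + ", missing=" + index.getMissing() + ", failed=" + index.getFailed());
        }
        // Shard-level failures collected during the scan/scroll phases.
        for (ShardOperationFailedException failure : response.getShardFailures()) {
            System.out.println("shard failure: " + failure.reason());
        }
    }
}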

View File

@@ -0,0 +1,156 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.deletebyquery;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentBuilderString;
import java.io.IOException;
/**
* Delete by query response for a single index.
*/
public class IndexDeleteByQueryResponse extends ActionResponse implements ToXContent {
public static final IndexDeleteByQueryResponse[] EMPTY_ARRAY = new IndexDeleteByQueryResponse[0];
private String index;
private long found = 0L;
private long deleted = 0L;
private long missing = 0L;
private long failed = 0L;
IndexDeleteByQueryResponse() {
}
IndexDeleteByQueryResponse(String index) {
this.index = index;
}
/**
* Instantiates an IndexDeleteByQueryResponse with given values for counters. Counters should not be negative.
*/
public IndexDeleteByQueryResponse(String index, long found, long deleted, long missing, long failed) {
this(index);
incrementFound(found);
incrementDeleted(deleted);
incrementMissing(missing);
incrementFailed(failed);
}
public String getIndex() {
return this.index;
}
public long getFound() {
return found;
}
public void incrementFound() {
incrementFound(1L);
}
public void incrementFound(long delta) {
assert (found + delta >= 0) : "counter 'found' cannot be negative";
this.found = found + delta;
}
public long getDeleted() {
return deleted;
}
public void incrementDeleted() {
incrementDeleted(1L);
}
public void incrementDeleted(long delta) {
assert (deleted + delta >= 0) : "counter 'deleted' cannot be negative";
this.deleted = deleted + delta;
}
public long getMissing() {
return missing;
}
public void incrementMissing() {
incrementMissing(1L);
}
public void incrementMissing(long delta) {
assert (missing + delta >= 0) : "counter 'missing' cannot be negative";
this.missing = missing + delta;
}
public long getFailed() {
return failed;
}
public void incrementFailed() {
incrementFailed(1L);
}
public void incrementFailed(long delta) {
assert (failed + delta >= 0) : "counter 'failed' cannot be negative";
this.failed = failed + delta;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
index = in.readString();
found = in.readVLong();
deleted = in.readVLong();
missing = in.readVLong();
failed = in.readVLong();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(index);
out.writeVLong(found);
out.writeVLong(deleted);
out.writeVLong(missing);
out.writeVLong(failed);
}
static final class Fields {
static final XContentBuilderString FOUND = new XContentBuilderString("found");
static final XContentBuilderString DELETED = new XContentBuilderString("deleted");
static final XContentBuilderString MISSING = new XContentBuilderString("missing");
static final XContentBuilderString FAILED = new XContentBuilderString("failed");
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject(index);
builder.field(Fields.FOUND, found);
builder.field(Fields.DELETED, deleted);
builder.field(Fields.MISSING, missing);
builder.field(Fields.FAILED, failed);
builder.endObject();
return builder;
}
}

View File

@@ -0,0 +1,328 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.deletebyquery;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ShardOperationFailedException;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.search.*;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.CollectionUtils;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHitField;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
/**
* Delete-by-query implementation that uses efficient scrolling and bulk deletions to delete large sets of documents.
*/
public class TransportDeleteByQueryAction extends HandledTransportAction<DeleteByQueryRequest, DeleteByQueryResponse> {
private final TransportSearchAction searchAction;
private final TransportSearchScrollAction scrollAction;
private final Client client;
@Inject
protected TransportDeleteByQueryAction(Settings settings, ThreadPool threadPool, Client client,
TransportSearchAction transportSearchAction,
TransportSearchScrollAction transportSearchScrollAction,
TransportService transportService, ActionFilters actionFilters) {
super(settings, DeleteByQueryAction.NAME, threadPool, transportService, actionFilters, DeleteByQueryRequest.class);
this.searchAction = transportSearchAction;
this.scrollAction = transportSearchScrollAction;
this.client = client;
}
@Override
protected void doExecute(DeleteByQueryRequest request, ActionListener<DeleteByQueryResponse> listener) {
new AsyncDeleteByQueryAction(request, listener).start();
}
class AsyncDeleteByQueryAction {
private final DeleteByQueryRequest request;
private final ActionListener<DeleteByQueryResponse> listener;
private final long startTime;
private final AtomicBoolean timedOut;
private final AtomicLong total;
private volatile ShardOperationFailedException[] shardFailures;
private final Map<String, IndexDeleteByQueryResponse> results;
AsyncDeleteByQueryAction(DeleteByQueryRequest request, ActionListener<DeleteByQueryResponse> listener) {
this.request = request;
this.listener = listener;
this.startTime = threadPool.estimatedTimeInMillis();
this.timedOut = new AtomicBoolean(false);
this.total = new AtomicLong(0L);
this.shardFailures = ShardSearchFailure.EMPTY_ARRAY;
this.results = new HashMap<>();
}
public void start() {
executeScan();
}
void executeScan() {
try {
final SearchRequest scanRequest = new SearchRequest(request.indices()).types(request.types()).indicesOptions(request.indicesOptions());
scanRequest.searchType(SearchType.SCAN).scroll(request.scroll());
if (request.routing() != null) {
scanRequest.routing(request.routing());
}
SearchSourceBuilder source = new SearchSourceBuilder().query(request.source()).fields("_routing", "_parent").fetchSource(false).version(true);
if (request.size() > 0) {
source.size(request.size());
}
if (request.timeout() != null) {
source.timeout(request.timeout());
}
scanRequest.source(source);
logger.trace("executing scan request");
searchAction.execute(scanRequest, new ActionListener<SearchResponse>() {
@Override
public void onResponse(SearchResponse searchResponse) {
long hits = searchResponse.getHits().getTotalHits();
logger.trace("scan request executed: found [{}] document(s) to delete", hits);
addShardFailures(searchResponse.getShardFailures());
if (hits == 0) {
listener.onResponse(buildResponse());
return;
}
total.set(hits);
logger.trace("start scrolling [{}] document(s)", hits);
executeScroll(searchResponse.getScrollId());
}
@Override
public void onFailure(Throwable e) {
listener.onFailure(e);
}
});
} catch (Throwable t) {
logger.error("unable to execute the initial scan request of delete by query", t);
listener.onFailure(t);
}
}
void executeScroll(final String scrollId) {
try {
logger.trace("executing scroll request [{}]", scrollId);
scrollAction.execute(new SearchScrollRequest(scrollId).scroll(request.scroll()), new ActionListener<SearchResponse>() {
@Override
public void onResponse(SearchResponse scrollResponse) {
final SearchHit[] docs = scrollResponse.getHits().getHits();
final String nextScrollId = scrollResponse.getScrollId();
addShardFailures(scrollResponse.getShardFailures());
if (logger.isTraceEnabled()) {
logger.trace("scroll request [{}] executed: [{}] document(s) returned", scrollId, docs.length);
}
if ((docs.length == 0) || (nextScrollId == null)) {
logger.trace("scrolling documents terminated");
finishHim(scrollId, false, null);
return;
}
if (hasTimedOut()) {
logger.trace("scrolling documents timed out");
finishHim(scrollId, true, null);
return;
}
// Delete the scrolled documents using the Bulk API
BulkRequest bulkRequest = new BulkRequest();
for (SearchHit doc : docs) {
DeleteRequest delete = new DeleteRequest(doc.index(), doc.type(), doc.id()).version(doc.version());
SearchHitField routing = doc.field("_routing");
if (routing != null) {
delete.routing((String) routing.value());
}
SearchHitField parent = doc.field("_parent");
if (parent != null) {
delete.parent((String) parent.value());
}
bulkRequest.add(delete);
}
logger.trace("executing bulk request with [{}] deletions", bulkRequest.numberOfActions());
client.bulk(bulkRequest, new ActionListener<BulkResponse>() {
@Override
public void onResponse(BulkResponse bulkResponse) {
onBulkResponse(nextScrollId, bulkResponse);
}
@Override
public void onFailure(Throwable e) {
onBulkFailure(nextScrollId, docs, e);
}
});
}
@Override
public void onFailure(Throwable e) {
logger.error("scroll request [{}] failed, scrolling document(s) is stopped", e, scrollId);
finishHim(scrollId, hasTimedOut(), e);
}
});
} catch (Throwable t) {
logger.error("unable to execute scroll request [{}]", t, scrollId);
finishHim(scrollId, false, t);
}
}
void onBulkResponse(String scrollId, BulkResponse bulkResponse) {
try {
for (BulkItemResponse item : bulkResponse.getItems()) {
IndexDeleteByQueryResponse indexCounter = results.get(item.getIndex());
if (indexCounter == null) {
indexCounter = new IndexDeleteByQueryResponse(item.getIndex());
}
indexCounter.incrementFound();
if (item.isFailed()) {
indexCounter.incrementFailed();
} else {
indexCounter.incrementDeleted();
DeleteResponse delete = item.getResponse();
if (!delete.isFound()) {
indexCounter.incrementMissing();
}
}
results.put(item.getIndex(), indexCounter);
}
logger.trace("scrolling next batch of document(s) with scroll id [{}]", scrollId);
executeScroll(scrollId);
} catch (Throwable t) {
logger.error("unable to process bulk response", t);
finishHim(scrollId, false, t);
}
}
void onBulkFailure(String scrollId, SearchHit[] docs, Throwable failure) {
try {
logger.trace("execution of scroll request failed: {}", failure.getMessage());
for (SearchHit doc : docs) {
IndexDeleteByQueryResponse indexCounter = results.get(doc.index());
if (indexCounter == null) {
indexCounter = new IndexDeleteByQueryResponse(doc.index());
}
indexCounter.incrementFound();
indexCounter.incrementFailed();
results.put(doc.getIndex(), indexCounter);
}
logger.trace("scrolling document terminated due to scroll request failure [{}]", scrollId);
finishHim(scrollId, hasTimedOut(), failure);
} catch (Throwable t) {
logger.error("unable to process bulk failure", t);
finishHim(scrollId, false, t);
}
}
void finishHim(final String scrollId, boolean scrollTimedOut, Throwable failure) {
try {
if (scrollTimedOut) {
logger.trace("delete-by-query response marked as timed out");
timedOut.set(true);
}
if (Strings.hasText(scrollId)) {
client.prepareClearScroll().addScrollId(scrollId).execute(new ActionListener<ClearScrollResponse>() {
@Override
public void onResponse(ClearScrollResponse clearScrollResponse) {
logger.trace("scroll id [{}] cleared", scrollId);
}
@Override
public void onFailure(Throwable e) {
logger.warn("unable to clear scroll id [{}]: {}", scrollId, e.getMessage());
}
});
}
if (failure != null) {
logger.trace("scrolling document(s) terminated with failures: {}", failure.getMessage());
listener.onFailure(failure);
} else {
logger.trace("scrolling document(s) terminated with success");
listener.onResponse(buildResponse());
}
} catch (Throwable t) {
listener.onFailure(t);
}
}
boolean hasTimedOut() {
return request.timeout() != null && (threadPool.estimatedTimeInMillis() >= (startTime + request.timeout().millis()));
}
void addShardFailure(ShardOperationFailedException failure) {
addShardFailures(new ShardOperationFailedException[]{failure});
}
void addShardFailures(ShardOperationFailedException[] failures) {
if (!CollectionUtils.isEmpty(failures)) {
ShardOperationFailedException[] duplicates = new ShardOperationFailedException[shardFailures.length + failures.length];
System.arraycopy(shardFailures, 0, duplicates, 0, shardFailures.length);
System.arraycopy(failures, 0, duplicates, shardFailures.length, failures.length);
shardFailures = ExceptionsHelper.groupBy(duplicates);
}
}
protected DeleteByQueryResponse buildResponse() {
long took = threadPool.estimatedTimeInMillis() - startTime;
long deleted = 0;
long missing = 0;
long failed = 0;
// Calculates the total number of deleted/failed/missing documents
for (IndexDeleteByQueryResponse result : results.values()) {
deleted = deleted + result.getDeleted();
missing = missing + result.getMissing();
failed = failed + result.getFailed();
}
IndexDeleteByQueryResponse[] indices = results.values().toArray(new IndexDeleteByQueryResponse[results.size()]);
return new DeleteByQueryResponse(took, timedOut.get(), total.get(), deleted, missing, failed, indices, shardFailures);
}
}
}
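
For readers less familiar with the pattern, here is a simplified, synchronous sketch of the scan/scroll plus bulk-delete loop that the transport action above implements asynchronously; index, type, query and batch size are illustrative, and the real action additionally tracks per-index counters, shard failures and the request timeout:

import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;

public class ScanScrollBulkDeleteSketch {
    public static void deleteByQuery(Client client) {
        // 1. Open a scan search that returns ids and versions, but no document sources.
        SearchResponse scan = client.prepareSearch("logs-2015")
                .setSearchType(SearchType.SCAN)
                .setScroll(TimeValue.timeValueMinutes(10))
                .setQuery(QueryBuilders.termQuery("level", "debug"))
                .setSize(100)                 // per-shard batch size
                .setFetchSource(false)
                .setVersion(true)
                .get();
        String scrollId = scan.getScrollId();
        try {
            while (true) {
                // 2. Pull the next batch of matching documents.
                SearchResponse batch = client.prepareSearchScroll(scrollId)
                        .setScroll(TimeValue.timeValueMinutes(10))
                        .get();
                scrollId = batch.getScrollId();
                SearchHit[] hits = batch.getHits().getHits();
                if (hits.length == 0) {
                    break;                    // no more matching documents
                }
                // 3. Delete the batch with a single bulk request. Passing the version seen
                //    during the scan makes deletes of concurrently modified documents fail
                //    with a version conflict instead of silently removing newer data.
                BulkRequestBuilder bulk = client.prepareBulk();
                for (SearchHit hit : hits) {
                    bulk.add(client.prepareDelete(hit.index(), hit.type(), hit.id()).setVersion(hit.version()));
                }
                bulk.get();
            }
        } finally {
            // 4. Release the server-side scroll context.
            client.prepareClearScroll().addScrollId(scrollId).get();
        }
    }
}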

View File

@@ -0,0 +1,48 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.plugin.deletebyquery;
import org.elasticsearch.action.ActionModule;
import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.common.inject.Module;
import org.elasticsearch.common.inject.PreProcessModule;
import org.elasticsearch.action.deletebyquery.DeleteByQueryAction;
import org.elasticsearch.action.deletebyquery.TransportDeleteByQueryAction;
import org.elasticsearch.rest.action.deletebyquery.RestDeleteByQueryAction;
import org.elasticsearch.rest.RestModule;
public class DeleteByQueryModule extends AbstractModule implements PreProcessModule {
@Override
public void processModule(Module module) {
if (module instanceof RestModule) {
RestModule restModule = (RestModule) module;
restModule.addRestAction(RestDeleteByQueryAction.class);
}
if (module instanceof ActionModule) {
ActionModule actionModule = (ActionModule) module;
actionModule.registerAction(DeleteByQueryAction.INSTANCE, TransportDeleteByQueryAction.class);
}
}
@Override
protected void configure() {
}
}

View File

@@ -0,0 +1,47 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.plugin.deletebyquery;
import org.elasticsearch.common.inject.Module;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.plugins.AbstractPlugin;
import java.util.Arrays;
import java.util.Collection;
public class DeleteByQueryPlugin extends AbstractPlugin {
public static final String NAME = "delete-by-query";
@Override
public String name() {
return NAME;
}
@Override
public String description() {
return "Elasticsearch Delete-By-Query Plugin";
}
@Override
public Collection<Module> modules(Settings settings) {
return Arrays.asList((Module) new DeleteByQueryModule());
}
}

View File

@@ -0,0 +1,73 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.rest.action.deletebyquery;
import org.elasticsearch.action.deletebyquery.DeleteByQueryRequest;
import org.elasticsearch.action.deletebyquery.DeleteByQueryResponse;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.QuerySourceBuilder;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.RestChannel;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.action.support.RestActions;
import org.elasticsearch.rest.action.support.RestToXContentListener;
import static org.elasticsearch.action.deletebyquery.DeleteByQueryAction.INSTANCE;
import static org.elasticsearch.rest.RestRequest.Method.DELETE;
public class RestDeleteByQueryAction extends BaseRestHandler {
@Inject
public RestDeleteByQueryAction(Settings settings, RestController controller, Client client) {
super(settings, controller, client);
controller.registerHandler(DELETE, "/{index}/_query", this);
controller.registerHandler(DELETE, "/{index}/{type}/_query", this);
}
@Override
public void handleRequest(final RestRequest request, final RestChannel channel, final Client client) {
DeleteByQueryRequest delete = new DeleteByQueryRequest(Strings.splitStringByCommaToArray(request.param("index")));
delete.indicesOptions(IndicesOptions.fromRequest(request, delete.indicesOptions()));
delete.routing(request.param("routing"));
if (request.hasParam("timeout")) {
delete.timeout(request.paramAsTime("timeout", null));
}
if (request.hasContent()) {
delete.source(request.content());
} else {
String source = request.param("source");
if (source != null) {
delete.source(source);
} else {
QuerySourceBuilder querySourceBuilder = RestActions.parseQuerySource(request);
if (querySourceBuilder != null) {
delete.source(querySourceBuilder);
}
}
}
delete.types(Strings.splitStringByCommaToArray(request.param("type")));
client.execute(INSTANCE, delete, new RestToXContentListener<DeleteByQueryResponse>(channel));
}
}

View File

@@ -0,0 +1,3 @@
plugin=org.elasticsearch.plugin.deletebyquery.DeleteByQueryPlugin
version=${project.version}
lucene=${lucene.version}

View File

@@ -0,0 +1,166 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.deletebyquery;
import org.elasticsearch.Version;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.test.ElasticsearchTestCase;
import org.elasticsearch.test.VersionUtils;
import org.junit.Test;
import static org.hamcrest.Matchers.equalTo;
public class IndexDeleteByQueryResponseTests extends ElasticsearchTestCase {
@Test
public void testIncrements() {
String indexName = randomAsciiOfLength(5);
// Use randomInt to prevent range overflow
long found = Math.abs(randomInt());
long deleted = Math.abs(randomInt());
long missing = Math.abs(randomInt());
long failed = Math.abs(randomInt());
IndexDeleteByQueryResponse response = new IndexDeleteByQueryResponse(indexName, found, deleted, missing, failed);
assertThat(response.getIndex(), equalTo(indexName));
assertThat(response.getFound(), equalTo(found));
assertThat(response.getDeleted(), equalTo(deleted));
assertThat(response.getMissing(), equalTo(missing));
assertThat(response.getFailed(), equalTo(failed));
response.incrementFound();
response.incrementDeleted();
response.incrementMissing();
response.incrementFailed();
assertThat(response.getFound(), equalTo(found + 1));
assertThat(response.getDeleted(), equalTo(deleted + 1));
assertThat(response.getMissing(), equalTo(missing + 1));
assertThat(response.getFailed(), equalTo(failed + 1));
// Use randomInt to prevent range overflow
long inc = randomIntBetween(0, 1000);
response.incrementFound(inc);
response.incrementDeleted(inc);
response.incrementMissing(inc);
response.incrementFailed(inc);
assertThat(response.getFound(), equalTo(found + 1 + inc));
assertThat(response.getDeleted(), equalTo(deleted + 1 + inc));
assertThat(response.getMissing(), equalTo(missing + 1 + inc));
assertThat(response.getFailed(), equalTo(failed + 1 + inc));
}
@Test
public void testNegativeCounters() {
try {
new IndexDeleteByQueryResponse("index", -1L, 0L, 0L, 0L);
fail("should have thrown an assertion error concerning the negative counter");
} catch (AssertionError e) {
assertThat("message contains error about a negative counter: " + e.getMessage(),
e.getMessage().contains("counter 'found' cannot be negative"), equalTo(true));
}
try {
new IndexDeleteByQueryResponse("index", 0L, -1L, 0L, 0L);
fail("should have thrown an assertion error concerning the negative counter");
} catch (AssertionError e) {
assertThat("message contains error about a negative counter: " + e.getMessage(),
e.getMessage().contains("counter 'deleted' cannot be negative"), equalTo(true));
}
try {
new IndexDeleteByQueryResponse("index", 0L, 0L, -1L, 0L);
fail("should have thrown an assertion error concerning the negative counter");
} catch (AssertionError e) {
assertThat("message contains error about a negative counter: " + e.getMessage(),
e.getMessage().contains("counter 'missing' cannot be negative"), equalTo(true));
}
try {
new IndexDeleteByQueryResponse("index", 0L, 0L, 0L, -1L);
fail("should have thrown an assertion error concerning the negative counter");
} catch (AssertionError e) {
assertThat("message contains error about a negative counter: " + e.getMessage(),
e.getMessage().contains("counter 'failed' cannot be negative"), equalTo(true));
}
}
@Test
public void testNegativeIncrements() {
try {
IndexDeleteByQueryResponse response = new IndexDeleteByQueryResponse();
response.incrementFound(-10L);
fail("should have thrown an assertion error concerning the negative counter");
} catch (AssertionError e) {
assertThat("message contains error about a negative counter: " + e.getMessage(),
e.getMessage().contains("counter 'found' cannot be negative"), equalTo(true));
}
try {
IndexDeleteByQueryResponse response = new IndexDeleteByQueryResponse();
response.incrementDeleted(-10L);
fail("should have thrown an assertion error concerning the negative counter");
} catch (AssertionError e) {
assertThat("message contains error about a negative counter: " + e.getMessage(),
e.getMessage().contains("counter 'deleted' cannot be negative"), equalTo(true));
}
try {
IndexDeleteByQueryResponse response = new IndexDeleteByQueryResponse();
response.incrementMissing(-10L);
fail("should have thrown an assertion error concerning the negative counter");
} catch (AssertionError e) {
assertThat("message contains error about a negative counter: " + e.getMessage(),
e.getMessage().contains("counter 'missing' cannot be negative"), equalTo(true));
}
try {
IndexDeleteByQueryResponse response = new IndexDeleteByQueryResponse();
response.incrementFailed(-1L);
fail("should have thrown an assertion error concerning the negative counter");
} catch (AssertionError e) {
assertThat("message contains error about a negative counter: " + e.getMessage(),
e.getMessage().contains("counter 'failed' cannot be negative"), equalTo(true));
}
}
@Test
public void testSerialization() throws Exception {
IndexDeleteByQueryResponse response = new IndexDeleteByQueryResponse(randomAsciiOfLength(5), Math.abs(randomLong()), Math.abs(randomLong()), Math.abs(randomLong()), Math.abs(randomLong()));
Version testVersion = VersionUtils.randomVersionBetween(random(), Version.CURRENT.minimumCompatibilityVersion(), Version.CURRENT);
BytesStreamOutput output = new BytesStreamOutput();
output.setVersion(testVersion);
response.writeTo(output);
StreamInput streamInput = StreamInput.wrap(output.bytes());
streamInput.setVersion(testVersion);
IndexDeleteByQueryResponse deserializedResponse = new IndexDeleteByQueryResponse();
deserializedResponse.readFrom(streamInput);
assertThat(deserializedResponse.getIndex(), equalTo(response.getIndex()));
assertThat(deserializedResponse.getFound(), equalTo(response.getFound()));
assertThat(deserializedResponse.getDeleted(), equalTo(response.getDeleted()));
assertThat(deserializedResponse.getMissing(), equalTo(response.getMissing()));
assertThat(deserializedResponse.getFailed(), equalTo(response.getFailed()));
}
}

View File

@@ -0,0 +1,429 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.deletebyquery;
import com.google.common.base.Predicate;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ShardOperationFailedException;
import org.elasticsearch.action.admin.cluster.node.stats.NodeStats;
import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.search.ClearScrollResponse;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.text.StringText;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.CollectionUtils;
import org.elasticsearch.common.util.concurrent.CountDown;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.search.internal.InternalSearchHit;
import org.elasticsearch.test.ElasticsearchSingleNodeTest;
import org.junit.Test;
import static org.elasticsearch.index.query.QueryBuilders.boolQuery;
import static org.elasticsearch.index.query.QueryBuilders.rangeQuery;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
import static org.hamcrest.Matchers.equalTo;
public class TransportDeleteByQueryActionTests extends ElasticsearchSingleNodeTest {
@Test
public void testExecuteScanFailsOnMissingIndex() {
DeleteByQueryRequest delete = new DeleteByQueryRequest().indices("none");
TestActionListener listener = new TestActionListener();
newAsyncAction(delete, listener).executeScan();
waitForCompletion("scan request should fail on missing index", listener);
assertFailure(listener, "no such index");
assertSearchContextsClosed();
}
@Test
public void testExecuteScanFailsOnMalformedQuery() {
createIndex("test");
DeleteByQueryRequest delete = new DeleteByQueryRequest().indices("test").source("{...}");
TestActionListener listener = new TestActionListener();
newAsyncAction(delete, listener).executeScan();
waitForCompletion("scan request should fail on malformed query", listener);
assertFailure(listener, "all shards failed");
assertSearchContextsClosed();
}
@Test
public void testExecuteScan() {
createIndex("test");
final int numDocs = randomIntBetween(1, 200);
for (int i = 1; i <= numDocs; i++) {
client().prepareIndex("test", "type").setSource("num", i).get();
}
client().admin().indices().prepareRefresh("test").get();
assertHitCount(client().prepareCount("test").get(), numDocs);
final long limit = randomIntBetween(0, numDocs);
DeleteByQueryRequest delete = new DeleteByQueryRequest().indices("test").source(boolQuery().must(rangeQuery("num").lte(limit)).buildAsBytes());
TestActionListener listener = new TestActionListener();
newAsyncAction(delete, listener).executeScan();
waitForCompletion("scan request should return the exact number of documents", listener);
assertNoFailures(listener);
DeleteByQueryResponse response = listener.getResponse();
assertNotNull(response);
assertThat(response.getTotalFound(), equalTo(limit));
assertThat(response.getTotalDeleted(), equalTo(limit));
assertSearchContextsClosed();
}
@Test
public void testExecuteScrollFailsOnMissingScrollId() {
DeleteByQueryRequest delete = new DeleteByQueryRequest();
TestActionListener listener = new TestActionListener();
newAsyncAction(delete, listener).executeScroll(null);
waitForCompletion("scroll request should fail on missing scroll id", listener);
assertFailure(listener, "scrollId is missing");
assertSearchContextsClosed();
}
@Test
public void testExecuteScrollFailsOnMalformedScrollId() {
DeleteByQueryRequest delete = new DeleteByQueryRequest();
TestActionListener listener = new TestActionListener();
newAsyncAction(delete, listener).executeScroll("123");
waitForCompletion("scroll request should fail on malformed scroll id", listener);
assertFailure(listener, "Failed to decode scrollId");
assertSearchContextsClosed();
}
@Test
public void testExecuteScrollFailsOnExpiredScrollId() {
final long numDocs = randomIntBetween(1, 100);
for (int i = 1; i <= numDocs; i++) {
client().prepareIndex("test", "type").setSource("num", i).get();
}
client().admin().indices().prepareRefresh("test").get();
assertHitCount(client().prepareCount("test").get(), numDocs);
SearchResponse searchResponse = client().prepareSearch("test").setSearchType(SearchType.SCAN).setScroll(TimeValue.timeValueSeconds(10)).get();
assertThat(searchResponse.getHits().getTotalHits(), equalTo(numDocs));
String scrollId = searchResponse.getScrollId();
assertTrue(Strings.hasText(scrollId));
ClearScrollResponse clearScrollResponse = client().prepareClearScroll().addScrollId(scrollId).get();
assertTrue(clearScrollResponse.isSucceeded());
DeleteByQueryRequest delete = new DeleteByQueryRequest().indices("test");
TestActionListener listener = new TestActionListener();
newAsyncAction(delete, listener).executeScroll(searchResponse.getScrollId());
waitForCompletion("scroll request returns zero documents on expired scroll id", listener);
assertNull(listener.getError());
assertShardFailuresContains(listener.getResponse().getShardFailures(), "No search context found");
assertSearchContextsClosed();
}
@Test
public void testExecuteScrollTimedOut() throws InterruptedException {
client().prepareIndex("test", "type").setSource("num", "1").setRefresh(true).get();
SearchResponse searchResponse = client().prepareSearch("test").setSearchType(SearchType.SCAN).setScroll(TimeValue.timeValueSeconds(10)).get();
String scrollId = searchResponse.getScrollId();
assertTrue(Strings.hasText(scrollId));
DeleteByQueryRequest delete = new DeleteByQueryRequest().indices("test").timeout(TimeValue.timeValueSeconds(1));
TestActionListener listener = new TestActionListener();
final TransportDeleteByQueryAction.AsyncDeleteByQueryAction async = newAsyncAction(delete, listener);
awaitBusy(new Predicate<Object>() {
@Override
public boolean apply(Object input) {
// Wait until the action timed out
return async.hasTimedOut();
}
});
async.executeScroll(searchResponse.getScrollId());
waitForCompletion("scroll request returns zero documents on expired scroll id", listener);
assertNull(listener.getError());
assertTrue(listener.getResponse().isTimedOut());
assertThat(listener.getResponse().getTotalDeleted(), equalTo(0L));
assertSearchContextsClosed();
}
@Test
public void testExecuteScrollNoDocuments() {
createIndex("test");
SearchResponse searchResponse = client().prepareSearch("test").setSearchType(SearchType.SCAN).setScroll(TimeValue.timeValueSeconds(10)).get();
String scrollId = searchResponse.getScrollId();
assertTrue(Strings.hasText(scrollId));
DeleteByQueryRequest delete = new DeleteByQueryRequest().indices("test");
TestActionListener listener = new TestActionListener();
newAsyncAction(delete, listener).executeScroll(searchResponse.getScrollId());
waitForCompletion("scroll request returns zero documents", listener);
assertNull(listener.getError());
assertFalse(listener.getResponse().isTimedOut());
assertThat(listener.getResponse().getTotalFound(), equalTo(0L));
assertThat(listener.getResponse().getTotalDeleted(), equalTo(0L));
assertSearchContextsClosed();
}
@Test
public void testExecuteScroll() {
final int numDocs = randomIntBetween(1, 100);
for (int i = 1; i <= numDocs; i++) {
client().prepareIndex("test", "type").setSource("num", i).get();
}
client().admin().indices().prepareRefresh("test").get();
assertHitCount(client().prepareCount("test").get(), numDocs);
final long limit = randomIntBetween(0, numDocs);
SearchResponse searchResponse = client().prepareSearch("test").setSearchType(SearchType.SCAN)
.setScroll(TimeValue.timeValueSeconds(10))
.setQuery(boolQuery().must(rangeQuery("num").lte(limit)))
.addFields("_routing", "_parent")
.setFetchSource(false)
.setVersion(true)
.get();
String scrollId = searchResponse.getScrollId();
assertTrue(Strings.hasText(scrollId));
assertThat(searchResponse.getHits().getTotalHits(), equalTo(limit));
DeleteByQueryRequest delete = new DeleteByQueryRequest().indices("test").size(100).source(boolQuery().must(rangeQuery("num").lte(limit)).buildAsBytes());
TestActionListener listener = new TestActionListener();
newAsyncAction(delete, listener).executeScroll(searchResponse.getScrollId());
waitForCompletion("scroll request should return all documents", listener);
assertNull(listener.getError());
assertFalse(listener.getResponse().isTimedOut());
assertThat(listener.getResponse().getTotalDeleted(), equalTo(limit));
assertSearchContextsClosed();
}
@Test
public void testOnBulkResponse() {
final int nbItems = randomIntBetween(0, 20);
long deleted = 0;
long missing = 0;
long failed = 0;
BulkItemResponse[] items = new BulkItemResponse[nbItems];
for (int i = 0; i < nbItems; i++) {
if (randomBoolean()) {
boolean delete = true;
if (rarely()) {
delete = false;
missing++;
} else {
deleted++;
}
items[i] = new BulkItemResponse(i, "delete", new DeleteResponse("test", "type", String.valueOf(i), 1, delete));
} else {
items[i] = new BulkItemResponse(i, "delete", new BulkItemResponse.Failure("test", "type", String.valueOf(i), new Throwable("item failed")));
failed++;
}
}
// We just need a valid scroll id
createIndex("test");
SearchResponse searchResponse = client().prepareSearch().setSearchType(SearchType.SCAN).setScroll(TimeValue.timeValueSeconds(10)).get();
String scrollId = searchResponse.getScrollId();
assertTrue(Strings.hasText(scrollId));
try {
DeleteByQueryRequest delete = new DeleteByQueryRequest();
TestActionListener listener = new TestActionListener();
newAsyncAction(delete, listener).onBulkResponse(searchResponse.getScrollId(), new BulkResponse(items, 0L));
waitForCompletion("waiting for bulk response to complete", listener);
assertNoFailures(listener);
assertThat(listener.getResponse().getTotalDeleted(), equalTo(deleted));
assertThat(listener.getResponse().getTotalFailed(), equalTo(failed));
assertThat(listener.getResponse().getTotalMissing(), equalTo(missing));
} finally {
client().prepareClearScroll().addScrollId(scrollId).get();
}
}
@Test
public void testOnBulkFailureNoDocuments() {
DeleteByQueryRequest delete = new DeleteByQueryRequest();
TestActionListener listener = new TestActionListener();
newAsyncAction(delete, listener).onBulkFailure(null, new SearchHit[0], new Throwable("This is a bulk failure"));
waitForCompletion("waiting for bulk failure to complete", listener);
assertFailure(listener, "This is a bulk failure");
}
@Test
public void testOnBulkFailure() {
final int nbDocs = randomIntBetween(0, 20);
SearchHit[] docs = new SearchHit[nbDocs];
for (int i = 0; i < nbDocs; i++) {
InternalSearchHit doc = new InternalSearchHit(randomInt(), String.valueOf(i), new StringText("type"), null);
doc.shard(new SearchShardTarget("node", "test", randomInt()));
docs[i] = doc;
}
DeleteByQueryRequest delete = new DeleteByQueryRequest();
TestActionListener listener = new TestActionListener();
TransportDeleteByQueryAction.AsyncDeleteByQueryAction async = newAsyncAction(delete, listener);
async.onBulkFailure(null, docs, new Throwable("This is a bulk failure"));
waitForCompletion("waiting for bulk failure to complete", listener);
assertFailure(listener, "This is a bulk failure");
DeleteByQueryResponse response = async.buildResponse();
assertThat(response.getTotalFailed(), equalTo((long) nbDocs));
assertThat(response.getTotalDeleted(), equalTo(0L));
}
@Test
public void testFinishHim() {
TestActionListener listener = new TestActionListener();
newAsyncAction(new DeleteByQueryRequest(), listener).finishHim(null, false, null);
waitForCompletion("waiting for finishHim to complete with success", listener);
assertNoFailures(listener);
assertNotNull(listener.getResponse());
assertFalse(listener.getResponse().isTimedOut());
listener = new TestActionListener();
newAsyncAction(new DeleteByQueryRequest(), listener).finishHim(null, true, null);
waitForCompletion("waiting for finishHim to complete with timed out = true", listener);
assertNoFailures(listener);
assertNotNull(listener.getResponse());
assertTrue(listener.getResponse().isTimedOut());
listener = new TestActionListener();
newAsyncAction(new DeleteByQueryRequest(), listener).finishHim(null, false, new Throwable("Fake error"));
waitForCompletion("waiting for finishHim to complete with error", listener);
assertFailure(listener, "Fake error");
assertNull(listener.getResponse());
}
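// Instantiates the inner AsyncDeleteByQueryAction directly so that each step
// (scan, scroll, bulk callbacks, finishHim) can be exercised in isolation.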
private TransportDeleteByQueryAction.AsyncDeleteByQueryAction newAsyncAction(DeleteByQueryRequest request, TestActionListener listener) {
TransportDeleteByQueryAction action = getInstanceFromNode(TransportDeleteByQueryAction.class);
assertNotNull(action);
return action.new AsyncDeleteByQueryAction(request, listener);
}
private void waitForCompletion(String testName, final TestActionListener listener) {
logger.info(" --> waiting for delete-by-query [{}] to complete", testName);
try {
awaitBusy(new Predicate<Object>() {
@Override
public boolean apply(Object input) {
return listener.isTerminated();
}
});
} catch (InterruptedException e) {
fail("exception when waiting for delete-by-query [" + testName + "] to complete: " + e.getMessage());
logger.error("exception when waiting for delete-by-query [{}] to complete", e, testName);
}
}
private void assertFailure(TestActionListener listener, String expectedMessage) {
Throwable t = listener.getError();
assertNotNull(t);
assertTrue(Strings.hasText(expectedMessage));
assertTrue("error message should contain [" + expectedMessage + "] but got [" + t.getMessage() + "]", t.getMessage().contains(expectedMessage));
}
private void assertNoFailures(TestActionListener listener) {
assertNull(listener.getError());
assertTrue(CollectionUtils.isEmpty(listener.getResponse().getShardFailures()));
}
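// All scan/scroll search contexts opened by the action must be released once it completes.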
private void assertSearchContextsClosed() {
NodesStatsResponse nodesStats = client().admin().cluster().prepareNodesStats().setIndices(true).get();
for (NodeStats nodeStat : nodesStats.getNodes()){
assertThat(nodeStat.getIndices().getSearch().getOpenContexts(), equalTo(0L));
}
}
private void assertShardFailuresContains(ShardOperationFailedException[] shardFailures, String expectedFailure) {
assertNotNull(shardFailures);
for (ShardOperationFailedException failure : shardFailures) {
if (failure.reason().contains(expectedFailure)) {
return;
}
}
fail("failed to find shard failure [" + expectedFailure + "]");
}
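// Listener that records the first response or failure and lets tests block until one arrives.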
private class TestActionListener implements ActionListener<DeleteByQueryResponse> {
private final CountDown count = new CountDown(1);
private DeleteByQueryResponse response;
private Throwable error;
@Override
public void onResponse(DeleteByQueryResponse response) {
try {
this.response = response;
} finally {
count.countDown();
}
}
@Override
public void onFailure(Throwable e) {
try {
this.error = e;
} finally {
count.countDown();
}
}
public boolean isTerminated() {
return count.isCountedDown();
}
public DeleteByQueryResponse getResponse() {
return response;
}
public Throwable getError() {
return error;
}
}
}
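The tests above call executeScan, executeScroll and the bulk callbacks directly on AsyncDeleteByQueryAction. The end-to-end flow they exercise — open a scan search, pull scroll pages, turn each page into bulk deletes, then clear the scroll — can be sketched with the plain Java client roughly as follows; the class and method names, the index/query arguments, the 10-second keep-alive and the page size of 100 are illustrative assumptions, not values taken from the plugin.

import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.search.SearchHit;

public class ScanAndBulkDeleteSketch {

    // Deletes every document matching the query, one scroll page at a time (sketch only).
    static long deleteByScanAndBulk(Client client, String index, QueryBuilder query) {
        long deleted = 0;
        // Phase 1: open a scan search that only enumerates matching hits.
        SearchResponse scan = client.prepareSearch(index)
                .setSearchType(SearchType.SCAN)
                .setScroll(TimeValue.timeValueSeconds(10)) // keep-alive chosen for the sketch
                .setQuery(query)
                .setSize(100) // hits per shard per scroll round
                .get();
        String scrollId = scan.getScrollId();
        try {
            // Phase 2: pull each scroll page and turn its hits into bulk deletes.
            while (true) {
                SearchResponse page = client.prepareSearchScroll(scrollId)
                        .setScroll(TimeValue.timeValueSeconds(10))
                        .get();
                scrollId = page.getScrollId();
                if (page.getHits().getHits().length == 0) {
                    break; // scroll exhausted
                }
                BulkRequestBuilder bulk = client.prepareBulk();
                for (SearchHit hit : page.getHits()) {
                    bulk.add(client.prepareDelete(hit.getIndex(), hit.getType(), hit.getId()));
                }
                for (BulkItemResponse item : bulk.get().getItems()) {
                    if (!item.isFailed()) {
                        deleted++;
                    }
                }
            }
        } finally {
            // Phase 3: always release the search context, as assertSearchContextsClosed expects.
            client.prepareClearScroll().addScrollId(scrollId).get();
        }
        return deleted;
    }
}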

View File

@ -0,0 +1,445 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.plugin.deletebyquery;
import org.apache.lucene.util.LuceneTestCase.Slow;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.admin.cluster.node.stats.NodeStats;
import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse;
import org.elasticsearch.action.admin.indices.alias.Alias;
import org.elasticsearch.action.deletebyquery.DeleteByQueryAction;
import org.elasticsearch.action.deletebyquery.DeleteByQueryRequestBuilder;
import org.elasticsearch.action.deletebyquery.DeleteByQueryResponse;
import org.elasticsearch.action.deletebyquery.IndexDeleteByQueryResponse;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.query.MatchQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.indices.IndexMissingException;
import org.elasticsearch.test.ElasticsearchIntegrationTest;
import org.elasticsearch.test.ElasticsearchIntegrationTest.ClusterScope;
import org.junit.Test;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.test.ElasticsearchIntegrationTest.Scope.SUITE;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.*;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.nullValue;
@Slow
@ClusterScope(scope = SUITE, transportClientRatio = 0)
public class DeleteByQueryTests extends ElasticsearchIntegrationTest {
@Test(expected = ActionRequestValidationException.class)
public void testDeleteByQueryWithNoSource() {
newDeleteByQuery().get();
fail("should have thrown a validation exception because of the missing source");
}
@Test
public void testDeleteByQueryWithNoIndices() {
DeleteByQueryRequestBuilder delete = newDeleteByQuery().setQuery(QueryBuilders.matchAllQuery());
delete.setIndicesOptions(IndicesOptions.fromOptions(false, true, true, false));
assertDBQResponse(delete.get(), 0L, 0L, 0L, 0L);
assertSearchContextsClosed();
}
@Test
public void testDeleteByQueryWithOneIndex() throws Exception {
final long docs = randomIntBetween(1, 50);
for (int i = 0; i < docs; i++) {
index("test", "test", String.valueOf(i), "fields1", 1);
}
refresh();
assertHitCount(client().prepareCount("test").get(), docs);
DeleteByQueryRequestBuilder delete = newDeleteByQuery().setIndices("t*").setQuery(QueryBuilders.matchAllQuery());
assertDBQResponse(delete.get(), docs, docs, 0L, 0L);
refresh();
assertHitCount(client().prepareCount("test").get(), 0);
assertSearchContextsClosed();
}
@Test
public void testDeleteByQueryWithMultipleIndices() throws Exception {
final int indices = randomIntBetween(2, 5);
final int docs = randomIntBetween(2, 10) * 2;
long[] candidates = new long[indices];
for (int i = 0; i < indices; i++) {
// number of documents to be deleted with the upcoming delete-by-query
// (this number differs for each index)
candidates[i] = randomIntBetween(1, docs);
for (int j = 0; j < docs; j++) {
boolean candidate = (j < candidates[i]);
index("test-" + i, "test", String.valueOf(j), "candidate", candidate);
}
}
// total number of expected deletions
long deletions = 0;
for (long i : candidates) {
deletions = deletions + i;
}
refresh();
assertHitCount(client().prepareCount().get(), docs * indices);
for (int i = 0; i < indices; i++) {
assertHitCount(client().prepareCount("test-" + i).get(), docs);
}
// Deletes all the documents with candidate=true
DeleteByQueryResponse response = newDeleteByQuery().setIndices("test-*").setQuery(QueryBuilders.termQuery("candidate", true)).get();
refresh();
// Checks that the DBQ response returns the expected number of deletions
assertDBQResponse(response, deletions, deletions, 0L, 0L);
assertNotNull(response.getIndices());
assertThat(response.getIndices().length, equalTo(indices));
for (int i = 0; i < indices; i++) {
String indexName = "test-" + i;
IndexDeleteByQueryResponse indexResponse = response.getIndex(indexName);
assertThat(indexResponse.getFound(), equalTo(candidates[i]));
assertThat(indexResponse.getDeleted(), equalTo(candidates[i]));
assertThat(indexResponse.getFailed(), equalTo(0L));
assertThat(indexResponse.getMissing(), equalTo(0L));
assertThat(indexResponse.getIndex(), equalTo(indexName));
long remaining = docs - candidates[i];
assertHitCount(client().prepareCount(indexName).get(), remaining);
}
assertHitCount(client().prepareCount().get(), (indices * docs) - deletions);
assertSearchContextsClosed();
}
@Test
public void testDeleteByQueryWithMissingIndex() throws Exception {
client().prepareIndex("test", "test")
.setSource(jsonBuilder().startObject().field("field1", 1).endObject())
.setRefresh(true)
.get();
assertHitCount(client().prepareCount().get(), 1);
DeleteByQueryRequestBuilder delete = newDeleteByQuery().setIndices("test", "missing").setQuery(QueryBuilders.matchAllQuery());
try {
delete.get();
fail("should have thrown an exception because of a missing index");
} catch (IndexMissingException e) {
// Ok
}
delete.setIndicesOptions(IndicesOptions.lenientExpandOpen());
assertDBQResponse(delete.get(), 1L, 1L, 0L, 0L);
refresh();
assertHitCount(client().prepareCount("test").get(), 0);
assertSearchContextsClosed();
}
@Test
public void testDeleteByQueryWithTypes() throws Exception {
final long docs = randomIntBetween(1, 50);
for (int i = 0; i < docs; i++) {
index(randomFrom("test1", "test2", "test3"), "type1", String.valueOf(i), "foo", "bar");
index(randomFrom("test1", "test2", "test3"), "type2", String.valueOf(i), "foo", "bar");
}
refresh();
assertHitCount(client().prepareCount().get(), docs * 2);
assertHitCount(client().prepareCount().setTypes("type1").get(), docs);
assertHitCount(client().prepareCount().setTypes("type2").get(), docs);
DeleteByQueryRequestBuilder delete = newDeleteByQuery().setTypes("type1").setQuery(QueryBuilders.matchAllQuery());
assertDBQResponse(delete.get(), docs, docs, 0L, 0L);
refresh();
assertHitCount(client().prepareCount().get(), docs);
assertHitCount(client().prepareCount().setTypes("type1").get(), 0);
assertHitCount(client().prepareCount().setTypes("type2").get(), docs);
assertSearchContextsClosed();
}
@Test
public void testDeleteByQueryWithRouting() throws Exception {
assertAcked(prepareCreate("test").setSettings("number_of_shards", 2));
ensureGreen("test");
final int docs = randomIntBetween(2, 10);
logger.info("--> indexing [{}] documents with routing", docs);
for (int i = 0; i < docs; i++) {
client().prepareIndex("test", "test", String.valueOf(i)).setRouting(String.valueOf(i)).setSource("field1", 1).get();
}
refresh();
logger.info("--> counting documents with no routing, should be equal to [{}]", docs);
assertHitCount(client().prepareCount().get(), docs);
String routing = String.valueOf(randomIntBetween(2, docs));
logger.info("--> counting documents with routing [{}]", routing);
long expected = client().prepareCount().setRouting(routing).get().getCount();
logger.info("--> delete all documents with routing [{}] with a delete-by-query", routing);
DeleteByQueryRequestBuilder delete = newDeleteByQuery().setRouting(routing).setQuery(QueryBuilders.matchAllQuery());
assertDBQResponse(delete.get(), expected, expected, 0L, 0L);
refresh();
assertHitCount(client().prepareCount().get(), docs - expected);
assertSearchContextsClosed();
}
@Test
public void testDeleteByFieldQuery() throws Exception {
assertAcked(prepareCreate("test").addAlias(new Alias("alias")));
int numDocs = scaledRandomIntBetween(10, 100);
for (int i = 0; i < numDocs; i++) {
client().prepareIndex("test", "test", Integer.toString(i))
.setRouting(randomAsciiOfLengthBetween(1, 5))
.setSource("foo", "bar").get();
}
refresh();
int n = between(0, numDocs - 1);
assertHitCount(client().prepareCount("test").setQuery(QueryBuilders.matchQuery("_id", Integer.toString(n))).get(), 1);
assertHitCount(client().prepareCount("test").setQuery(QueryBuilders.matchAllQuery()).get(), numDocs);
DeleteByQueryRequestBuilder delete = newDeleteByQuery().setIndices("alias").setQuery(QueryBuilders.matchQuery("_id", Integer.toString(n)));
assertDBQResponse(delete.get(), 1L, 1L, 0L, 0L);
refresh();
assertHitCount(client().prepareCount("test").setQuery(QueryBuilders.matchAllQuery()).get(), numDocs - 1);
assertSearchContextsClosed();
}
@Test
public void testDeleteByQueryWithDateMath() throws Exception {
index("test", "type", "1", "d", "2013-01-01");
ensureGreen();
refresh();
assertHitCount(client().prepareCount("test").get(), 1);
DeleteByQueryRequestBuilder delete = newDeleteByQuery().setIndices("test").setQuery(QueryBuilders.rangeQuery("d").to("now-1h"));
assertDBQResponse(delete.get(), 1L, 1L, 0L, 0L);
refresh();
assertHitCount(client().prepareCount("test").get(), 0);
assertSearchContextsClosed();
}
@Test
public void testDeleteByTermQuery() throws Exception {
createIndex("test");
ensureGreen();
int numDocs = scaledRandomIntBetween(10, 50);
IndexRequestBuilder[] indexRequestBuilders = new IndexRequestBuilder[numDocs + 1];
for (int i = 0; i < numDocs; i++) {
indexRequestBuilders[i] = client().prepareIndex("test", "test", Integer.toString(i)).setSource("field", "value");
}
indexRequestBuilders[numDocs] = client().prepareIndex("test", "test", Integer.toString(numDocs)).setSource("field", "other_value");
indexRandom(true, indexRequestBuilders);
SearchResponse searchResponse = client().prepareSearch("test").get();
assertNoFailures(searchResponse);
assertThat(searchResponse.getHits().totalHits(), equalTo((long) numDocs + 1));
DeleteByQueryResponse delete = newDeleteByQuery().setIndices("test").setQuery(QueryBuilders.termQuery("field", "value")).get();
assertDBQResponse(delete, numDocs, numDocs, 0L, 0L);
refresh();
searchResponse = client().prepareSearch("test").get();
assertNoFailures(searchResponse);
assertThat(searchResponse.getHits().totalHits(), equalTo(1L));
assertSearchContextsClosed();
}
@Test
public void testConcurrentDeleteByQueriesOnDifferentDocs() throws InterruptedException {
createIndex("test");
ensureGreen();
final Thread[] threads = new Thread[scaledRandomIntBetween(2, 5)];
final long docs = randomIntBetween(1, 50);
for (int i = 0; i < docs; i++) {
for (int j = 0; j < threads.length; j++) {
index("test", "test", String.valueOf(i * 10 + j), "field", j);
}
}
refresh();
assertHitCount(client().prepareCount("test").get(), docs * threads.length);
final CountDownLatch start = new CountDownLatch(1);
final AtomicReference<Throwable> exceptionHolder = new AtomicReference<>();
for (int i = 0; i < threads.length; i++) {
final int threadNum = i;
assertHitCount(client().prepareCount("test").setQuery(QueryBuilders.termQuery("field", threadNum)).get(), docs);
Runnable r = new Runnable() {
@Override
public void run() {
try {
start.await();
DeleteByQueryResponse rsp = newDeleteByQuery().setQuery(QueryBuilders.termQuery("field", threadNum)).get();
assertDBQResponse(rsp, docs, docs, 0L, 0L);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} catch (Throwable e) {
exceptionHolder.set(e);
Thread.currentThread().interrupt();
}
}
};
threads[i] = new Thread(r);
threads[i].start();
}
start.countDown();
for (Thread thread : threads) {
thread.join();
}
Throwable assertionError = exceptionHolder.get();
if (assertionError != null) {
assertionError.printStackTrace();
}
assertThat(assertionError + " should be null", assertionError, nullValue());
refresh();
for (int i = 0; i < threads.length; i++) {
assertHitCount(client().prepareCount("test").setQuery(QueryBuilders.termQuery("field", i)).get(), 0);
}
assertSearchContextsClosed();
}
@Test
public void testConcurrentDeleteByQueriesOnSameDocs() throws InterruptedException {
assertAcked(prepareCreate("test").setSettings(Settings.settingsBuilder().put("index.refresh_interval", -1)));
ensureGreen();
final long docs = randomIntBetween(50, 100);
for (int i = 0; i < docs; i++) {
index("test", "test", String.valueOf(i), "foo", "bar");
}
refresh();
assertHitCount(client().prepareCount("test").get(), docs);
final Thread[] threads = new Thread[scaledRandomIntBetween(2, 9)];
final CountDownLatch start = new CountDownLatch(1);
final AtomicReference<Throwable> exceptionHolder = new AtomicReference<>();
final MatchQueryBuilder query = QueryBuilders.matchQuery("foo", "bar");
final AtomicLong deleted = new AtomicLong(0);
for (int i = 0; i < threads.length; i++) {
assertHitCount(client().prepareCount("test").setQuery(query).get(), docs);
Runnable r = new Runnable() {
@Override
public void run() {
try {
start.await();
DeleteByQueryResponse rsp = newDeleteByQuery().setQuery(query).get();
deleted.addAndGet(rsp.getTotalDeleted());
assertThat(rsp.getTotalFound(), equalTo(docs));
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} catch (Throwable e) {
exceptionHolder.set(e);
Thread.currentThread().interrupt();
}
}
};
threads[i] = new Thread(r);
threads[i].start();
}
start.countDown();
for (Thread thread : threads) {
thread.join();
}
refresh();
Throwable assertionError = exceptionHolder.get();
if (assertionError != null) {
assertionError.printStackTrace();
}
assertThat(assertionError + " should be null", assertionError, nullValue());
assertHitCount(client().prepareCount("test").get(), 0L);
assertThat(deleted.get(), equalTo(docs));
assertSearchContextsClosed();
}
@Test
public void testDeleteByQueryOnReadOnlyIndex() throws InterruptedException {
createIndex("test");
ensureGreen();
final long docs = randomIntBetween(1, 50);
for (int i = 0; i < docs; i++) {
index("test", "test", String.valueOf(i), "field", 1);
}
refresh();
assertHitCount(client().prepareCount("test").get(), docs);
try {
enableIndexBlock("test", IndexMetaData.SETTING_READ_ONLY);
DeleteByQueryResponse rsp = newDeleteByQuery().setQuery(QueryBuilders.matchAllQuery()).get();
assertDBQResponse(rsp, docs, 0L, docs, 0L);
} finally {
disableIndexBlock("test", IndexMetaData.SETTING_READ_ONLY);
}
assertHitCount(client().prepareCount("test").get(), docs);
assertSearchContextsClosed();
}
private DeleteByQueryRequestBuilder newDeleteByQuery() {
return new DeleteByQueryRequestBuilder(client(), DeleteByQueryAction.INSTANCE);
}
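// Checks the global counters (found/deleted/failed/missing) of a delete-by-query response.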
private void assertDBQResponse(DeleteByQueryResponse response, long found, long deleted, long failed, long missing) {
assertNotNull(response);
assertThat(response.isTimedOut(), equalTo(false));
assertThat(response.getShardFailures().length, equalTo(0));
assertThat(response.getTotalFound(), equalTo(found));
assertThat(response.getTotalDeleted(), equalTo(deleted));
assertThat(response.getTotalFailed(), equalTo(failed));
assertThat(response.getTotalMissing(), equalTo(missing));
}
private void assertSearchContextsClosed() {
NodesStatsResponse nodesStats = client().admin().cluster().prepareNodesStats().setIndices(true).get();
for (NodeStats nodeStat : nodesStats.getNodes()){
assertThat(nodeStat.getIndices().getSearch().getOpenContexts(), equalTo(0L));
}
}
}
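Outside of the test helpers, callers reach the same action through DeleteByQueryRequestBuilder, exactly as newDeleteByQuery() does above. A minimal usage sketch, assuming the plugin is installed on the node; the purgeDebugDocs name, the "level"/"debug" field values and printing to stdout are illustrative, not part of the plugin:

import org.elasticsearch.action.deletebyquery.DeleteByQueryAction;
import org.elasticsearch.action.deletebyquery.DeleteByQueryRequestBuilder;
import org.elasticsearch.action.deletebyquery.DeleteByQueryResponse;
import org.elasticsearch.action.deletebyquery.IndexDeleteByQueryResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.index.query.QueryBuilders;

public class DeleteByQueryUsageSketch {

    // Deletes every document matching a term query and reports per-index counts.
    static void purgeDebugDocs(Client client, String... indices) {
        DeleteByQueryResponse response = new DeleteByQueryRequestBuilder(client, DeleteByQueryAction.INSTANCE)
                .setIndices(indices)
                .setQuery(QueryBuilders.termQuery("level", "debug")) // illustrative query
                .get();

        System.out.printf("found=%d deleted=%d failed=%d missing=%d timedOut=%s%n",
                response.getTotalFound(), response.getTotalDeleted(),
                response.getTotalFailed(), response.getTotalMissing(), response.isTimedOut());

        for (String index : indices) {
            IndexDeleteByQueryResponse perIndex = response.getIndex(index);
            if (perIndex != null) { // null when the name did not resolve to a concrete index
                System.out.printf("  %s: deleted=%d failed=%d%n",
                        perIndex.getIndex(), perIndex.getDeleted(), perIndex.getFailed());
            }
        }
    }
}

The per-index loop relies on getIndex(name) in the same way as testDeleteByQueryWithMultipleIndices above.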

View File

@ -0,0 +1,52 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.plugin.deletebyquery.test.rest;
import com.carrotsearch.randomizedtesting.annotations.Name;
import com.carrotsearch.randomizedtesting.annotations.ParametersFactory;
import org.apache.lucene.util.LuceneTestCase;
import org.elasticsearch.test.ElasticsearchIntegrationTest.ClusterScope;
import org.elasticsearch.test.rest.ElasticsearchRestTestCase;
import org.elasticsearch.test.rest.ElasticsearchRestTestCase.Rest;
import org.elasticsearch.test.rest.RestTestCandidate;
import org.elasticsearch.test.rest.parser.RestTestParseException;
import org.junit.Ignore;
import java.io.IOException;
import static org.elasticsearch.test.ElasticsearchIntegrationTest.Scope.SUITE;
@Rest
@ClusterScope(scope = SUITE, randomDynamicTemplates = false)
@Ignore
@LuceneTestCase.AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/11721")
public class DeleteByQueryRestTests extends ElasticsearchRestTestCase {
public DeleteByQueryRestTests(@Name("yaml") RestTestCandidate testCandidate) {
super(testCandidate);
}
@ParametersFactory
public static Iterable<Object[]> parameters() throws IOException, RestTestParseException {
return ElasticsearchRestTestCase.createParameters(0, 1);
}
}

View File

@ -55,6 +55,12 @@
<artifactId>log4j</artifactId>
<scope>test</scope>
</dependency>
<!-- Required by the REST test framework -->
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<!-- typical layout for plugins -->
@ -92,6 +98,30 @@
<include>**/*.properties</include>
</includes>
</testResource>
<!-- REST API specification and test suites -->
<testResource>
<directory>${project.basedir}/rest-api-spec</directory>
<targetPath>rest-api-spec</targetPath>
<includes>
<include>api/*.json</include>
<include>test/**/*.yaml</include>
</includes>
</testResource>
<!-- REST API specifications copied from main Elasticsearch specs
because they are required to execute the REST tests in plugins -->
<testResource>
<directory>${project.basedir}/../../rest-api-spec</directory>
<targetPath>rest-api-spec</targetPath>
<includes>
<!-- required by the test framework -->
<include>api/info.json</include>
<include>api/cluster.health.json</include>
<!-- used in plugin REST tests -->
<include>api/index.json</include>
<include>api/indices.refresh.json</include>
<include>api/count.json</include>
</includes>
</testResource>
<!-- shared test resources like log4j.properties -->
<testResource>
<directory>${elasticsearch.tools.directory}/shared-test-resources</directory>
@ -135,5 +165,6 @@
<module>cloud-aws</module>
<module>lang-python</module>
<module>lang-javascript</module>
<module>delete-by-query</module>
</modules>
</project>