Fix Delete-by-Query with Shield

closes #14527
This commit is contained in:
Tanguy Leroux 2015-11-10 17:19:05 +01:00
parent 6e18702ddb
commit 4063354dbe
4 changed files with 45 additions and 18 deletions

View File

@ -64,6 +64,17 @@ public class BulkRequest extends ActionRequest<BulkRequest> implements Composite
private long sizeInBytes = 0;
public BulkRequest() {
}
/**
* Creates a bulk request caused by some other request, which is provided as an
* argument so that its headers and context can be copied to the new request
*/
public BulkRequest(ActionRequest request) {
super(request);
}
/**
* Adds a list of requests to be executed. Either index or delete requests.
*/

View File

@ -37,6 +37,17 @@ public class ClearScrollRequest extends ActionRequest<ClearScrollRequest> {
private List<String> scrollIds;
public ClearScrollRequest() {
}
/**
* Creates a clear scroll request caused by some other request, which is provided as an
* argument so that its headers and context can be copied to the new request
*/
public ClearScrollRequest(ActionRequest request) {
super(request);
}
public List<String> getScrollIds() {
return scrollIds;
}

View File

@ -46,6 +46,14 @@ public class SearchScrollRequest extends ActionRequest<SearchScrollRequest> {
this.scrollId = scrollId;
}
/**
* Creates a scroll request caused by some other request, which is provided as an
* argument so that its headers and context can be copied to the new request
*/
public SearchScrollRequest(ActionRequest request) {
super(request);
}
@Override
public ActionRequestValidationException validate() {
ActionRequestValidationException validationException = null;

View File

@ -27,13 +27,7 @@ import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.search.ClearScrollResponse;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchScrollRequest;
import org.elasticsearch.action.search.ShardSearchFailure;
import org.elasticsearch.action.search.TransportSearchAction;
import org.elasticsearch.action.search.TransportSearchScrollAction;
import org.elasticsearch.action.search.*;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.client.Client;
@ -48,10 +42,7 @@ import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
@ -109,8 +100,11 @@ public class TransportDeleteByQueryAction extends HandledTransportAction<DeleteB
void executeScan() {
try {
final SearchRequest scanRequest = new SearchRequest(request.indices()).types(request.types()).indicesOptions(request.indicesOptions());
scanRequest.scroll(request.scroll());
final SearchRequest scanRequest = new SearchRequest(request)
.indices(request.indices())
.types(request.types())
.indicesOptions(request.indicesOptions())
.scroll(request.scroll());
if (request.routing() != null) {
scanRequest.routing(request.routing());
}
@ -119,7 +113,8 @@ public class TransportDeleteByQueryAction extends HandledTransportAction<DeleteB
fields.add("_routing");
fields.add("_parent");
SearchSourceBuilder source = new SearchSourceBuilder()
.query(request.query()).fields(fields)
.query(request.query())
.fields(fields)
.sort("_doc") // important for performance
.fetchSource(false)
.version(true);
@ -155,7 +150,7 @@ public class TransportDeleteByQueryAction extends HandledTransportAction<DeleteB
void executeScroll(final String scrollId) {
try {
logger.trace("executing scroll request [{}]", scrollId);
scrollAction.execute(new SearchScrollRequest(scrollId).scroll(request.scroll()), new ActionListener<SearchResponse>() {
scrollAction.execute(new SearchScrollRequest(request).scrollId(scrollId).scroll(request.scroll()), new ActionListener<SearchResponse>() {
@Override
public void onResponse(SearchResponse scrollResponse) {
deleteHits(scrollId, scrollResponse);
@ -197,9 +192,9 @@ public class TransportDeleteByQueryAction extends HandledTransportAction<DeleteB
}
// Delete the scrolled documents using the Bulk API
BulkRequest bulkRequest = new BulkRequest();
BulkRequest bulkRequest = new BulkRequest(request);
for (SearchHit doc : docs) {
DeleteRequest delete = new DeleteRequest(doc.index(), doc.type(), doc.id()).version(doc.version());
DeleteRequest delete = new DeleteRequest(request).index(doc.index()).type(doc.type()).id(doc.id()).version(doc.version());
SearchHitField routing = doc.field("_routing");
if (routing != null) {
delete.routing((String) routing.value());
@ -283,7 +278,9 @@ public class TransportDeleteByQueryAction extends HandledTransportAction<DeleteB
}
if (Strings.hasText(scrollId)) {
client.prepareClearScroll().addScrollId(scrollId).execute(new ActionListener<ClearScrollResponse>() {
ClearScrollRequest clearScrollRequest = new ClearScrollRequest(request);
clearScrollRequest.addScrollId(scrollId);
client.clearScroll(clearScrollRequest, new ActionListener<ClearScrollResponse>() {
@Override
public void onResponse(ClearScrollResponse clearScrollResponse) {
logger.trace("scroll id [{}] cleared", scrollId);