Added clear scroll api.

The clear scroll api allows clear all resources associated with a `scroll_id` by deleting the `scroll_id` and its associated SearchContext.

Closes #3657
This commit is contained in:
Martijn van Groningen 2013-09-10 21:16:47 +02:00
parent fafc4eef98
commit 0efa78710b
14 changed files with 804 additions and 2 deletions

View File

@ -127,3 +127,20 @@ returned. The total_hits will be maintained between scroll requests.
Note, scan search type does not support sorting (either on score or a Note, scan search type does not support sorting (either on score or a
field) or faceting. field) or faceting.
=== Clear scroll api
added[0.90.4]
Besides consuming the scroll search until no hits has been returned a scroll
search can also be aborted by deleting the `scroll_id`. This can be done via
the clear scroll api. When the the `scroll_id` has been deleted also all the
resources to keep the view open will cleaned open. Example usage:
[source,js]
--------------------------------------------------
curl -XDELETE 'localhost:9200/_search/scroll/c2NhbjsxOjBLMzdpWEtqU2IyZHlmVURPeFJOZnc7MzowSzM3aVhLalNiMmR5ZlVET3hSTmZ3OzU6MEszN2lYS2pTYjJkeWZVRE94Uk5mdzsyOjBLMzdpWEtqU2IyZHlmVURPeFJOZnc7NDowSzM3aVhLalNiMmR5ZlVET3hSTmZ3Ow=='
--------------------------------------------------
Multiple scroll ids can be specified in a comma separated manner, if no id is
specified then all scroll ids will be cleared up.

View File

@ -253,6 +253,7 @@ public class ActionModule extends AbstractModule {
registerAction(PercolateAction.INSTANCE, TransportPercolateAction.class); registerAction(PercolateAction.INSTANCE, TransportPercolateAction.class);
registerAction(MultiPercolateAction.INSTANCE, TransportMultiPercolateAction.class, TransportShardMultiPercolateAction.class); registerAction(MultiPercolateAction.INSTANCE, TransportMultiPercolateAction.class, TransportShardMultiPercolateAction.class);
registerAction(ExplainAction.INSTANCE, TransportExplainAction.class); registerAction(ExplainAction.INSTANCE, TransportExplainAction.class);
registerAction(ClearScrollAction.INSTANCE, TransportClearScrollAction.class);
// register Name -> GenericAction Map that can be injected to instances. // register Name -> GenericAction Map that can be injected to instances.
MapBinder<String, GenericAction> actionsBinder MapBinder<String, GenericAction> actionsBinder

View File

@ -0,0 +1,45 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.search;
import org.elasticsearch.action.Action;
import org.elasticsearch.client.Client;
/**
*/
public class ClearScrollAction extends Action<ClearScrollRequest, ClearScrollResponse, ClearScrollRequestBuilder> {
public static final ClearScrollAction INSTANCE = new ClearScrollAction();
public static final String NAME = "clear_sc";
private ClearScrollAction() {
super(NAME);
}
@Override
public ClearScrollResponse newResponse() {
return new ClearScrollResponse();
}
@Override
public ClearScrollRequestBuilder newRequestBuilder(Client client) {
return new ClearScrollRequestBuilder(client);
}
}

View File

@ -0,0 +1,75 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.search;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import static com.google.common.collect.Lists.newArrayList;
/**
*/
public class ClearScrollRequest extends ActionRequest {
private List<String> scrollIds;
public List<String> getScrollIds() {
return scrollIds;
}
public void setScrollIds(List<String> scrollIds) {
this.scrollIds = scrollIds;
}
public void addScrollId(String scrollId) {
if (scrollIds == null) {
scrollIds = newArrayList();
}
scrollIds.add(scrollId);
}
@Override
public ActionRequestValidationException validate() {
return null;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
scrollIds = Arrays.asList(in.readStringArray());
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
if (scrollIds == null) {
out.writeVInt(0);
} else {
out.writeStringArray(scrollIds.toArray(new String[scrollIds.size()]));
}
}
}

View File

@ -0,0 +1,51 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.search;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequestBuilder;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.internal.InternalClient;
import java.util.List;
/**
*/
public class ClearScrollRequestBuilder extends ActionRequestBuilder<ClearScrollRequest, ClearScrollResponse, ClearScrollRequestBuilder> {
public ClearScrollRequestBuilder(Client client) {
super((InternalClient) client, new ClearScrollRequest());
}
public ClearScrollRequestBuilder setScrollIds(List<String> cursorIds) {
request.setScrollIds(cursorIds);
return this;
}
public ClearScrollRequestBuilder addScrollId(String cursorId) {
request.addScrollId(cursorId);
return this;
}
@Override
protected void doExecute(ActionListener<ClearScrollResponse> listener) {
((Client) client).clearScroll(request, listener);
}
}

View File

@ -0,0 +1,56 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.search;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import java.io.IOException;
/**
*/
public class ClearScrollResponse extends ActionResponse {
private boolean succeeded;
public ClearScrollResponse(boolean succeeded) {
this.succeeded = succeeded;
}
ClearScrollResponse() {
}
public boolean isSucceeded() {
return succeeded;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
succeeded = in.readBoolean();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeBoolean(succeeded);
}
}

View File

@ -0,0 +1,153 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.search;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.TransportAction;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.search.action.SearchServiceTransportAction;
import org.elasticsearch.threadpool.ThreadPool;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import static org.elasticsearch.action.search.type.TransportSearchHelper.parseScrollId;
/**
*/
public class TransportClearScrollAction extends TransportAction<ClearScrollRequest, ClearScrollResponse> {
private final ClusterService clusterService;
private final SearchServiceTransportAction searchServiceTransportAction;
@Inject
public TransportClearScrollAction(Settings settings, ThreadPool threadPool, ClusterService clusterService, SearchServiceTransportAction searchServiceTransportAction) {
super(settings, threadPool);
this.clusterService = clusterService;
this.searchServiceTransportAction = searchServiceTransportAction;
}
@Override
protected void doExecute(ClearScrollRequest request, final ActionListener<ClearScrollResponse> listener) {
new Async(request, listener, clusterService.state()).run();
}
private class Async {
final DiscoveryNodes nodes;
final AtomicInteger expectedOps;
final ClearScrollRequest request;
final List<Tuple<String, Long>[]> contexts = new ArrayList<Tuple<String, Long>[]>();
final AtomicReference<Throwable> expHolder;
final ActionListener<ClearScrollResponse> listener;
private Async(ClearScrollRequest request, ActionListener<ClearScrollResponse> listener, ClusterState clusterState) {
int expectedOps = 0;
this.nodes = clusterState.nodes();
if (request.getScrollIds() == null || request.getScrollIds().isEmpty()) {
expectedOps = nodes.size();
} else {
for (String parsedScrollId : request.getScrollIds()) {
Tuple<String, Long>[] context = parseScrollId(parsedScrollId).getContext();
expectedOps += context.length;
this.contexts.add(context);
}
}
this.request = request;
this.listener = listener;
this.expHolder = new AtomicReference<Throwable>();
this.expectedOps = new AtomicInteger(expectedOps);
}
public void run() {
if (expectedOps.get() == 0) {
listener.onResponse(new ClearScrollResponse(true));
return;
}
if (contexts.isEmpty()) {
for (final DiscoveryNode node : nodes) {
searchServiceTransportAction.sendClearAllScrollContexts(node, request, new ActionListener<Boolean>() {
@Override
public void onResponse(Boolean success) {
onFreedContext();
}
@Override
public void onFailure(Throwable e) {
onFailedFreedContext(e, node);
}
});
}
} else {
for (Tuple<String, Long>[] context : contexts) {
for (Tuple<String, Long> target : context) {
final DiscoveryNode node = nodes.get(target.v1());
if (node == null) {
onFreedContext();
continue;
}
searchServiceTransportAction.sendFreeContext(node, target.v2(), request, new ActionListener<Boolean>() {
@Override
public void onResponse(Boolean success) {
onFreedContext();
}
@Override
public void onFailure(Throwable e) {
onFailedFreedContext(e, node);
}
});
}
}
}
}
void onFreedContext() {
assert expectedOps.get() > 0;
if (expectedOps.decrementAndGet() == 0) {
boolean succeeded = expHolder.get() == null;
listener.onResponse(new ClearScrollResponse(succeeded));
}
}
void onFailedFreedContext(Throwable e, DiscoveryNode node) {
logger.warn("Clear SC failed on node[{}]", e, node);
assert expectedOps.get() > 0;
if (expectedOps.decrementAndGet() == 0) {
listener.onResponse(new ClearScrollResponse(false));
} else {
expHolder.set(e);
}
}
}
}

View File

@ -540,4 +540,19 @@ public interface Client {
*/ */
void explain(ExplainRequest request, ActionListener<ExplainResponse> listener); void explain(ExplainRequest request, ActionListener<ExplainResponse> listener);
/**
* Clears the search contexts associated with specified scroll ids.
*/
ClearScrollRequestBuilder prepareClearScroll();
/**
* Clears the search contexts associated with specified scroll ids.
*/
ActionFuture<ClearScrollResponse> clearScroll(ClearScrollRequest request);
/**
* Clears the search contexts associated with specified scroll ids.
*/
void clearScroll(ClearScrollRequest request, ActionListener<ClearScrollResponse> listener);
} }

View File

@ -366,4 +366,19 @@ public abstract class AbstractClient implements InternalClient {
public void explain(ExplainRequest request, ActionListener<ExplainResponse> listener) { public void explain(ExplainRequest request, ActionListener<ExplainResponse> listener) {
execute(ExplainAction.INSTANCE, request, listener); execute(ExplainAction.INSTANCE, request, listener);
} }
@Override
public void clearScroll(ClearScrollRequest request, ActionListener<ClearScrollResponse> listener) {
execute(ClearScrollAction.INSTANCE, request, listener);
}
@Override
public ActionFuture<ClearScrollResponse> clearScroll(ClearScrollRequest request) {
return execute(ClearScrollAction.INSTANCE, request);
}
@Override
public ClearScrollRequestBuilder prepareClearScroll() {
return new ClearScrollRequestBuilder(this);
}
} }

View File

@ -86,6 +86,7 @@ import org.elasticsearch.rest.action.main.RestMainAction;
import org.elasticsearch.rest.action.mlt.RestMoreLikeThisAction; import org.elasticsearch.rest.action.mlt.RestMoreLikeThisAction;
import org.elasticsearch.rest.action.percolate.RestMultiPercolateAction; import org.elasticsearch.rest.action.percolate.RestMultiPercolateAction;
import org.elasticsearch.rest.action.percolate.RestPercolateAction; import org.elasticsearch.rest.action.percolate.RestPercolateAction;
import org.elasticsearch.rest.action.search.RestClearScrollAction;
import org.elasticsearch.rest.action.search.RestMultiSearchAction; import org.elasticsearch.rest.action.search.RestMultiSearchAction;
import org.elasticsearch.rest.action.search.RestSearchAction; import org.elasticsearch.rest.action.search.RestSearchAction;
import org.elasticsearch.rest.action.search.RestSearchScrollAction; import org.elasticsearch.rest.action.search.RestSearchScrollAction;
@ -199,5 +200,6 @@ public class RestActionModule extends AbstractModule {
bind(RestIndicesAction.class).asEagerSingleton(); bind(RestIndicesAction.class).asEagerSingleton();
// Fully qualified to prevent interference with rest.action.count.RestCountAction // Fully qualified to prevent interference with rest.action.count.RestCountAction
bind(org.elasticsearch.rest.action.cat.RestCountAction.class).asEagerSingleton(); bind(org.elasticsearch.rest.action.cat.RestCountAction.class).asEagerSingleton();
bind(RestClearScrollAction.class).asEagerSingleton();;
} }
} }

View File

@ -0,0 +1,95 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.rest.action.search;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.search.ClearScrollRequest;
import org.elasticsearch.action.search.ClearScrollResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentBuilderString;
import org.elasticsearch.rest.*;
import java.io.IOException;
import java.util.Arrays;
import static org.elasticsearch.rest.RestRequest.Method.DELETE;
import static org.elasticsearch.rest.RestStatus.OK;
import static org.elasticsearch.rest.action.support.RestXContentBuilder.restContentBuilder;
/**
*/
public class RestClearScrollAction extends BaseRestHandler {
@Inject
public RestClearScrollAction(Settings settings, Client client, RestController controller) {
super(settings, client);
controller.registerHandler(DELETE, "/_search/scroll", this);
controller.registerHandler(DELETE, "/_search/scroll/{scroll_id}", this);
}
@Override
public void handleRequest(final RestRequest request, final RestChannel channel) {
String scrollIds = request.param("scroll_id");
ClearScrollRequest clearRequest = new ClearScrollRequest();
clearRequest.setScrollIds(Arrays.asList(splitScrollIds(scrollIds)));
client.clearScroll(clearRequest, new ActionListener<ClearScrollResponse>() {
@Override
public void onResponse(ClearScrollResponse response) {
try {
XContentBuilder builder = restContentBuilder(request);
builder.startObject();
builder.field(Fields.OK, response.isSucceeded());
builder.endObject();
channel.sendResponse(new XContentRestResponse(request, OK, builder));
} catch (Throwable e) {
onFailure(e);
}
}
@Override
public void onFailure(Throwable e) {
try {
channel.sendResponse(new XContentThrowableRestResponse(request, e));
} catch (IOException e1) {
logger.error("Failed to send failure response", e1);
}
}
});
}
public static String[] splitScrollIds(String scrollIds) {
if (scrollIds == null) {
return Strings.EMPTY_ARRAY;
}
return Strings.splitStringByCommaToArray(scrollIds);
}
static final class Fields {
static final XContentBuilderString OK = new XContentBuilderString("ok");
}
}

View File

@ -509,6 +509,14 @@ public class SearchService extends AbstractLifecycleComponent<SearchService> {
context.release(); context.release();
} }
public void freeAllScrollContexts() {
for (SearchContext searchContext : activeContexts.values()) {
if (searchContext.scroll() != null) {
freeContext(searchContext);
}
}
}
private void contextProcessing(SearchContext context) { private void contextProcessing(SearchContext context) {
// disable timeout while executing a search // disable timeout while executing a search
context.accessed(-1); context.accessed(-1);

View File

@ -19,6 +19,8 @@
package org.elasticsearch.search.action; package org.elasticsearch.search.action;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.search.ClearScrollRequest;
import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNode;
@ -81,6 +83,7 @@ public class SearchServiceTransportAction extends AbstractComponent {
this.searchService = searchService; this.searchService = searchService;
transportService.registerHandler(SearchFreeContextTransportHandler.ACTION, new SearchFreeContextTransportHandler()); transportService.registerHandler(SearchFreeContextTransportHandler.ACTION, new SearchFreeContextTransportHandler());
transportService.registerHandler(ClearScrollContextsTransportHandler.ACTION, new ClearScrollContextsTransportHandler());
transportService.registerHandler(SearchDfsTransportHandler.ACTION, new SearchDfsTransportHandler()); transportService.registerHandler(SearchDfsTransportHandler.ACTION, new SearchDfsTransportHandler());
transportService.registerHandler(SearchQueryTransportHandler.ACTION, new SearchQueryTransportHandler()); transportService.registerHandler(SearchQueryTransportHandler.ACTION, new SearchQueryTransportHandler());
transportService.registerHandler(SearchQueryByIdTransportHandler.ACTION, new SearchQueryByIdTransportHandler()); transportService.registerHandler(SearchQueryByIdTransportHandler.ACTION, new SearchQueryByIdTransportHandler());
@ -101,6 +104,64 @@ public class SearchServiceTransportAction extends AbstractComponent {
} }
} }
public void sendFreeContext(DiscoveryNode node, long contextId, ClearScrollRequest request, final ActionListener<Boolean> actionListener) {
if (clusterService.state().nodes().localNodeId().equals(node.id())) {
searchService.freeContext(contextId);
actionListener.onResponse(true);
} else {
transportService.sendRequest(node, SearchFreeContextTransportHandler.ACTION, new SearchFreeContextRequest(request, contextId), new TransportResponseHandler<TransportResponse>() {
@Override
public TransportResponse newInstance() {
return TransportResponse.Empty.INSTANCE;
}
@Override
public void handleResponse(TransportResponse response) {
actionListener.onResponse(true);
}
@Override
public void handleException(TransportException exp) {
actionListener.onFailure(exp);
}
@Override
public String executor() {
return ThreadPool.Names.SAME;
}
});
}
}
public void sendClearAllScrollContexts(DiscoveryNode node, ClearScrollRequest request, final ActionListener<Boolean> actionListener) {
if (clusterService.state().nodes().localNodeId().equals(node.id())) {
searchService.freeAllScrollContexts();
actionListener.onResponse(true);
} else {
transportService.sendRequest(node, ClearScrollContextsTransportHandler.ACTION, new ClearScrollContextsRequest(request), new TransportResponseHandler<TransportResponse>() {
@Override
public TransportResponse newInstance() {
return TransportResponse.Empty.INSTANCE;
}
@Override
public void handleResponse(TransportResponse response) {
actionListener.onResponse(true);
}
@Override
public void handleException(TransportException exp) {
actionListener.onFailure(exp);
}
@Override
public String executor() {
return ThreadPool.Names.SAME;
}
});
}
}
public void sendExecuteDfs(DiscoveryNode node, final ShardSearchRequest request, final SearchServiceListener<DfsSearchResult> listener) { public void sendExecuteDfs(DiscoveryNode node, final ShardSearchRequest request, final SearchServiceListener<DfsSearchResult> listener) {
if (clusterService.state().nodes().localNodeId().equals(node.id())) { if (clusterService.state().nodes().localNodeId().equals(node.id())) {
try { try {
@ -448,7 +509,7 @@ public class SearchServiceTransportAction extends AbstractComponent {
SearchFreeContextRequest() { SearchFreeContextRequest() {
} }
SearchFreeContextRequest(SearchRequest request, long id) { SearchFreeContextRequest(TransportRequest request, long id) {
super(request); super(request);
this.id = id; this.id = id;
} }
@ -493,6 +554,39 @@ public class SearchServiceTransportAction extends AbstractComponent {
} }
} }
class ClearScrollContextsRequest extends TransportRequest {
ClearScrollContextsRequest() {
}
ClearScrollContextsRequest(TransportRequest request) {
super(request);
}
}
class ClearScrollContextsTransportHandler extends BaseTransportRequestHandler<ClearScrollContextsRequest> {
static final String ACTION = "search/clearScrollContexts";
@Override
public ClearScrollContextsRequest newInstance() {
return new ClearScrollContextsRequest();
}
@Override
public void messageReceived(ClearScrollContextsRequest request, TransportChannel channel) throws Exception {
searchService.freeAllScrollContexts();
channel.sendResponse(TransportResponse.Empty.INSTANCE);
}
@Override
public String executor() {
// freeing the context is cheap,
// no need for fork it to another thread
return ThreadPool.Names.SAME;
}
}
private class SearchDfsTransportHandler extends BaseTransportRequestHandler<ShardSearchRequest> { private class SearchDfsTransportHandler extends BaseTransportRequestHandler<ShardSearchRequest> {

View File

@ -19,6 +19,7 @@
package org.elasticsearch.test.integration.search.scroll; package org.elasticsearch.test.integration.search.scroll;
import org.elasticsearch.action.search.ClearScrollResponse;
import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType; import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.common.Priority; import org.elasticsearch.common.Priority;
@ -33,7 +34,6 @@ import java.util.Map;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.index.query.QueryBuilders.*; import static org.elasticsearch.index.query.QueryBuilders.*;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.equalTo;
/** /**
@ -217,4 +217,179 @@ public class SearchScrollTests extends AbstractSharedClusterTest {
assertThat(client().prepareCount().setQuery(termQuery("message", "update")).execute().actionGet().getCount(), equalTo(500l)); assertThat(client().prepareCount().setQuery(termQuery("message", "update")).execute().actionGet().getCount(), equalTo(500l));
assertThat(client().prepareCount().setQuery(termQuery("message", "update")).execute().actionGet().getCount(), equalTo(500l)); assertThat(client().prepareCount().setQuery(termQuery("message", "update")).execute().actionGet().getCount(), equalTo(500l));
} }
@Test
public void testSimpleScrollQueryThenFetch_clearScrollIds() throws Exception {
try {
client().admin().indices().prepareDelete("test").execute().actionGet();
} catch (Exception e) {
// ignore
}
client().admin().indices().prepareCreate("test").setSettings(ImmutableSettings.settingsBuilder().put("index.number_of_shards", 3)).execute().actionGet();
client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForGreenStatus().execute().actionGet();
client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForGreenStatus().execute().actionGet();
for (int i = 0; i < 100; i++) {
client().prepareIndex("test", "type1", Integer.toString(i)).setSource(jsonBuilder().startObject().field("field", i).endObject()).execute().actionGet();
}
client().admin().indices().prepareRefresh().execute().actionGet();
SearchResponse searchResponse1 = client().prepareSearch()
.setQuery(matchAllQuery())
.setSize(35)
.setScroll(TimeValue.timeValueMinutes(2))
.addSort("field", SortOrder.ASC)
.execute().actionGet();
SearchResponse searchResponse2 = client().prepareSearch()
.setQuery(matchAllQuery())
.setSize(35)
.setScroll(TimeValue.timeValueMinutes(2))
.addSort("field", SortOrder.ASC)
.execute().actionGet();
long counter1 = 0;
long counter2 = 0;
assertThat(searchResponse1.getHits().getTotalHits(), equalTo(100l));
assertThat(searchResponse1.getHits().hits().length, equalTo(35));
for (SearchHit hit : searchResponse1.getHits()) {
assertThat(((Number) hit.sortValues()[0]).longValue(), equalTo(counter1++));
}
assertThat(searchResponse2.getHits().getTotalHits(), equalTo(100l));
assertThat(searchResponse2.getHits().hits().length, equalTo(35));
for (SearchHit hit : searchResponse2.getHits()) {
assertThat(((Number) hit.sortValues()[0]).longValue(), equalTo(counter2++));
}
searchResponse1 = client().prepareSearchScroll(searchResponse1.getScrollId())
.setScroll(TimeValue.timeValueMinutes(2))
.execute().actionGet();
searchResponse2 = client().prepareSearchScroll(searchResponse2.getScrollId())
.setScroll(TimeValue.timeValueMinutes(2))
.execute().actionGet();
assertThat(searchResponse1.getHits().getTotalHits(), equalTo(100l));
assertThat(searchResponse1.getHits().hits().length, equalTo(35));
for (SearchHit hit : searchResponse1.getHits()) {
assertThat(((Number) hit.sortValues()[0]).longValue(), equalTo(counter1++));
}
assertThat(searchResponse2.getHits().getTotalHits(), equalTo(100l));
assertThat(searchResponse2.getHits().hits().length, equalTo(35));
for (SearchHit hit : searchResponse2.getHits()) {
assertThat(((Number) hit.sortValues()[0]).longValue(), equalTo(counter2++));
}
ClearScrollResponse clearResponse = client().prepareClearScroll()
.addScrollId(searchResponse1.getScrollId())
.addScrollId(searchResponse2.getScrollId())
.execute().actionGet();
assertThat(clearResponse.isSucceeded(), equalTo(true));
searchResponse1 = client().prepareSearchScroll(searchResponse1.getScrollId())
.setScroll(TimeValue.timeValueMinutes(2))
.execute().actionGet();
searchResponse2 = client().prepareSearchScroll(searchResponse2.getScrollId())
.setScroll(TimeValue.timeValueMinutes(2))
.execute().actionGet();
assertThat(searchResponse1.getHits().getTotalHits(), equalTo(0l));
assertThat(searchResponse1.getHits().hits().length, equalTo(0));
assertThat(searchResponse2.getHits().getTotalHits(), equalTo(0l));
assertThat(searchResponse2.getHits().hits().length, equalTo(0));
}
@Test
public void testSimpleScrollQueryThenFetch_clearAllScrollIds() throws Exception {
try {
client().admin().indices().prepareDelete("test").execute().actionGet();
} catch (Exception e) {
// ignore
}
client().admin().indices().prepareCreate("test").setSettings(ImmutableSettings.settingsBuilder().put("index.number_of_shards", 3)).execute().actionGet();
client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForGreenStatus().execute().actionGet();
client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForGreenStatus().execute().actionGet();
for (int i = 0; i < 100; i++) {
client().prepareIndex("test", "type1", Integer.toString(i)).setSource(jsonBuilder().startObject().field("field", i).endObject()).execute().actionGet();
}
client().admin().indices().prepareRefresh().execute().actionGet();
SearchResponse searchResponse1 = client().prepareSearch()
.setQuery(matchAllQuery())
.setSize(35)
.setScroll(TimeValue.timeValueMinutes(2))
.addSort("field", SortOrder.ASC)
.execute().actionGet();
SearchResponse searchResponse2 = client().prepareSearch()
.setQuery(matchAllQuery())
.setSize(35)
.setScroll(TimeValue.timeValueMinutes(2))
.addSort("field", SortOrder.ASC)
.execute().actionGet();
long counter1 = 0;
long counter2 = 0;
assertThat(searchResponse1.getHits().getTotalHits(), equalTo(100l));
assertThat(searchResponse1.getHits().hits().length, equalTo(35));
for (SearchHit hit : searchResponse1.getHits()) {
assertThat(((Number) hit.sortValues()[0]).longValue(), equalTo(counter1++));
}
assertThat(searchResponse2.getHits().getTotalHits(), equalTo(100l));
assertThat(searchResponse2.getHits().hits().length, equalTo(35));
for (SearchHit hit : searchResponse2.getHits()) {
assertThat(((Number) hit.sortValues()[0]).longValue(), equalTo(counter2++));
}
searchResponse1 = client().prepareSearchScroll(searchResponse1.getScrollId())
.setScroll(TimeValue.timeValueMinutes(2))
.execute().actionGet();
searchResponse2 = client().prepareSearchScroll(searchResponse2.getScrollId())
.setScroll(TimeValue.timeValueMinutes(2))
.execute().actionGet();
assertThat(searchResponse1.getHits().getTotalHits(), equalTo(100l));
assertThat(searchResponse1.getHits().hits().length, equalTo(35));
for (SearchHit hit : searchResponse1.getHits()) {
assertThat(((Number) hit.sortValues()[0]).longValue(), equalTo(counter1++));
}
assertThat(searchResponse2.getHits().getTotalHits(), equalTo(100l));
assertThat(searchResponse2.getHits().hits().length, equalTo(35));
for (SearchHit hit : searchResponse2.getHits()) {
assertThat(((Number) hit.sortValues()[0]).longValue(), equalTo(counter2++));
}
ClearScrollResponse clearResponse = client().prepareClearScroll()
.execute().actionGet();
assertThat(clearResponse.isSucceeded(), equalTo(true));
searchResponse1 = client().prepareSearchScroll(searchResponse1.getScrollId())
.setScroll(TimeValue.timeValueMinutes(2))
.execute().actionGet();
searchResponse2 = client().prepareSearchScroll(searchResponse2.getScrollId())
.setScroll(TimeValue.timeValueMinutes(2))
.execute().actionGet();
assertThat(searchResponse1.getHits().getTotalHits(), equalTo(0l));
assertThat(searchResponse1.getHits().hits().length, equalTo(0));
assertThat(searchResponse2.getHits().getTotalHits(), equalTo(0l));
assertThat(searchResponse2.getHits().hits().length, equalTo(0));
}
} }