From 145efbf6ea6f342c775ee98980c6dca7f5fdc016 Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Fri, 18 Apr 2014 14:23:13 +0700 Subject: [PATCH] Return missing (404) is a scroll_id is cleared that no longer exists. Closes #5730 --- rest-api-spec/test/scroll/11_clear.yaml | 5 ++ .../action/search/ClearScrollResponse.java | 46 +++++++++++++++- .../search/TransportClearScrollAction.java | 25 +++++---- .../action/search/RestClearScrollAction.java | 18 +++---- .../elasticsearch/search/SearchService.java | 5 +- .../action/SearchServiceTransportAction.java | 53 +++++++++++++++---- .../search/scroll/SearchScrollTests.java | 12 +++-- 7 files changed, 125 insertions(+), 39 deletions(-) diff --git a/rest-api-spec/test/scroll/11_clear.yaml b/rest-api-spec/test/scroll/11_clear.yaml index 6cdfb288198..c86746da44d 100644 --- a/rest-api-spec/test/scroll/11_clear.yaml +++ b/rest-api-spec/test/scroll/11_clear.yaml @@ -32,3 +32,8 @@ catch: missing scroll: scroll_id: $scroll_id1 + + - do: + catch: missing + clear_scroll: + scroll_id: $scroll_id1 diff --git a/src/main/java/org/elasticsearch/action/search/ClearScrollResponse.java b/src/main/java/org/elasticsearch/action/search/ClearScrollResponse.java index f3ec45087a5..59479d845cd 100644 --- a/src/main/java/org/elasticsearch/action/search/ClearScrollResponse.java +++ b/src/main/java/org/elasticsearch/action/search/ClearScrollResponse.java @@ -19,38 +19,80 @@ package org.elasticsearch.action.search; +import org.elasticsearch.Version; import org.elasticsearch.action.ActionResponse; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.xcontent.StatusToXContent; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.rest.RestStatus; import java.io.IOException; +import static org.elasticsearch.rest.RestStatus.NOT_FOUND; +import static org.elasticsearch.rest.RestStatus.OK; + /** */ -public class ClearScrollResponse extends ActionResponse { +public class ClearScrollResponse extends ActionResponse implements StatusToXContent { private boolean succeeded; + private int numFreed; - public ClearScrollResponse(boolean succeeded) { + public ClearScrollResponse(boolean succeeded, int numFreed) { this.succeeded = succeeded; + this.numFreed = numFreed; } ClearScrollResponse() { } + /** + * @return Whether the attempt to clear a scroll was successful. + */ public boolean isSucceeded() { return succeeded; } + /** + * @return The number of seach contexts that were freed. If this is 0 the assumption can be made, + * that the scroll id specified in the request did not exist. (never existed, was expired, or completely consumed) + */ + public int getNumFreed() { + return numFreed; + } + + @Override + public RestStatus status() { + return numFreed == 0 ? NOT_FOUND : OK; + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + builder.endObject(); + return builder; + } + @Override public void readFrom(StreamInput in) throws IOException { super.readFrom(in); succeeded = in.readBoolean(); + if (in.getVersion().onOrAfter(Version.V_1_2_0)) { + numFreed = in.readVInt(); + } else { + // On older nodes we can't tell how many search contexts where freed, so we assume at least one, + // so that the rest api doesn't return 404 where SC were indeed freed. + numFreed = 1; + } } @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); out.writeBoolean(succeeded); + if (out.getVersion().onOrAfter(Version.V_1_2_0)) { + out.writeVInt(numFreed); + } } } diff --git a/src/main/java/org/elasticsearch/action/search/TransportClearScrollAction.java b/src/main/java/org/elasticsearch/action/search/TransportClearScrollAction.java index 33afcfb960d..e63822fa2a9 100644 --- a/src/main/java/org/elasticsearch/action/search/TransportClearScrollAction.java +++ b/src/main/java/org/elasticsearch/action/search/TransportClearScrollAction.java @@ -37,6 +37,7 @@ import org.elasticsearch.transport.TransportService; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import static org.elasticsearch.action.search.type.TransportSearchHelper.parseScrollId; @@ -67,8 +68,9 @@ public class TransportClearScrollAction extends TransportAction[]> contexts = new ArrayList<>(); - final AtomicReference expHolder; final ActionListener listener; + final AtomicReference expHolder; + final AtomicInteger numberOfFreedSearchContexts = new AtomicInteger(0); private Async(ClearScrollRequest request, ActionListener listener, ClusterState clusterState) { int expectedOps = 0; @@ -91,7 +93,7 @@ public class TransportClearScrollAction extends TransportAction() { @Override - public void onResponse(Boolean success) { - onFreedContext(); + public void onResponse(Boolean freed) { + onFreedContext(freed); } @Override @@ -114,14 +116,14 @@ public class TransportClearScrollAction extends TransportAction target : context) { final DiscoveryNode node = nodes.get(target.v1()); if (node == null) { - onFreedContext(); + onFreedContext(false); continue; } searchServiceTransportAction.sendFreeContext(node, target.v2(), request, new ActionListener() { @Override - public void onResponse(Boolean success) { - onFreedContext(); + public void onResponse(Boolean freed) { + onFreedContext(freed); } @Override @@ -134,17 +136,20 @@ public class TransportClearScrollAction extends TransportAction(channel) { - @Override - public RestResponse buildResponse(ClearScrollResponse response, XContentBuilder builder) throws Exception { - builder.startObject(); - builder.endObject(); - return new BytesRestResponse(OK, builder); - } - }); + client.clearScroll(clearRequest, new RestStatusToXContentListener(channel)); } public static String[] splitScrollIds(String scrollIds) { diff --git a/src/main/java/org/elasticsearch/search/SearchService.java b/src/main/java/org/elasticsearch/search/SearchService.java index 6b3ecc3f2de..7b17827b2a6 100644 --- a/src/main/java/org/elasticsearch/search/SearchService.java +++ b/src/main/java/org/elasticsearch/search/SearchService.java @@ -538,13 +538,14 @@ public class SearchService extends AbstractLifecycleComponent { return context; } - public void freeContext(long id) { + public boolean freeContext(long id) { SearchContext context = activeContexts.remove(id); if (context == null) { - return; + return false; } context.indexShard().searchService().onFreeContext(context); context.close(); + return true; } private void freeContext(SearchContext context) { diff --git a/src/main/java/org/elasticsearch/search/action/SearchServiceTransportAction.java b/src/main/java/org/elasticsearch/search/action/SearchServiceTransportAction.java index 19fdd8ffd11..a66ecb3d0fb 100644 --- a/src/main/java/org/elasticsearch/search/action/SearchServiceTransportAction.java +++ b/src/main/java/org/elasticsearch/search/action/SearchServiceTransportAction.java @@ -19,6 +19,7 @@ package org.elasticsearch.search.action; +import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.search.ClearScrollRequest; import org.elasticsearch.action.search.SearchRequest; @@ -106,18 +107,18 @@ public class SearchServiceTransportAction extends AbstractComponent { public void sendFreeContext(DiscoveryNode node, long contextId, ClearScrollRequest request, final ActionListener actionListener) { if (clusterService.state().nodes().localNodeId().equals(node.id())) { - searchService.freeContext(contextId); - actionListener.onResponse(true); + boolean freed = searchService.freeContext(contextId); + actionListener.onResponse(freed); } else { - transportService.sendRequest(node, SearchFreeContextTransportHandler.ACTION, new SearchFreeContextRequest(request, contextId), new TransportResponseHandler() { + transportService.sendRequest(node, SearchFreeContextTransportHandler.ACTION, new SearchFreeContextRequest(request, contextId), new TransportResponseHandler() { @Override - public TransportResponse newInstance() { - return TransportResponse.Empty.INSTANCE; + public SearchFreeContextResponse newInstance() { + return new SearchFreeContextResponse(); } @Override - public void handleResponse(TransportResponse response) { - actionListener.onResponse(true); + public void handleResponse(SearchFreeContextResponse response) { + actionListener.onResponse(response.isFreed()); } @Override @@ -560,6 +561,40 @@ public class SearchServiceTransportAction extends AbstractComponent { } } + class SearchFreeContextResponse extends TransportResponse { + + private boolean freed; + + SearchFreeContextResponse() { + } + + SearchFreeContextResponse(boolean freed) { + this.freed = freed; + } + + public boolean isFreed() { + return freed; + } + + @Override + public void readFrom(StreamInput in) throws IOException { + super.readFrom(in); + if (in.getVersion().onOrAfter(Version.V_1_2_0)) { + freed = in.readBoolean(); + } else { + freed = true; + } + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + if (out.getVersion().onOrAfter(Version.V_1_2_0)) { + out.writeBoolean(freed); + } + } + } + class SearchFreeContextTransportHandler extends BaseTransportRequestHandler { static final String ACTION = "search/freeContext"; @@ -571,8 +606,8 @@ public class SearchServiceTransportAction extends AbstractComponent { @Override public void messageReceived(SearchFreeContextRequest request, TransportChannel channel) throws Exception { - searchService.freeContext(request.id()); - channel.sendResponse(TransportResponse.Empty.INSTANCE); + boolean freed = searchService.freeContext(request.id()); + channel.sendResponse(new SearchFreeContextResponse(freed)); } @Override diff --git a/src/test/java/org/elasticsearch/search/scroll/SearchScrollTests.java b/src/test/java/org/elasticsearch/search/scroll/SearchScrollTests.java index 753251e6fd9..67f9bb38286 100644 --- a/src/test/java/org/elasticsearch/search/scroll/SearchScrollTests.java +++ b/src/test/java/org/elasticsearch/search/scroll/SearchScrollTests.java @@ -19,7 +19,6 @@ package org.elasticsearch.search.scroll; -import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchIllegalArgumentException; import org.elasticsearch.action.search.ClearScrollResponse; import org.elasticsearch.action.search.SearchRequestBuilder; @@ -31,7 +30,6 @@ import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.UncategorizedExecutionException; import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.rest.RestStatus; -import org.elasticsearch.search.SearchContextMissingException; import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.sort.SortOrder; import org.elasticsearch.test.ElasticsearchIntegrationTest; @@ -289,7 +287,9 @@ public class SearchScrollTests extends ElasticsearchIntegrationTest { .addScrollId(searchResponse1.getScrollId()) .addScrollId(searchResponse2.getScrollId()) .execute().actionGet(); - assertThat(clearResponse.isSucceeded(), equalTo(true)); + assertThat(clearResponse.isSucceeded(), is(true)); + assertThat(clearResponse.getNumFreed(), greaterThan(0)); + assertThat(clearResponse.status(), equalTo(RestStatus.OK)); assertThrows(client().prepareSearchScroll(searchResponse1.getScrollId()).setScroll(TimeValue.timeValueMinutes(2)), RestStatus.NOT_FOUND); assertThrows(client().prepareSearchScroll(searchResponse2.getScrollId()).setScroll(TimeValue.timeValueMinutes(2)), RestStatus.NOT_FOUND); @@ -304,6 +304,8 @@ public class SearchScrollTests extends ElasticsearchIntegrationTest { // Whether we actually clear a scroll, we can't know, since that information isn't serialized in the // free search context response, which is returned from each node we want to clear a particular scroll. assertThat(response.isSucceeded(), is(true)); + assertThat(response.getNumFreed(), equalTo(0)); + assertThat(response.status(), equalTo(RestStatus.NOT_FOUND)); } @Test @@ -395,7 +397,9 @@ public class SearchScrollTests extends ElasticsearchIntegrationTest { ClearScrollResponse clearResponse = client().prepareClearScroll().addScrollId("_all") .execute().actionGet(); - assertThat(clearResponse.isSucceeded(), equalTo(true)); + assertThat(clearResponse.isSucceeded(), is(true)); + assertThat(clearResponse.getNumFreed(), greaterThan(0)); + assertThat(clearResponse.status(), equalTo(RestStatus.OK)); assertThrows(cluster().transportClient().prepareSearchScroll(searchResponse1.getScrollId()).setScroll(TimeValue.timeValueMinutes(2)), RestStatus.NOT_FOUND); assertThrows(cluster().transportClient().prepareSearchScroll(searchResponse2.getScrollId()).setScroll(TimeValue.timeValueMinutes(2)), RestStatus.NOT_FOUND);