Return missing (404) is a scroll_id is cleared that no longer exists.

Closes 
This commit is contained in:
Martijn van Groningen 2014-04-18 14:23:13 +07:00
parent 51de01bae5
commit 145efbf6ea
7 changed files with 125 additions and 39 deletions
rest-api-spec/test/scroll
src
main/java/org/elasticsearch
test/java/org/elasticsearch/search/scroll

@ -32,3 +32,8 @@
catch: missing catch: missing
scroll: scroll:
scroll_id: $scroll_id1 scroll_id: $scroll_id1
- do:
catch: missing
clear_scroll:
scroll_id: $scroll_id1

@ -19,38 +19,80 @@
package org.elasticsearch.action.search; package org.elasticsearch.action.search;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionResponse; import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.StatusToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.rest.RestStatus;
import java.io.IOException; import java.io.IOException;
import static org.elasticsearch.rest.RestStatus.NOT_FOUND;
import static org.elasticsearch.rest.RestStatus.OK;
/** /**
*/ */
public class ClearScrollResponse extends ActionResponse { public class ClearScrollResponse extends ActionResponse implements StatusToXContent {
private boolean succeeded; private boolean succeeded;
private int numFreed;
public ClearScrollResponse(boolean succeeded) { public ClearScrollResponse(boolean succeeded, int numFreed) {
this.succeeded = succeeded; this.succeeded = succeeded;
this.numFreed = numFreed;
} }
ClearScrollResponse() { ClearScrollResponse() {
} }
/**
* @return Whether the attempt to clear a scroll was successful.
*/
public boolean isSucceeded() { public boolean isSucceeded() {
return succeeded; return succeeded;
} }
/**
* @return The number of seach contexts that were freed. If this is <code>0</code> the assumption can be made,
* that the scroll id specified in the request did not exist. (never existed, was expired, or completely consumed)
*/
public int getNumFreed() {
return numFreed;
}
@Override
public RestStatus status() {
return numFreed == 0 ? NOT_FOUND : OK;
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.endObject();
return builder;
}
@Override @Override
public void readFrom(StreamInput in) throws IOException { public void readFrom(StreamInput in) throws IOException {
super.readFrom(in); super.readFrom(in);
succeeded = in.readBoolean(); succeeded = in.readBoolean();
if (in.getVersion().onOrAfter(Version.V_1_2_0)) {
numFreed = in.readVInt();
} else {
// On older nodes we can't tell how many search contexts where freed, so we assume at least one,
// so that the rest api doesn't return 404 where SC were indeed freed.
numFreed = 1;
}
} }
@Override @Override
public void writeTo(StreamOutput out) throws IOException { public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out); super.writeTo(out);
out.writeBoolean(succeeded); out.writeBoolean(succeeded);
if (out.getVersion().onOrAfter(Version.V_1_2_0)) {
out.writeVInt(numFreed);
}
} }
} }

@ -37,6 +37,7 @@ import org.elasticsearch.transport.TransportService;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import static org.elasticsearch.action.search.type.TransportSearchHelper.parseScrollId; import static org.elasticsearch.action.search.type.TransportSearchHelper.parseScrollId;
@ -67,8 +68,9 @@ public class TransportClearScrollAction extends TransportAction<ClearScrollReque
final CountDown expectedOps; final CountDown expectedOps;
final ClearScrollRequest request; final ClearScrollRequest request;
final List<Tuple<String, Long>[]> contexts = new ArrayList<>(); final List<Tuple<String, Long>[]> contexts = new ArrayList<>();
final AtomicReference<Throwable> expHolder;
final ActionListener<ClearScrollResponse> listener; final ActionListener<ClearScrollResponse> listener;
final AtomicReference<Throwable> expHolder;
final AtomicInteger numberOfFreedSearchContexts = new AtomicInteger(0);
private Async(ClearScrollRequest request, ActionListener<ClearScrollResponse> listener, ClusterState clusterState) { private Async(ClearScrollRequest request, ActionListener<ClearScrollResponse> listener, ClusterState clusterState) {
int expectedOps = 0; int expectedOps = 0;
@ -91,7 +93,7 @@ public class TransportClearScrollAction extends TransportAction<ClearScrollReque
public void run() { public void run() {
if (expectedOps.isCountedDown()) { if (expectedOps.isCountedDown()) {
listener.onResponse(new ClearScrollResponse(true)); listener.onResponse(new ClearScrollResponse(true, 0));
return; return;
} }
@ -99,8 +101,8 @@ public class TransportClearScrollAction extends TransportAction<ClearScrollReque
for (final DiscoveryNode node : nodes) { for (final DiscoveryNode node : nodes) {
searchServiceTransportAction.sendClearAllScrollContexts(node, request, new ActionListener<Boolean>() { searchServiceTransportAction.sendClearAllScrollContexts(node, request, new ActionListener<Boolean>() {
@Override @Override
public void onResponse(Boolean success) { public void onResponse(Boolean freed) {
onFreedContext(); onFreedContext(freed);
} }
@Override @Override
@ -114,14 +116,14 @@ public class TransportClearScrollAction extends TransportAction<ClearScrollReque
for (Tuple<String, Long> target : context) { for (Tuple<String, Long> target : context) {
final DiscoveryNode node = nodes.get(target.v1()); final DiscoveryNode node = nodes.get(target.v1());
if (node == null) { if (node == null) {
onFreedContext(); onFreedContext(false);
continue; continue;
} }
searchServiceTransportAction.sendFreeContext(node, target.v2(), request, new ActionListener<Boolean>() { searchServiceTransportAction.sendFreeContext(node, target.v2(), request, new ActionListener<Boolean>() {
@Override @Override
public void onResponse(Boolean success) { public void onResponse(Boolean freed) {
onFreedContext(); onFreedContext(freed);
} }
@Override @Override
@ -134,17 +136,20 @@ public class TransportClearScrollAction extends TransportAction<ClearScrollReque
} }
} }
void onFreedContext() { void onFreedContext(boolean freed) {
if (freed) {
numberOfFreedSearchContexts.incrementAndGet();
}
if (expectedOps.countDown()) { if (expectedOps.countDown()) {
boolean succeeded = expHolder.get() == null; boolean succeeded = expHolder.get() == null;
listener.onResponse(new ClearScrollResponse(succeeded)); listener.onResponse(new ClearScrollResponse(succeeded, numberOfFreedSearchContexts.get()));
} }
} }
void onFailedFreedContext(Throwable e, DiscoveryNode node) { void onFailedFreedContext(Throwable e, DiscoveryNode node) {
logger.warn("Clear SC failed on node[{}]", e, node); logger.warn("Clear SC failed on node[{}]", e, node);
if (expectedOps.countDown()) { if (expectedOps.countDown()) {
listener.onResponse(new ClearScrollResponse(false)); listener.onResponse(new ClearScrollResponse(false, numberOfFreedSearchContexts.get()));
} else { } else {
expHolder.set(e); expHolder.set(e);
} }

@ -25,15 +25,16 @@ import org.elasticsearch.client.Client;
import org.elasticsearch.common.Strings; import org.elasticsearch.common.Strings;
import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.*; import org.elasticsearch.rest.RestChannel;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.action.support.RestActions; import org.elasticsearch.rest.action.support.RestActions;
import org.elasticsearch.rest.action.support.RestBuilderListener; import org.elasticsearch.rest.action.support.RestStatusToXContentListener;
import java.util.Arrays; import java.util.Arrays;
import static org.elasticsearch.rest.RestRequest.Method.DELETE; import static org.elasticsearch.rest.RestRequest.Method.DELETE;
import static org.elasticsearch.rest.RestStatus.OK;
/** /**
*/ */
@ -56,14 +57,7 @@ public class RestClearScrollAction extends BaseRestHandler {
ClearScrollRequest clearRequest = new ClearScrollRequest(); ClearScrollRequest clearRequest = new ClearScrollRequest();
clearRequest.setScrollIds(Arrays.asList(splitScrollIds(scrollIds))); clearRequest.setScrollIds(Arrays.asList(splitScrollIds(scrollIds)));
client.clearScroll(clearRequest, new RestBuilderListener<ClearScrollResponse>(channel) { client.clearScroll(clearRequest, new RestStatusToXContentListener<ClearScrollResponse>(channel));
@Override
public RestResponse buildResponse(ClearScrollResponse response, XContentBuilder builder) throws Exception {
builder.startObject();
builder.endObject();
return new BytesRestResponse(OK, builder);
}
});
} }
public static String[] splitScrollIds(String scrollIds) { public static String[] splitScrollIds(String scrollIds) {

@ -538,13 +538,14 @@ public class SearchService extends AbstractLifecycleComponent<SearchService> {
return context; return context;
} }
public void freeContext(long id) { public boolean freeContext(long id) {
SearchContext context = activeContexts.remove(id); SearchContext context = activeContexts.remove(id);
if (context == null) { if (context == null) {
return; return false;
} }
context.indexShard().searchService().onFreeContext(context); context.indexShard().searchService().onFreeContext(context);
context.close(); context.close();
return true;
} }
private void freeContext(SearchContext context) { private void freeContext(SearchContext context) {

@ -19,6 +19,7 @@
package org.elasticsearch.search.action; package org.elasticsearch.search.action;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.search.ClearScrollRequest; import org.elasticsearch.action.search.ClearScrollRequest;
import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchRequest;
@ -106,18 +107,18 @@ public class SearchServiceTransportAction extends AbstractComponent {
public void sendFreeContext(DiscoveryNode node, long contextId, ClearScrollRequest request, final ActionListener<Boolean> actionListener) { public void sendFreeContext(DiscoveryNode node, long contextId, ClearScrollRequest request, final ActionListener<Boolean> actionListener) {
if (clusterService.state().nodes().localNodeId().equals(node.id())) { if (clusterService.state().nodes().localNodeId().equals(node.id())) {
searchService.freeContext(contextId); boolean freed = searchService.freeContext(contextId);
actionListener.onResponse(true); actionListener.onResponse(freed);
} else { } else {
transportService.sendRequest(node, SearchFreeContextTransportHandler.ACTION, new SearchFreeContextRequest(request, contextId), new TransportResponseHandler<TransportResponse>() { transportService.sendRequest(node, SearchFreeContextTransportHandler.ACTION, new SearchFreeContextRequest(request, contextId), new TransportResponseHandler<SearchFreeContextResponse>() {
@Override @Override
public TransportResponse newInstance() { public SearchFreeContextResponse newInstance() {
return TransportResponse.Empty.INSTANCE; return new SearchFreeContextResponse();
} }
@Override @Override
public void handleResponse(TransportResponse response) { public void handleResponse(SearchFreeContextResponse response) {
actionListener.onResponse(true); actionListener.onResponse(response.isFreed());
} }
@Override @Override
@ -560,6 +561,40 @@ public class SearchServiceTransportAction extends AbstractComponent {
} }
} }
class SearchFreeContextResponse extends TransportResponse {
private boolean freed;
SearchFreeContextResponse() {
}
SearchFreeContextResponse(boolean freed) {
this.freed = freed;
}
public boolean isFreed() {
return freed;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
if (in.getVersion().onOrAfter(Version.V_1_2_0)) {
freed = in.readBoolean();
} else {
freed = true;
}
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
if (out.getVersion().onOrAfter(Version.V_1_2_0)) {
out.writeBoolean(freed);
}
}
}
class SearchFreeContextTransportHandler extends BaseTransportRequestHandler<SearchFreeContextRequest> { class SearchFreeContextTransportHandler extends BaseTransportRequestHandler<SearchFreeContextRequest> {
static final String ACTION = "search/freeContext"; static final String ACTION = "search/freeContext";
@ -571,8 +606,8 @@ public class SearchServiceTransportAction extends AbstractComponent {
@Override @Override
public void messageReceived(SearchFreeContextRequest request, TransportChannel channel) throws Exception { public void messageReceived(SearchFreeContextRequest request, TransportChannel channel) throws Exception {
searchService.freeContext(request.id()); boolean freed = searchService.freeContext(request.id());
channel.sendResponse(TransportResponse.Empty.INSTANCE); channel.sendResponse(new SearchFreeContextResponse(freed));
} }
@Override @Override

@ -19,7 +19,6 @@
package org.elasticsearch.search.scroll; package org.elasticsearch.search.scroll;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchIllegalArgumentException; import org.elasticsearch.ElasticsearchIllegalArgumentException;
import org.elasticsearch.action.search.ClearScrollResponse; import org.elasticsearch.action.search.ClearScrollResponse;
import org.elasticsearch.action.search.SearchRequestBuilder; import org.elasticsearch.action.search.SearchRequestBuilder;
@ -31,7 +30,6 @@ import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.UncategorizedExecutionException; import org.elasticsearch.common.util.concurrent.UncategorizedExecutionException;
import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.rest.RestStatus; import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.SearchContextMissingException;
import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.sort.SortOrder; import org.elasticsearch.search.sort.SortOrder;
import org.elasticsearch.test.ElasticsearchIntegrationTest; import org.elasticsearch.test.ElasticsearchIntegrationTest;
@ -289,7 +287,9 @@ public class SearchScrollTests extends ElasticsearchIntegrationTest {
.addScrollId(searchResponse1.getScrollId()) .addScrollId(searchResponse1.getScrollId())
.addScrollId(searchResponse2.getScrollId()) .addScrollId(searchResponse2.getScrollId())
.execute().actionGet(); .execute().actionGet();
assertThat(clearResponse.isSucceeded(), equalTo(true)); assertThat(clearResponse.isSucceeded(), is(true));
assertThat(clearResponse.getNumFreed(), greaterThan(0));
assertThat(clearResponse.status(), equalTo(RestStatus.OK));
assertThrows(client().prepareSearchScroll(searchResponse1.getScrollId()).setScroll(TimeValue.timeValueMinutes(2)), RestStatus.NOT_FOUND); assertThrows(client().prepareSearchScroll(searchResponse1.getScrollId()).setScroll(TimeValue.timeValueMinutes(2)), RestStatus.NOT_FOUND);
assertThrows(client().prepareSearchScroll(searchResponse2.getScrollId()).setScroll(TimeValue.timeValueMinutes(2)), RestStatus.NOT_FOUND); assertThrows(client().prepareSearchScroll(searchResponse2.getScrollId()).setScroll(TimeValue.timeValueMinutes(2)), RestStatus.NOT_FOUND);
@ -304,6 +304,8 @@ public class SearchScrollTests extends ElasticsearchIntegrationTest {
// Whether we actually clear a scroll, we can't know, since that information isn't serialized in the // Whether we actually clear a scroll, we can't know, since that information isn't serialized in the
// free search context response, which is returned from each node we want to clear a particular scroll. // free search context response, which is returned from each node we want to clear a particular scroll.
assertThat(response.isSucceeded(), is(true)); assertThat(response.isSucceeded(), is(true));
assertThat(response.getNumFreed(), equalTo(0));
assertThat(response.status(), equalTo(RestStatus.NOT_FOUND));
} }
@Test @Test
@ -395,7 +397,9 @@ public class SearchScrollTests extends ElasticsearchIntegrationTest {
ClearScrollResponse clearResponse = client().prepareClearScroll().addScrollId("_all") ClearScrollResponse clearResponse = client().prepareClearScroll().addScrollId("_all")
.execute().actionGet(); .execute().actionGet();
assertThat(clearResponse.isSucceeded(), equalTo(true)); assertThat(clearResponse.isSucceeded(), is(true));
assertThat(clearResponse.getNumFreed(), greaterThan(0));
assertThat(clearResponse.status(), equalTo(RestStatus.OK));
assertThrows(cluster().transportClient().prepareSearchScroll(searchResponse1.getScrollId()).setScroll(TimeValue.timeValueMinutes(2)), RestStatus.NOT_FOUND); assertThrows(cluster().transportClient().prepareSearchScroll(searchResponse1.getScrollId()).setScroll(TimeValue.timeValueMinutes(2)), RestStatus.NOT_FOUND);
assertThrows(cluster().transportClient().prepareSearchScroll(searchResponse2.getScrollId()).setScroll(TimeValue.timeValueMinutes(2)), RestStatus.NOT_FOUND); assertThrows(cluster().transportClient().prepareSearchScroll(searchResponse2.getScrollId()).setScroll(TimeValue.timeValueMinutes(2)), RestStatus.NOT_FOUND);