Internal: split internal free context request used after scroll and search
We currently use the same internal request when we need to free the search context after a search and a scroll. The two original requests though diverged after #6933 as `SearchRequest` implements `IndicesRequest` while `SearchScrollRequest` and `ClearScrollRequest` don't. That said, with #7319 we made `SearchFreeContextRequest` implement `IndicesRequest` by making it hold the original indices taken from the original request, which are null if the free context was originated by a scroll or by a clear scroll call, and that is why original indices are optional there. This commit introduces a separate free context request and transport action for scroll, which doesn't hold original indices. The new action is only used against nodes that expose it, the previous action name will be used for nodes older than 1.4.0.Beta1. As a result, in 1.4 we have a new `indices:data/read/search[free_context/scroll]` action that is equivalent to the previous `indices:data/read/search[free_context]` whose request implements now `IndicesRequest` and holds the original indices coming from the original request. The original indices in the latter requests can only be null during a rolling upgrade (already existing version checks make sure that serialization is bw compatible), when some nodes are still < 1.4. Closes #7856
This commit is contained in:
parent
ea49a3e269
commit
17b1fd1a6a
|
@ -57,6 +57,7 @@ import java.util.concurrent.Callable;
|
|||
*/
|
||||
public class SearchServiceTransportAction extends AbstractComponent {
|
||||
|
||||
public static final String FREE_CONTEXT_SCROLL_ACTION_NAME = "indices:data/read/search[free_context/scroll]";
|
||||
public static final String FREE_CONTEXT_ACTION_NAME = "indices:data/read/search[free_context]";
|
||||
public static final String CLEAR_SCROLL_CONTEXTS_ACTION_NAME = "indices:data/read/search[clear_scroll_contexts]";
|
||||
public static final String DFS_ACTION_NAME = "indices:data/read/search[phase/dfs]";
|
||||
|
@ -121,6 +122,7 @@ public class SearchServiceTransportAction extends AbstractComponent {
|
|||
this.clusterService = clusterService;
|
||||
this.searchService = searchService;
|
||||
|
||||
transportService.registerHandler(FREE_CONTEXT_SCROLL_ACTION_NAME, new ScrollFreeContextTransportHandler());
|
||||
transportService.registerHandler(FREE_CONTEXT_ACTION_NAME, new SearchFreeContextTransportHandler());
|
||||
transportService.registerHandler(CLEAR_SCROLL_CONTEXTS_ACTION_NAME, new ClearScrollContextsTransportHandler());
|
||||
transportService.registerHandler(DFS_ACTION_NAME, new SearchDfsTransportHandler());
|
||||
|
@ -148,7 +150,14 @@ public class SearchServiceTransportAction extends AbstractComponent {
|
|||
final boolean freed = searchService.freeContext(contextId);
|
||||
actionListener.onResponse(freed);
|
||||
} else {
|
||||
transportService.sendRequest(node, FREE_CONTEXT_ACTION_NAME, new SearchFreeContextRequest(request, contextId), new FreeContextResponseHandler(actionListener));
|
||||
if (node.getVersion().onOrAfter(Version.V_1_4_0_Beta1)) {
|
||||
//use the separate action for scroll when possible
|
||||
transportService.sendRequest(node, FREE_CONTEXT_SCROLL_ACTION_NAME, new ScrollFreeContextRequest(request, contextId), new FreeContextResponseHandler(actionListener));
|
||||
} else {
|
||||
//fallback to the previous action name if the new one is not supported by the node we are talking to.
|
||||
//Do use the same request since it has the same binary format as the previous SearchFreeContextRequest (without the OriginalIndices addition).
|
||||
transportService.sendRequest(node, FREE_CONTEXT_ACTION_NAME, new ScrollFreeContextRequest(request, contextId), new FreeContextResponseHandler(actionListener));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -550,52 +559,75 @@ public class SearchServiceTransportAction extends AbstractComponent {
|
|||
}
|
||||
}
|
||||
|
||||
static class SearchFreeContextRequest extends TransportRequest implements IndicesRequest {
|
||||
|
||||
static class ScrollFreeContextRequest extends TransportRequest {
|
||||
private long id;
|
||||
private OriginalIndices originalIndices;
|
||||
|
||||
SearchFreeContextRequest() {
|
||||
ScrollFreeContextRequest() {
|
||||
}
|
||||
|
||||
SearchFreeContextRequest(SearchRequest request, long id) {
|
||||
super(request);
|
||||
this.id = id;
|
||||
this.originalIndices = new OriginalIndices(request);
|
||||
ScrollFreeContextRequest(ClearScrollRequest request, long id) {
|
||||
this((TransportRequest) request, id);
|
||||
}
|
||||
|
||||
SearchFreeContextRequest(TransportRequest request, long id) {
|
||||
private ScrollFreeContextRequest(TransportRequest request, long id) {
|
||||
super(request);
|
||||
this.id = id;
|
||||
this.originalIndices = OriginalIndices.EMPTY;
|
||||
}
|
||||
|
||||
public long id() {
|
||||
return this.id;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String[] indices() {
|
||||
return originalIndices.indices();
|
||||
}
|
||||
|
||||
@Override
|
||||
public IndicesOptions indicesOptions() {
|
||||
return originalIndices.indicesOptions();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void readFrom(StreamInput in) throws IOException {
|
||||
super.readFrom(in);
|
||||
id = in.readLong();
|
||||
originalIndices = OriginalIndices.readOptionalOriginalIndices(in);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeTo(StreamOutput out) throws IOException {
|
||||
super.writeTo(out);
|
||||
out.writeLong(id);
|
||||
OriginalIndices.writeOptionalOriginalIndices(originalIndices, out);
|
||||
}
|
||||
}
|
||||
|
||||
static class SearchFreeContextRequest extends ScrollFreeContextRequest implements IndicesRequest {
|
||||
private OriginalIndices originalIndices;
|
||||
|
||||
SearchFreeContextRequest() {
|
||||
}
|
||||
|
||||
SearchFreeContextRequest(SearchRequest request, long id) {
|
||||
super(request, id);
|
||||
this.originalIndices = new OriginalIndices(request);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String[] indices() {
|
||||
if (originalIndices == null) {
|
||||
return null;
|
||||
}
|
||||
return originalIndices.indices();
|
||||
}
|
||||
|
||||
@Override
|
||||
public IndicesOptions indicesOptions() {
|
||||
if (originalIndices == null) {
|
||||
return null;
|
||||
}
|
||||
return originalIndices.indicesOptions();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void readFrom(StreamInput in) throws IOException {
|
||||
super.readFrom(in);
|
||||
originalIndices = OriginalIndices.readOriginalIndices(in);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeTo(StreamOutput out) throws IOException {
|
||||
super.writeTo(out);
|
||||
OriginalIndices.writeOriginalIndices(originalIndices, out);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -633,15 +665,12 @@ public class SearchServiceTransportAction extends AbstractComponent {
|
|||
}
|
||||
}
|
||||
|
||||
class SearchFreeContextTransportHandler extends BaseTransportRequestHandler<SearchFreeContextRequest> {
|
||||
private abstract class BaseFreeContextTransportHandler<FreeContextRequest extends ScrollFreeContextRequest> extends BaseTransportRequestHandler<FreeContextRequest> {
|
||||
@Override
|
||||
public abstract FreeContextRequest newInstance();
|
||||
|
||||
@Override
|
||||
public SearchFreeContextRequest newInstance() {
|
||||
return new SearchFreeContextRequest();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void messageReceived(SearchFreeContextRequest request, TransportChannel channel) throws Exception {
|
||||
public void messageReceived(FreeContextRequest request, TransportChannel channel) throws Exception {
|
||||
boolean freed = searchService.freeContext(request.id());
|
||||
channel.sendResponse(new SearchFreeContextResponse(freed));
|
||||
}
|
||||
|
@ -654,6 +683,20 @@ public class SearchServiceTransportAction extends AbstractComponent {
|
|||
}
|
||||
}
|
||||
|
||||
class ScrollFreeContextTransportHandler extends BaseFreeContextTransportHandler<ScrollFreeContextRequest> {
|
||||
@Override
|
||||
public ScrollFreeContextRequest newInstance() {
|
||||
return new ScrollFreeContextRequest();
|
||||
}
|
||||
}
|
||||
|
||||
class SearchFreeContextTransportHandler extends BaseFreeContextTransportHandler<SearchFreeContextRequest> {
|
||||
@Override
|
||||
public SearchFreeContextRequest newInstance() {
|
||||
return new SearchFreeContextRequest();
|
||||
}
|
||||
}
|
||||
|
||||
static class ClearScrollContextsRequest extends TransportRequest {
|
||||
|
||||
ClearScrollContextsRequest() {
|
||||
|
|
|
@ -36,7 +36,9 @@ import org.elasticsearch.action.explain.ExplainResponse;
|
|||
import org.elasticsearch.action.get.*;
|
||||
import org.elasticsearch.action.index.IndexRequestBuilder;
|
||||
import org.elasticsearch.action.index.IndexResponse;
|
||||
import org.elasticsearch.action.search.SearchRequestBuilder;
|
||||
import org.elasticsearch.action.search.SearchResponse;
|
||||
import org.elasticsearch.action.search.SearchType;
|
||||
import org.elasticsearch.action.termvector.TermVectorResponse;
|
||||
import org.elasticsearch.action.update.UpdateRequestBuilder;
|
||||
import org.elasticsearch.action.update.UpdateResponse;
|
||||
|
@ -704,6 +706,49 @@ public class BasicBackwardsCompatibilityTest extends ElasticsearchBackwardsCompa
|
|||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testScroll() throws ExecutionException, InterruptedException {
|
||||
createIndex("test");
|
||||
ensureYellow("test");
|
||||
|
||||
int numDocs = iterations(10, 100);
|
||||
IndexRequestBuilder[] indexRequestBuilders = new IndexRequestBuilder[numDocs];
|
||||
for (int i = 0; i < numDocs; i++) {
|
||||
indexRequestBuilders[i] = client().prepareIndex("test", "type", Integer.toString(i)).setSource("field", "value" + Integer.toString(i));
|
||||
}
|
||||
indexRandom(true, indexRequestBuilders);
|
||||
|
||||
int size = randomIntBetween(1, 10);
|
||||
SearchRequestBuilder searchRequestBuilder = client().prepareSearch("test").setScroll("1m").setSize(size);
|
||||
boolean scan = randomBoolean();
|
||||
if (scan) {
|
||||
searchRequestBuilder.setSearchType(SearchType.SCAN);
|
||||
}
|
||||
|
||||
SearchResponse searchResponse = searchRequestBuilder.get();
|
||||
assertThat(searchResponse.getScrollId(), notNullValue());
|
||||
assertHitCount(searchResponse, numDocs);
|
||||
int hits = 0;
|
||||
if (scan) {
|
||||
assertThat(searchResponse.getHits().getHits().length, equalTo(0));
|
||||
} else {
|
||||
assertThat(searchResponse.getHits().getHits().length, greaterThan(0));
|
||||
hits += searchResponse.getHits().getHits().length;
|
||||
}
|
||||
|
||||
try {
|
||||
do {
|
||||
searchResponse = client().prepareSearchScroll(searchResponse.getScrollId()).setScroll("1m").get();
|
||||
assertThat(searchResponse.getScrollId(), notNullValue());
|
||||
assertHitCount(searchResponse, numDocs);
|
||||
hits += searchResponse.getHits().getHits().length;
|
||||
} while (searchResponse.getHits().getHits().length > 0);
|
||||
assertThat(hits, equalTo(numDocs));
|
||||
} finally {
|
||||
clearScroll(searchResponse.getScrollId());
|
||||
}
|
||||
}
|
||||
|
||||
private static String indexOrAlias() {
|
||||
return randomBoolean() ? "test" : "alias";
|
||||
}
|
||||
|
|
|
@ -33,6 +33,7 @@ import org.elasticsearch.action.indexedscripts.put.PutIndexedScriptAction;
|
|||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNodes;
|
||||
import org.elasticsearch.indices.store.IndicesStore;
|
||||
import org.elasticsearch.search.action.SearchServiceTransportAction;
|
||||
import org.elasticsearch.test.ElasticsearchBackwardsCompatIntegrationTest;
|
||||
import org.elasticsearch.test.InternalTestCluster;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
|
@ -135,5 +136,6 @@ public class ActionNamesBackwardsCompatibilityTest extends ElasticsearchBackward
|
|||
actionsVersions.put(DeleteIndexedScriptAction.NAME, Version.V_1_3_0);
|
||||
actionsVersions.put(PutIndexedScriptAction.NAME, Version.V_1_3_0);
|
||||
|
||||
actionsVersions.put(SearchServiceTransportAction.FREE_CONTEXT_SCROLL_ACTION_NAME, Version.V_1_4_0_Beta1);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -27,6 +27,7 @@ import org.elasticsearch.action.bench.BenchmarkAction;
|
|||
import org.elasticsearch.action.bench.BenchmarkService;
|
||||
import org.elasticsearch.action.bench.BenchmarkStatusAction;
|
||||
import org.elasticsearch.action.exists.ExistsAction;
|
||||
import org.elasticsearch.search.action.SearchServiceTransportAction;
|
||||
import org.elasticsearch.test.ElasticsearchIntegrationTest;
|
||||
import org.junit.Test;
|
||||
|
||||
|
@ -139,5 +140,6 @@ public class ActionNamesTests extends ElasticsearchIntegrationTest {
|
|||
post_1_4_actions.add(ExistsAction.NAME);
|
||||
post_1_4_actions.add(ExistsAction.NAME + "[s]");
|
||||
post_1_4_actions.add(GetIndexAction.NAME);
|
||||
post_1_4_actions.add(SearchServiceTransportAction.FREE_CONTEXT_SCROLL_ACTION_NAME);
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue