parent
23a0f8b617
commit
d47bbbafe0
|
@ -18,19 +18,25 @@
|
||||||
*/
|
*/
|
||||||
package org.elasticsearch.http;
|
package org.elasticsearch.http;
|
||||||
|
|
||||||
|
import org.apache.http.entity.ContentType;
|
||||||
|
import org.apache.http.nio.entity.NByteArrayEntity;
|
||||||
import org.apache.logging.log4j.LogManager;
|
import org.apache.logging.log4j.LogManager;
|
||||||
import org.apache.lucene.util.SetOnce;
|
import org.apache.lucene.util.SetOnce;
|
||||||
import org.elasticsearch.action.admin.cluster.node.info.NodeInfo;
|
import org.elasticsearch.action.admin.cluster.node.info.NodeInfo;
|
||||||
import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse;
|
import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse;
|
||||||
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse;
|
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse;
|
||||||
import org.elasticsearch.action.bulk.BulkRequestBuilder;
|
import org.elasticsearch.action.bulk.BulkRequestBuilder;
|
||||||
|
import org.elasticsearch.action.search.MultiSearchAction;
|
||||||
|
import org.elasticsearch.action.search.MultiSearchRequest;
|
||||||
import org.elasticsearch.action.search.SearchAction;
|
import org.elasticsearch.action.search.SearchAction;
|
||||||
|
import org.elasticsearch.action.search.SearchRequest;
|
||||||
import org.elasticsearch.action.support.WriteRequest;
|
import org.elasticsearch.action.support.WriteRequest;
|
||||||
import org.elasticsearch.client.Cancellable;
|
import org.elasticsearch.client.Cancellable;
|
||||||
import org.elasticsearch.client.Request;
|
import org.elasticsearch.client.Request;
|
||||||
import org.elasticsearch.client.Response;
|
import org.elasticsearch.client.Response;
|
||||||
import org.elasticsearch.client.ResponseListener;
|
import org.elasticsearch.client.ResponseListener;
|
||||||
import org.elasticsearch.common.Strings;
|
import org.elasticsearch.common.Strings;
|
||||||
|
import org.elasticsearch.common.xcontent.XContentType;
|
||||||
import org.elasticsearch.plugins.Plugin;
|
import org.elasticsearch.plugins.Plugin;
|
||||||
import org.elasticsearch.plugins.PluginsService;
|
import org.elasticsearch.plugins.PluginsService;
|
||||||
import org.elasticsearch.script.MockScriptPlugin;
|
import org.elasticsearch.script.MockScriptPlugin;
|
||||||
|
@ -45,6 +51,7 @@ import org.elasticsearch.tasks.TaskInfo;
|
||||||
import org.elasticsearch.tasks.TaskManager;
|
import org.elasticsearch.tasks.TaskManager;
|
||||||
import org.elasticsearch.transport.TransportService;
|
import org.elasticsearch.transport.TransportService;
|
||||||
|
|
||||||
|
import java.nio.charset.Charset;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
|
@ -75,15 +82,29 @@ public class SearchRestCancellationIT extends HttpSmokeTestCase {
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testAutomaticCancellationDuringQueryPhase() throws Exception {
|
public void testAutomaticCancellationDuringQueryPhase() throws Exception {
|
||||||
Map<String, String> nodeIdToName = readNodesInfo();
|
|
||||||
|
|
||||||
List<ScriptedBlockPlugin> plugins = initBlockFactory();
|
|
||||||
indexTestData();
|
|
||||||
|
|
||||||
Request searchRequest = new Request("GET", "/test/_search");
|
Request searchRequest = new Request("GET", "/test/_search");
|
||||||
SearchSourceBuilder searchSource = new SearchSourceBuilder().query(scriptQuery(
|
SearchSourceBuilder searchSource = new SearchSourceBuilder().query(scriptQuery(
|
||||||
new Script(ScriptType.INLINE, "mockscript", ScriptedBlockPlugin.SCRIPT_NAME, Collections.emptyMap())));
|
new Script(ScriptType.INLINE, "mockscript", ScriptedBlockPlugin.SCRIPT_NAME, Collections.emptyMap())));
|
||||||
searchRequest.setJsonEntity(Strings.toString(searchSource));
|
searchRequest.setJsonEntity(Strings.toString(searchSource));
|
||||||
|
verifyCancellationDuringQueryPhase(SearchAction.NAME, searchRequest);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testAutomaticCancellationMultiSearchDuringQueryPhase() throws Exception {
|
||||||
|
XContentType contentType = XContentType.JSON;
|
||||||
|
MultiSearchRequest multiSearchRequest = new MultiSearchRequest().add(new SearchRequest("test")
|
||||||
|
.source(new SearchSourceBuilder().scriptField("test_field",
|
||||||
|
new Script(ScriptType.INLINE, "mockscript", ScriptedBlockPlugin.SCRIPT_NAME, Collections.emptyMap()))));
|
||||||
|
Request restRequest = new Request("POST", "/_msearch");
|
||||||
|
byte[] requestBody = MultiSearchRequest.writeMultiLineFormat(multiSearchRequest, contentType.xContent());
|
||||||
|
restRequest.setEntity(new NByteArrayEntity(requestBody, createContentType(contentType)));
|
||||||
|
verifyCancellationDuringQueryPhase(MultiSearchAction.NAME, restRequest);
|
||||||
|
}
|
||||||
|
|
||||||
|
void verifyCancellationDuringQueryPhase(String searchAction, Request searchRequest) throws Exception {
|
||||||
|
Map<String, String> nodeIdToName = readNodesInfo();
|
||||||
|
|
||||||
|
List<ScriptedBlockPlugin> plugins = initBlockFactory();
|
||||||
|
indexTestData();
|
||||||
|
|
||||||
CountDownLatch latch = new CountDownLatch(1);
|
CountDownLatch latch = new CountDownLatch(1);
|
||||||
AtomicReference<Exception> error = new AtomicReference<>();
|
AtomicReference<Exception> error = new AtomicReference<>();
|
||||||
|
@ -102,7 +123,7 @@ public class SearchRestCancellationIT extends HttpSmokeTestCase {
|
||||||
|
|
||||||
awaitForBlock(plugins);
|
awaitForBlock(plugins);
|
||||||
cancellable.cancel();
|
cancellable.cancel();
|
||||||
ensureSearchTaskIsCancelled(nodeIdToName::get);
|
ensureSearchTaskIsCancelled(searchAction, nodeIdToName::get);
|
||||||
|
|
||||||
disableBlocks(plugins);
|
disableBlocks(plugins);
|
||||||
latch.await();
|
latch.await();
|
||||||
|
@ -110,15 +131,29 @@ public class SearchRestCancellationIT extends HttpSmokeTestCase {
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testAutomaticCancellationDuringFetchPhase() throws Exception {
|
public void testAutomaticCancellationDuringFetchPhase() throws Exception {
|
||||||
Map<String, String> nodeIdToName = readNodesInfo();
|
|
||||||
|
|
||||||
List<ScriptedBlockPlugin> plugins = initBlockFactory();
|
|
||||||
indexTestData();
|
|
||||||
|
|
||||||
Request searchRequest = new Request("GET", "/test/_search");
|
Request searchRequest = new Request("GET", "/test/_search");
|
||||||
SearchSourceBuilder searchSource = new SearchSourceBuilder().scriptField("test_field",
|
SearchSourceBuilder searchSource = new SearchSourceBuilder().scriptField("test_field",
|
||||||
new Script(ScriptType.INLINE, "mockscript", ScriptedBlockPlugin.SCRIPT_NAME, Collections.emptyMap()));
|
new Script(ScriptType.INLINE, "mockscript", ScriptedBlockPlugin.SCRIPT_NAME, Collections.emptyMap()));
|
||||||
searchRequest.setJsonEntity(Strings.toString(searchSource));
|
searchRequest.setJsonEntity(Strings.toString(searchSource));
|
||||||
|
verifyCancellationDuringFetchPhase(SearchAction.NAME, searchRequest);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testAutomaticCancellationMultiSearchDuringFetchPhase() throws Exception {
|
||||||
|
XContentType contentType = XContentType.JSON;
|
||||||
|
MultiSearchRequest multiSearchRequest = new MultiSearchRequest().add(new SearchRequest("test")
|
||||||
|
.source(new SearchSourceBuilder().scriptField("test_field",
|
||||||
|
new Script(ScriptType.INLINE, "mockscript", ScriptedBlockPlugin.SCRIPT_NAME, Collections.emptyMap()))));
|
||||||
|
Request restRequest = new Request("POST", "/_msearch");
|
||||||
|
byte[] requestBody = MultiSearchRequest.writeMultiLineFormat(multiSearchRequest, contentType.xContent());
|
||||||
|
restRequest.setEntity(new NByteArrayEntity(requestBody, createContentType(contentType)));
|
||||||
|
verifyCancellationDuringFetchPhase(MultiSearchAction.NAME, restRequest);
|
||||||
|
}
|
||||||
|
|
||||||
|
void verifyCancellationDuringFetchPhase(String searchAction, Request searchRequest) throws Exception {
|
||||||
|
Map<String, String> nodeIdToName = readNodesInfo();
|
||||||
|
|
||||||
|
List<ScriptedBlockPlugin> plugins = initBlockFactory();
|
||||||
|
indexTestData();
|
||||||
|
|
||||||
CountDownLatch latch = new CountDownLatch(1);
|
CountDownLatch latch = new CountDownLatch(1);
|
||||||
AtomicReference<Exception> error = new AtomicReference<>();
|
AtomicReference<Exception> error = new AtomicReference<>();
|
||||||
|
@ -137,7 +172,7 @@ public class SearchRestCancellationIT extends HttpSmokeTestCase {
|
||||||
|
|
||||||
awaitForBlock(plugins);
|
awaitForBlock(plugins);
|
||||||
cancellable.cancel();
|
cancellable.cancel();
|
||||||
ensureSearchTaskIsCancelled(nodeIdToName::get);
|
ensureSearchTaskIsCancelled(searchAction, nodeIdToName::get);
|
||||||
|
|
||||||
disableBlocks(plugins);
|
disableBlocks(plugins);
|
||||||
latch.await();
|
latch.await();
|
||||||
|
@ -154,11 +189,11 @@ public class SearchRestCancellationIT extends HttpSmokeTestCase {
|
||||||
return nodeIdToName;
|
return nodeIdToName;
|
||||||
}
|
}
|
||||||
|
|
||||||
private static void ensureSearchTaskIsCancelled(Function<String, String> nodeIdToName) throws Exception {
|
private static void ensureSearchTaskIsCancelled(String transportAction, Function<String, String> nodeIdToName) throws Exception {
|
||||||
SetOnce<TaskInfo> searchTask = new SetOnce<>();
|
SetOnce<TaskInfo> searchTask = new SetOnce<>();
|
||||||
ListTasksResponse listTasksResponse = client().admin().cluster().prepareListTasks().get();
|
ListTasksResponse listTasksResponse = client().admin().cluster().prepareListTasks().get();
|
||||||
for (TaskInfo task : listTasksResponse.getTasks()) {
|
for (TaskInfo task : listTasksResponse.getTasks()) {
|
||||||
if (task.getAction().equals(SearchAction.NAME)) {
|
if (task.getAction().equals(transportAction)) {
|
||||||
searchTask.set(task);
|
searchTask.set(task);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -248,4 +283,8 @@ public class SearchRestCancellationIT extends HttpSmokeTestCase {
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static ContentType createContentType(final XContentType xContentType) {
|
||||||
|
return ContentType.create(xContentType.mediaTypeWithoutParameters(), (Charset) null);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -20,6 +20,7 @@
|
||||||
package org.elasticsearch.rest.action.search;
|
package org.elasticsearch.rest.action.search;
|
||||||
|
|
||||||
import org.apache.logging.log4j.LogManager;
|
import org.apache.logging.log4j.LogManager;
|
||||||
|
import org.elasticsearch.action.search.MultiSearchAction;
|
||||||
import org.elasticsearch.action.search.MultiSearchRequest;
|
import org.elasticsearch.action.search.MultiSearchRequest;
|
||||||
import org.elasticsearch.action.search.SearchRequest;
|
import org.elasticsearch.action.search.SearchRequest;
|
||||||
import org.elasticsearch.action.support.IndicesOptions;
|
import org.elasticsearch.action.support.IndicesOptions;
|
||||||
|
@ -35,6 +36,7 @@ import org.elasticsearch.common.xcontent.XContentParser;
|
||||||
import org.elasticsearch.common.xcontent.XContentType;
|
import org.elasticsearch.common.xcontent.XContentType;
|
||||||
import org.elasticsearch.rest.BaseRestHandler;
|
import org.elasticsearch.rest.BaseRestHandler;
|
||||||
import org.elasticsearch.rest.RestRequest;
|
import org.elasticsearch.rest.RestRequest;
|
||||||
|
import org.elasticsearch.rest.action.RestCancellableNodeClient;
|
||||||
import org.elasticsearch.rest.action.RestToXContentListener;
|
import org.elasticsearch.rest.action.RestToXContentListener;
|
||||||
import org.elasticsearch.search.builder.SearchSourceBuilder;
|
import org.elasticsearch.search.builder.SearchSourceBuilder;
|
||||||
|
|
||||||
|
@ -99,7 +101,10 @@ public class RestMultiSearchAction extends BaseRestHandler {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return channel -> client.multiSearch(multiSearchRequest, new RestToXContentListener<>(channel));
|
return channel -> {
|
||||||
|
final RestCancellableNodeClient cancellableClient = new RestCancellableNodeClient(client, request.getHttpChannel());
|
||||||
|
cancellableClient.execute(MultiSearchAction.INSTANCE, multiSearchRequest, new RestToXContentListener<>(channel));
|
||||||
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
Loading…
Reference in New Issue