Add SearchRestCancellationIT

This test verifies automatic cancellation of search requests on connection close.
It was previously not present in 7.x as the http client was subject do a bug which
made testing cancellation of requests impossible. Now that the bug is fixed upstream,
we can also backport this test
This commit is contained in:
Luca Cavanna 2019-09-11 13:53:57 +02:00
parent 030d43a76a
commit de47ea2cf4
1 changed files with 251 additions and 0 deletions

View File

@ -0,0 +1,251 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.http;
import org.apache.logging.log4j.LogManager;
import org.apache.lucene.util.SetOnce;
import org.elasticsearch.action.admin.cluster.node.info.NodeInfo;
import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse;
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.search.SearchAction;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.client.Cancellable;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.ResponseListener;
import org.elasticsearch.common.Strings;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.plugins.PluginsService;
import org.elasticsearch.script.MockScriptPlugin;
import org.elasticsearch.script.Script;
import org.elasticsearch.script.ScriptType;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.lookup.LeafFieldsLookup;
import org.elasticsearch.tasks.CancellableTask;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.tasks.TaskInfo;
import org.elasticsearch.tasks.TaskManager;
import org.elasticsearch.transport.TransportService;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import static org.elasticsearch.index.query.QueryBuilders.scriptQuery;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.instanceOf;
public class SearchRestCancellationIT extends HttpSmokeTestCase {
@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
List<Class<? extends Plugin>> plugins = new ArrayList<>();
plugins.add(ScriptedBlockPlugin.class);
plugins.addAll(super.nodePlugins());
return plugins;
}
public void testAutomaticCancellationDuringQueryPhase() throws Exception {
Map<String, String> nodeIdToName = readNodesInfo();
List<ScriptedBlockPlugin> plugins = initBlockFactory();
indexTestData();
Request searchRequest = new Request("GET", "/test/_search");
SearchSourceBuilder searchSource = new SearchSourceBuilder().query(scriptQuery(
new Script(ScriptType.INLINE, "mockscript", ScriptedBlockPlugin.SCRIPT_NAME, Collections.emptyMap())));
searchRequest.setJsonEntity(Strings.toString(searchSource));
CountDownLatch latch = new CountDownLatch(1);
AtomicReference<Exception> error = new AtomicReference<>();
Cancellable cancellable = getRestClient().performRequestAsync(searchRequest, new ResponseListener() {
@Override
public void onSuccess(Response response) {
latch.countDown();
}
@Override
public void onFailure(Exception exception) {
error.set(exception);
latch.countDown();
}
});
awaitForBlock(plugins);
cancellable.cancel();
ensureSearchTaskIsCancelled(nodeIdToName::get);
disableBlocks(plugins);
latch.await();
assertThat(error.get(), instanceOf(CancellationException.class));
}
public void testAutomaticCancellationDuringFetchPhase() throws Exception {
Map<String, String> nodeIdToName = readNodesInfo();
List<ScriptedBlockPlugin> plugins = initBlockFactory();
indexTestData();
Request searchRequest = new Request("GET", "/test/_search");
SearchSourceBuilder searchSource = new SearchSourceBuilder().scriptField("test_field",
new Script(ScriptType.INLINE, "mockscript", ScriptedBlockPlugin.SCRIPT_NAME, Collections.emptyMap()));
searchRequest.setJsonEntity(Strings.toString(searchSource));
CountDownLatch latch = new CountDownLatch(1);
AtomicReference<Exception> error = new AtomicReference<>();
Cancellable cancellable = getRestClient().performRequestAsync(searchRequest, new ResponseListener() {
@Override
public void onSuccess(Response response) {
latch.countDown();
}
@Override
public void onFailure(Exception exception) {
error.set(exception);
latch.countDown();
}
});
awaitForBlock(plugins);
cancellable.cancel();
ensureSearchTaskIsCancelled(nodeIdToName::get);
disableBlocks(plugins);
latch.await();
assertThat(error.get(), instanceOf(CancellationException.class));
}
private static Map<String, String> readNodesInfo() {
Map<String, String> nodeIdToName = new HashMap<>();
NodesInfoResponse nodesInfoResponse = client().admin().cluster().prepareNodesInfo().get();
assertFalse(nodesInfoResponse.hasFailures());
for (NodeInfo node : nodesInfoResponse.getNodes()) {
nodeIdToName.put(node.getNode().getId(), node.getNode().getName());
}
return nodeIdToName;
}
private static void ensureSearchTaskIsCancelled(Function<String, String> nodeIdToName) throws Exception {
SetOnce<TaskInfo> searchTask = new SetOnce<>();
ListTasksResponse listTasksResponse = client().admin().cluster().prepareListTasks().get();
for (TaskInfo task : listTasksResponse.getTasks()) {
if (task.getAction().equals(SearchAction.NAME)) {
searchTask.set(task);
}
}
assertNotNull(searchTask.get());
TaskId taskId = searchTask.get().getTaskId();
String nodeName = nodeIdToName.apply(taskId.getNodeId());
assertBusy(() -> {
TaskManager taskManager = internalCluster().getInstance(TransportService.class, nodeName).getTaskManager();
Task task = taskManager.getTask(taskId.getId());
assertThat(task, instanceOf(CancellableTask.class));
assertTrue(((CancellableTask)task).isCancelled());
});
}
private static void indexTestData() {
for (int i = 0; i < 5; i++) {
// Make sure we have a few segments
BulkRequestBuilder bulkRequestBuilder = client().prepareBulk().setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
for (int j = 0; j < 20; j++) {
bulkRequestBuilder.add(client().prepareIndex("test", "_doc", Integer.toString(i * 5 + j)).setSource("field", "value"));
}
assertNoFailures(bulkRequestBuilder.get());
}
}
private static List<ScriptedBlockPlugin> initBlockFactory() {
List<ScriptedBlockPlugin> plugins = new ArrayList<>();
for (PluginsService pluginsService : internalCluster().getDataNodeInstances(PluginsService.class)) {
plugins.addAll(pluginsService.filterPlugins(ScriptedBlockPlugin.class));
}
for (ScriptedBlockPlugin plugin : plugins) {
plugin.reset();
plugin.enableBlock();
}
return plugins;
}
private void awaitForBlock(List<ScriptedBlockPlugin> plugins) throws Exception {
int numberOfShards = getNumShards("test").numPrimaries;
assertBusy(() -> {
int numberOfBlockedPlugins = 0;
for (ScriptedBlockPlugin plugin : plugins) {
numberOfBlockedPlugins += plugin.hits.get();
}
logger.info("The plugin blocked on {} out of {} shards", numberOfBlockedPlugins, numberOfShards);
assertThat(numberOfBlockedPlugins, greaterThan(0));
}, 10, TimeUnit.SECONDS);
}
private static void disableBlocks(List<ScriptedBlockPlugin> plugins) {
for (ScriptedBlockPlugin plugin : plugins) {
plugin.disableBlock();
}
}
public static class ScriptedBlockPlugin extends MockScriptPlugin {
static final String SCRIPT_NAME = "search_block";
private final AtomicInteger hits = new AtomicInteger();
private final AtomicBoolean shouldBlock = new AtomicBoolean(true);
void reset() {
hits.set(0);
}
void disableBlock() {
shouldBlock.set(false);
}
void enableBlock() {
shouldBlock.set(true);
}
@Override
public Map<String, Function<Map<String, Object>, Object>> pluginScripts() {
return Collections.singletonMap(SCRIPT_NAME, params -> {
LeafFieldsLookup fieldsLookup = (LeafFieldsLookup) params.get("_fields");
LogManager.getLogger(SearchRestCancellationIT.class).info("Blocking on the document {}", fieldsLookup.get("_id"));
hits.incrementAndGet();
try {
awaitBusy(() -> shouldBlock.get() == false);
} catch (Exception e) {
throw new RuntimeException(e);
}
return true;
});
}
}
}