Update reindex cancel tests
This commit is contained in:
parent
c5a9edf1c7
commit
b0b503035a
|
@ -259,10 +259,6 @@ public abstract class AbstractAsyncBulkByScrollAction<Request extends AbstractBu
|
|||
* Processes bulk responses, accounting for failures.
|
||||
*/
|
||||
void onBulkResponse(BulkResponse response) {
|
||||
if (task.isCancelled()) {
|
||||
finishHim(null);
|
||||
return;
|
||||
}
|
||||
try {
|
||||
List<Failure> failures = new ArrayList<Failure>();
|
||||
Set<String> destinationIndicesThisBatch = new HashSet<>();
|
||||
|
@ -291,6 +287,12 @@ public abstract class AbstractAsyncBulkByScrollAction<Request extends AbstractBu
|
|||
// Track the indexes we've seen so we can refresh them if requested
|
||||
destinationIndicesThisBatch.add(item.getIndex());
|
||||
}
|
||||
|
||||
if (task.isCancelled()) {
|
||||
finishHim(null);
|
||||
return;
|
||||
}
|
||||
|
||||
addDestinationIndices(destinationIndicesThisBatch);
|
||||
|
||||
if (false == failures.isEmpty()) {
|
||||
|
@ -303,6 +305,7 @@ public abstract class AbstractAsyncBulkByScrollAction<Request extends AbstractBu
|
|||
startNormalTermination(emptyList(), emptyList(), false);
|
||||
return;
|
||||
}
|
||||
|
||||
startNextScroll(response.getItems().length);
|
||||
} catch (Throwable t) {
|
||||
finishHim(t);
|
||||
|
|
|
@ -48,4 +48,9 @@ public class UpdateByQueryRequestBuilder extends
|
|||
request.setAbortOnVersionConflict(abortOnVersionConflict);
|
||||
return this;
|
||||
}
|
||||
|
||||
public UpdateByQueryRequestBuilder setPipeline(String pipeline) {
|
||||
request.setPipeline(pipeline);
|
||||
return this;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -585,7 +585,8 @@ public class AsyncBulkByScrollActionTests extends ESTestCase {
|
|||
|
||||
public void testCancelBeforeOnBulkResponse() throws Exception {
|
||||
// We bail so early we don't need to pass in a half way valid response.
|
||||
cancelTaskCase((DummyAbstractAsyncBulkByScrollAction action) -> action.onBulkResponse(null));
|
||||
cancelTaskCase((DummyAbstractAsyncBulkByScrollAction action) ->
|
||||
action.onBulkResponse(new BulkResponse(new BulkItemResponse[0], 0)));
|
||||
}
|
||||
|
||||
public void testCancelBeforeStartNextScroll() throws Exception {
|
||||
|
|
|
@ -1,166 +0,0 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.index.reindex;
|
||||
|
||||
import org.elasticsearch.action.ListenableActionFuture;
|
||||
import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksRequest;
|
||||
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse;
|
||||
import org.elasticsearch.action.admin.cluster.node.tasks.list.TaskInfo;
|
||||
import org.elasticsearch.index.reindex.BulkByScrollTask.Status;
|
||||
import org.elasticsearch.plugins.Plugin;
|
||||
import org.elasticsearch.script.ExecutableScript;
|
||||
import org.elasticsearch.script.NativeScriptFactory;
|
||||
import org.elasticsearch.script.Script;
|
||||
import org.elasticsearch.script.ScriptModule;
|
||||
import org.elasticsearch.script.ScriptService.ScriptType;
|
||||
import org.elasticsearch.test.ESIntegTestCase;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.BrokenBarrierException;
|
||||
import java.util.concurrent.CyclicBarrier;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
|
||||
import static java.util.Collections.emptyMap;
|
||||
import static org.elasticsearch.test.ESIntegTestCase.client;
|
||||
import static org.hamcrest.Matchers.empty;
|
||||
import static org.hamcrest.Matchers.hasSize;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertNull;
|
||||
import static org.junit.Assert.assertThat;
|
||||
|
||||
/**
|
||||
* Utilities for testing reindex and update-by-query cancellation. This whole class isn't thread safe. Luckily we run our tests in separate
|
||||
* jvms.
|
||||
*/
|
||||
public class CancelTestUtils {
|
||||
public static Collection<Class<? extends Plugin>> nodePlugins() {
|
||||
return Arrays.asList(ReindexPlugin.class, StickyScriptPlugin.class);
|
||||
}
|
||||
|
||||
private static final CyclicBarrier barrier = new CyclicBarrier(2);
|
||||
|
||||
public static <Request extends AbstractBulkIndexByScrollRequest<Request>,
|
||||
Builder extends AbstractBulkIndexByScrollRequestBuilder<Request, Builder>>
|
||||
BulkIndexByScrollResponse testCancel(ESIntegTestCase test, Builder request, String actionToCancel) throws Exception {
|
||||
|
||||
test.indexRandom(true, client().prepareIndex("source", "test", "1").setSource("foo", "a"),
|
||||
client().prepareIndex("source", "test", "2").setSource("foo", "a"));
|
||||
|
||||
request.source("source").script(new Script("sticky", ScriptType.INLINE, "native", emptyMap()));
|
||||
request.source().setSize(1);
|
||||
ListenableActionFuture<BulkIndexByScrollResponse> response = request.execute();
|
||||
|
||||
// Wait until the script is on the first document.
|
||||
barrier.await(30, TimeUnit.SECONDS);
|
||||
|
||||
// Let just one document through.
|
||||
barrier.await(30, TimeUnit.SECONDS);
|
||||
|
||||
// Wait until the script is on the second document.
|
||||
barrier.await(30, TimeUnit.SECONDS);
|
||||
|
||||
// Status should show running
|
||||
ListTasksResponse tasksList = client().admin().cluster().prepareListTasks().setActions(actionToCancel).setDetailed(true).get();
|
||||
assertThat(tasksList.getNodeFailures(), empty());
|
||||
assertThat(tasksList.getTaskFailures(), empty());
|
||||
assertThat(tasksList.getTasks(), hasSize(1));
|
||||
BulkByScrollTask.Status status = (Status) tasksList.getTasks().get(0).getStatus();
|
||||
assertNull(status.getReasonCancelled());
|
||||
|
||||
// Cancel the request while the script is running. This will prevent the request from being sent at all.
|
||||
List<TaskInfo> cancelledTasks = client().admin().cluster().prepareCancelTasks().setActions(actionToCancel).get().getTasks();
|
||||
assertThat(cancelledTasks, hasSize(1));
|
||||
|
||||
// The status should now show canceled. The request will still be in the list because the script is still blocked.
|
||||
tasksList = client().admin().cluster().prepareListTasks().setActions(actionToCancel).setDetailed(true).get();
|
||||
assertThat(tasksList.getNodeFailures(), empty());
|
||||
assertThat(tasksList.getTaskFailures(), empty());
|
||||
assertThat(tasksList.getTasks(), hasSize(1));
|
||||
status = (Status) tasksList.getTasks().get(0).getStatus();
|
||||
assertEquals(CancelTasksRequest.DEFAULT_REASON, status.getReasonCancelled());
|
||||
|
||||
// Now let the next document through. It won't be sent because the request is cancelled but we need to unblock the script.
|
||||
barrier.await();
|
||||
|
||||
// Now we can just wait on the request and make sure it was actually cancelled half way through.
|
||||
return response.get();
|
||||
}
|
||||
|
||||
public static class StickyScriptPlugin extends Plugin {
|
||||
@Override
|
||||
public String name() {
|
||||
return "sticky-script";
|
||||
}
|
||||
|
||||
@Override
|
||||
public String description() {
|
||||
return "installs a script that \"sticks\" when it runs for testing reindex";
|
||||
}
|
||||
|
||||
public void onModule(ScriptModule module) {
|
||||
module.registerScript("sticky", StickyScriptFactory.class);
|
||||
}
|
||||
}
|
||||
|
||||
public static class StickyScriptFactory implements NativeScriptFactory {
|
||||
@Override
|
||||
public ExecutableScript newScript(Map<String, Object> params) {
|
||||
return new ExecutableScript() {
|
||||
private Map<String, Object> source;
|
||||
@Override
|
||||
@SuppressWarnings("unchecked") // Safe because _ctx always has this shape
|
||||
public void setNextVar(String name, Object value) {
|
||||
if ("ctx".equals(name)) {
|
||||
Map<String, Object> ctx = (Map<String, Object>) value;
|
||||
source = (Map<String, Object>) ctx.get("_source");
|
||||
} else {
|
||||
throw new IllegalArgumentException("Unexpected var: " + name);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object run() {
|
||||
try {
|
||||
// Tell the test we've started a document.
|
||||
barrier.await(30, TimeUnit.SECONDS);
|
||||
|
||||
// Wait for the test to tell us to proceed.
|
||||
barrier.await(30, TimeUnit.SECONDS);
|
||||
|
||||
// Make some change to the source so that update-by-query tests can make sure only one document was changed.
|
||||
source.put("giraffes", "giraffes");
|
||||
return null;
|
||||
} catch (InterruptedException | BrokenBarrierException | TimeoutException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean needsScores() {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,240 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.index.reindex;
|
||||
|
||||
import org.elasticsearch.action.ListenableActionFuture;
|
||||
import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksRequest;
|
||||
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse;
|
||||
import org.elasticsearch.action.admin.cluster.node.tasks.list.TaskInfo;
|
||||
import org.elasticsearch.action.ingest.DeletePipelineRequest;
|
||||
import org.elasticsearch.common.bytes.BytesArray;
|
||||
import org.elasticsearch.common.bytes.BytesReference;
|
||||
import org.elasticsearch.index.IndexModule;
|
||||
import org.elasticsearch.index.engine.Engine;
|
||||
import org.elasticsearch.index.engine.Engine.Operation.Origin;
|
||||
import org.elasticsearch.index.shard.IndexingOperationListener;
|
||||
import org.elasticsearch.plugins.Plugin;
|
||||
import org.junit.BeforeClass;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.Semaphore;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.IntStream;
|
||||
|
||||
import static org.elasticsearch.index.query.QueryBuilders.termQuery;
|
||||
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
|
||||
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
|
||||
import static org.hamcrest.Matchers.empty;
|
||||
import static org.hamcrest.Matchers.emptyIterable;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.hamcrest.Matchers.hasSize;
|
||||
|
||||
/**
|
||||
* Test that you can actually cancel a reindex/update-by-query/delete-by-query request and all the plumbing works. Doesn't test all of the
|
||||
* different cancellation places - that is the responsibility of {@link AsyncBulkByScrollActionTests} which have more precise control to
|
||||
* simulate failures but do not exercise important portion of the stack like transport and task management.
|
||||
*/
|
||||
public class CancelTests extends ReindexTestCase {
|
||||
|
||||
protected static final String INDEX = "reindex-cancel-index";
|
||||
protected static final String TYPE = "reindex-cancel-type";
|
||||
|
||||
private static final int MIN_OPERATIONS = 2;
|
||||
private static final int BLOCKING_OPERATIONS = 1;
|
||||
|
||||
// Semaphore used to allow & block indexing operations during the test
|
||||
private static final Semaphore ALLOWED_OPERATIONS = new Semaphore(0);
|
||||
|
||||
@Override
|
||||
protected Collection<Class<? extends Plugin>> nodePlugins() {
|
||||
Collection<Class<? extends Plugin>> plugins = new ArrayList<>(super.nodePlugins());
|
||||
plugins.add(ReindexCancellationPlugin.class);
|
||||
return plugins;
|
||||
}
|
||||
|
||||
@BeforeClass
|
||||
public static void clearAllowedOperations() {
|
||||
ALLOWED_OPERATIONS.drainPermits();
|
||||
}
|
||||
|
||||
/**
|
||||
* Executes the cancellation test
|
||||
*/
|
||||
private void testCancel(String action,
|
||||
AbstractBulkByScrollRequestBuilder<?, ? extends BulkIndexByScrollResponse, ?> builder,
|
||||
CancelAssertion assertion) throws Exception {
|
||||
|
||||
createIndex(INDEX);
|
||||
|
||||
// Total number of documents created for this test (~10 per primary shard)
|
||||
int numDocs = getNumShards(INDEX).numPrimaries * 10;
|
||||
ALLOWED_OPERATIONS.release(numDocs);
|
||||
|
||||
indexRandom(true, false, true, IntStream.range(0, numDocs)
|
||||
.mapToObj(i -> client().prepareIndex(INDEX, TYPE, String.valueOf(i)).setSource("n", i))
|
||||
.collect(Collectors.toList()));
|
||||
|
||||
// Checks that the all documents have been indexed and correctly counted
|
||||
assertHitCount(client().prepareSearch(INDEX).setSize(0).get(), numDocs);
|
||||
assertThat(ALLOWED_OPERATIONS.drainPermits(), equalTo(0));
|
||||
|
||||
// Scroll 1 by 1 so that cancellation is easier to control
|
||||
builder.source().setSize(1);
|
||||
|
||||
// Allow a random number of the documents minus 1
|
||||
// to be modified by the reindex action
|
||||
int numModifiedDocs = randomIntBetween(MIN_OPERATIONS, numDocs);
|
||||
ALLOWED_OPERATIONS.release(numModifiedDocs - BLOCKING_OPERATIONS);
|
||||
|
||||
// Now execute the reindex action...
|
||||
ListenableActionFuture<? extends BulkIndexByScrollResponse> future = builder.execute();
|
||||
|
||||
// ... and waits for the indexing operation listeners to block
|
||||
awaitBusy(() -> ALLOWED_OPERATIONS.hasQueuedThreads() && ALLOWED_OPERATIONS.availablePermits() == 0);
|
||||
|
||||
// Status should show the task running
|
||||
ListTasksResponse tasksList = client().admin().cluster().prepareListTasks().setActions(action).setDetailed(true).get();
|
||||
assertThat(tasksList.getNodeFailures(), empty());
|
||||
assertThat(tasksList.getTaskFailures(), empty());
|
||||
assertThat(tasksList.getTasks(), hasSize(1));
|
||||
BulkByScrollTask.Status status = (BulkByScrollTask.Status) tasksList.getTasks().get(0).getStatus();
|
||||
assertNull(status.getReasonCancelled());
|
||||
|
||||
// Cancel the request while the reindex action is blocked by the indexing operation listeners.
|
||||
// This will prevent further requests from being sent.
|
||||
List<TaskInfo> cancelledTasks = client().admin().cluster().prepareCancelTasks().setActions(action).get().getTasks();
|
||||
assertThat(cancelledTasks, hasSize(1));
|
||||
|
||||
// The status should now show canceled. The request will still be in the list because it is still blocked.
|
||||
tasksList = client().admin().cluster().prepareListTasks().setActions(action).setDetailed(true).get();
|
||||
assertThat(tasksList.getNodeFailures(), empty());
|
||||
assertThat(tasksList.getTaskFailures(), empty());
|
||||
assertThat(tasksList.getTasks(), hasSize(1));
|
||||
status = (BulkByScrollTask.Status) tasksList.getTasks().get(0).getStatus();
|
||||
assertEquals(CancelTasksRequest.DEFAULT_REASON, status.getReasonCancelled());
|
||||
|
||||
// Unblock the last operation
|
||||
ALLOWED_OPERATIONS.release(BLOCKING_OPERATIONS);
|
||||
|
||||
// Checks that no more operations are executed
|
||||
assertBusy(() -> ALLOWED_OPERATIONS.availablePermits() == 0 && ALLOWED_OPERATIONS.getQueueLength() == 0);
|
||||
|
||||
// And check the status of the response
|
||||
BulkIndexByScrollResponse response = future.get();
|
||||
assertThat(response.getReasonCancelled(), equalTo("by user request"));
|
||||
assertThat(response.getIndexingFailures(), emptyIterable());
|
||||
assertThat(response.getSearchFailures(), emptyIterable());
|
||||
|
||||
flushAndRefresh(INDEX);
|
||||
assertion.assertThat(response, numDocs, numModifiedDocs);
|
||||
}
|
||||
|
||||
public void testReindexCancel() throws Exception {
|
||||
testCancel(ReindexAction.NAME, reindex().source(INDEX).destination("dest", TYPE), (response, total, modified) -> {
|
||||
assertThat(response, matcher().created(modified).reasonCancelled(equalTo("by user request")));
|
||||
|
||||
refresh("dest");
|
||||
assertHitCount(client().prepareSearch("dest").setTypes(TYPE).setSize(0).get(), modified);
|
||||
});
|
||||
}
|
||||
|
||||
public void testUpdateByQueryCancel() throws Exception {
|
||||
BytesReference pipeline = new BytesArray("{\n" +
|
||||
" \"description\" : \"sets updated to true\",\n" +
|
||||
" \"processors\" : [ {\n" +
|
||||
" \"set\" : {\n" +
|
||||
" \"field\": \"updated\",\n" +
|
||||
" \"value\": true" +
|
||||
" }\n" +
|
||||
" } ]\n" +
|
||||
"}");
|
||||
assertAcked(client().admin().cluster().preparePutPipeline("set-foo", pipeline).get());
|
||||
|
||||
testCancel(UpdateByQueryAction.NAME, updateByQuery().setPipeline("set-foo").source(INDEX), (response, total, modified) -> {
|
||||
assertThat(response, matcher().updated(modified).reasonCancelled(equalTo("by user request")));
|
||||
assertHitCount(client().prepareSearch(INDEX).setSize(0).setQuery(termQuery("updated", true)).get(), modified);
|
||||
});
|
||||
|
||||
assertAcked(client().admin().cluster().deletePipeline(new DeletePipelineRequest("set-foo")).get());
|
||||
}
|
||||
|
||||
public void testDeleteByQueryCancel() throws Exception {
|
||||
testCancel(DeleteByQueryAction.NAME, deleteByQuery().source(INDEX), (response, total, modified) -> {
|
||||
assertThat(response, matcher().deleted(modified).reasonCancelled(equalTo("by user request")));
|
||||
assertHitCount(client().prepareSearch(INDEX).setSize(0).get(), total - modified);
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* {@link CancelAssertion} is used to check the result of the cancel test.
|
||||
*/
|
||||
private interface CancelAssertion {
|
||||
void assertThat(BulkIndexByScrollResponse response, int total, int modified);
|
||||
}
|
||||
|
||||
public static class ReindexCancellationPlugin extends Plugin {
|
||||
|
||||
@Override
|
||||
public String name() {
|
||||
return "reindex-cancellation";
|
||||
}
|
||||
|
||||
@Override
|
||||
public String description() {
|
||||
return "See " + CancelTests.class.getName() + " documentation";
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onIndexModule(IndexModule indexModule) {
|
||||
indexModule.addIndexOperationListener(new BlockingDeleteListener());
|
||||
}
|
||||
}
|
||||
|
||||
public static class BlockingDeleteListener implements IndexingOperationListener {
|
||||
|
||||
@Override
|
||||
public Engine.Index preIndex(Engine.Index index) {
|
||||
return preCheck(index, index.type());
|
||||
}
|
||||
|
||||
@Override
|
||||
public Engine.Delete preDelete(Engine.Delete delete) {
|
||||
return preCheck(delete, delete.type());
|
||||
}
|
||||
|
||||
private <T extends Engine.Operation> T preCheck(T operation, String type) {
|
||||
if ((TYPE.equals(type) == false) || (operation.origin() != Origin.PRIMARY)) {
|
||||
return operation;
|
||||
}
|
||||
|
||||
try {
|
||||
if (ALLOWED_OPERATIONS.tryAcquire(30, TimeUnit.SECONDS)) {
|
||||
return operation;
|
||||
}
|
||||
} catch (InterruptedException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
throw new IllegalStateException("Something went wrong");
|
||||
}
|
||||
}
|
||||
}
|
|
@ -1,184 +0,0 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.index.reindex;
|
||||
|
||||
import org.elasticsearch.action.ListenableActionFuture;
|
||||
import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksRequest;
|
||||
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse;
|
||||
import org.elasticsearch.action.admin.cluster.node.tasks.list.TaskInfo;
|
||||
import org.elasticsearch.common.util.concurrent.CountDown;
|
||||
import org.elasticsearch.index.IndexModule;
|
||||
import org.elasticsearch.index.engine.Engine;
|
||||
import org.elasticsearch.index.shard.IndexingOperationListener;
|
||||
import org.elasticsearch.plugins.Plugin;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.BrokenBarrierException;
|
||||
import java.util.concurrent.CyclicBarrier;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
|
||||
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
|
||||
import static org.hamcrest.Matchers.empty;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.hamcrest.Matchers.hasSize;
|
||||
import static org.hamcrest.Matchers.lessThanOrEqualTo;
|
||||
|
||||
/**
|
||||
* Tests that you can actually cancel a delete-by-query request and all the plumbing works. Doesn't test all of the different cancellation
|
||||
* places - that is the responsibility of {@link AsyncBulkByScrollActionTests} which have more precise control to simulate failures but do
|
||||
* not exercise important portion of the stack like transport and task management.
|
||||
*/
|
||||
public class DeleteByQueryCancelTests extends ReindexTestCase {
|
||||
|
||||
private static final String INDEX = "test-delete-by-query";
|
||||
private static final String TYPE = "test";
|
||||
|
||||
private static final int MAX_DELETIONS = 10;
|
||||
private static final CyclicBarrier barrier = new CyclicBarrier(2);
|
||||
|
||||
@Override
|
||||
protected int numberOfShards() {
|
||||
// Only 1 shard and no replica so that test execution
|
||||
// can be easily controlled within a {@link IndexingOperationListener#preDelete}
|
||||
return 1;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected int numberOfReplicas() {
|
||||
// Only 1 shard and no replica so that test execution
|
||||
// can be easily controlled within a {@link IndexingOperationListener#preDelete}
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Collection<Class<? extends Plugin>> nodePlugins() {
|
||||
Collection<Class<? extends Plugin>> plugins = new ArrayList<>(super.nodePlugins());
|
||||
plugins.add(DeleteByQueryCancellationPlugin.class);
|
||||
return plugins;
|
||||
}
|
||||
|
||||
public void testCancel() throws Exception {
|
||||
createIndex(INDEX);
|
||||
|
||||
int totalNumShards = getNumShards(INDEX).totalNumShards;
|
||||
|
||||
// Number of documents to be deleted in this test
|
||||
final int nbDocsToDelete = totalNumShards * MAX_DELETIONS;
|
||||
|
||||
// Total number of documents that will be created in this test
|
||||
final int nbDocs = nbDocsToDelete * randomIntBetween(1, 5);
|
||||
for (int i = 0; i < nbDocs; i++) {
|
||||
indexRandom(false, client().prepareIndex(INDEX, TYPE, String.valueOf(i)).setSource("n", i));
|
||||
}
|
||||
|
||||
refresh(INDEX);
|
||||
assertHitCount(client().prepareSearch(INDEX).setSize(0).get(), nbDocs);
|
||||
|
||||
// Executes the delete by query; each shard will block after MAX_DELETIONS
|
||||
DeleteByQueryRequestBuilder deleteByQuery = deleteByQuery().source("_all");
|
||||
deleteByQuery.source().setSize(1);
|
||||
|
||||
ListenableActionFuture<BulkIndexByScrollResponse> future = deleteByQuery.execute();
|
||||
|
||||
// Waits for the indexing operation listener to block
|
||||
barrier.await(30, TimeUnit.SECONDS);
|
||||
|
||||
// Status should show running
|
||||
ListTasksResponse tasksList = client().admin().cluster().prepareListTasks()
|
||||
.setActions(DeleteByQueryAction.NAME).setDetailed(true).get();
|
||||
assertThat(tasksList.getNodeFailures(), empty());
|
||||
assertThat(tasksList.getTaskFailures(), empty());
|
||||
assertThat(tasksList.getTasks(), hasSize(1));
|
||||
BulkByScrollTask.Status status = (BulkByScrollTask.Status) tasksList.getTasks().get(0).getStatus();
|
||||
assertNull(status.getReasonCancelled());
|
||||
|
||||
// Cancel the request while the deletions are blocked. This will prevent further deletions requests from being sent.
|
||||
List<TaskInfo> cancelledTasks = client().admin().cluster().prepareCancelTasks()
|
||||
.setActions(DeleteByQueryAction.NAME).get().getTasks();
|
||||
assertThat(cancelledTasks, hasSize(1));
|
||||
|
||||
// The status should now show canceled. The request will still be in the list because the script is still blocked.
|
||||
tasksList = client().admin().cluster().prepareListTasks().setActions(DeleteByQueryAction.NAME).setDetailed(true).get();
|
||||
assertThat(tasksList.getNodeFailures(), empty());
|
||||
assertThat(tasksList.getTaskFailures(), empty());
|
||||
assertThat(tasksList.getTasks(), hasSize(1));
|
||||
status = (BulkByScrollTask.Status) tasksList.getTasks().get(0).getStatus();
|
||||
assertEquals(CancelTasksRequest.DEFAULT_REASON, status.getReasonCancelled());
|
||||
|
||||
// Now unblock the listener so that it can proceed
|
||||
barrier.await();
|
||||
|
||||
// And check the status of the response
|
||||
BulkIndexByScrollResponse response = future.get();
|
||||
assertThat(response, matcher()
|
||||
.deleted(lessThanOrEqualTo((long) MAX_DELETIONS)).batches(MAX_DELETIONS).reasonCancelled(equalTo("by user request")));
|
||||
}
|
||||
|
||||
|
||||
public static class DeleteByQueryCancellationPlugin extends Plugin {
|
||||
|
||||
@Override
|
||||
public String name() {
|
||||
return "delete-by-query-cancellation";
|
||||
}
|
||||
|
||||
@Override
|
||||
public String description() {
|
||||
return "See " + DeleteByQueryCancellationPlugin.class.getName();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onIndexModule(IndexModule indexModule) {
|
||||
indexModule.addIndexOperationListener(new BlockingDeleteListener());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* A {@link IndexingOperationListener} that allows a given number of documents to be deleted
|
||||
* and then blocks until it is notified to proceed.
|
||||
*/
|
||||
public static class BlockingDeleteListener implements IndexingOperationListener {
|
||||
|
||||
private final CountDown blockAfter = new CountDown(MAX_DELETIONS);
|
||||
|
||||
@Override
|
||||
public Engine.Delete preDelete(Engine.Delete delete) {
|
||||
if (blockAfter.isCountedDown() || (TYPE.equals(delete.type()) == false)) {
|
||||
return delete;
|
||||
}
|
||||
|
||||
if (blockAfter.countDown()) {
|
||||
try {
|
||||
// Tell the test we've deleted enough documents.
|
||||
barrier.await(30, TimeUnit.SECONDS);
|
||||
|
||||
// Wait for the test to tell us to proceed.
|
||||
barrier.await(30, TimeUnit.SECONDS);
|
||||
} catch (InterruptedException | BrokenBarrierException | TimeoutException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
return delete;
|
||||
}
|
||||
}
|
||||
}
|
|
@ -1,52 +0,0 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.index.reindex;
|
||||
|
||||
import org.elasticsearch.plugins.Plugin;
|
||||
|
||||
import java.util.Collection;
|
||||
|
||||
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
|
||||
/**
|
||||
* Tests that you can actually cancel a reindex request and all the plumbing works. Doesn't test all of the different cancellation places -
|
||||
* that is the responsibility of {@link AsyncBulkByScrollActionTests} which have more precise control to simulate failures but do not
|
||||
* exercise important portion of the stack like transport and task management.
|
||||
*/
|
||||
public class ReindexCancelTests extends ReindexTestCase {
|
||||
public void testCancel() throws Exception {
|
||||
BulkIndexByScrollResponse response = CancelTestUtils.testCancel(this, reindex().destination("dest", "test"), ReindexAction.NAME);
|
||||
|
||||
assertThat(response, matcher().created(1).reasonCancelled(equalTo("by user request")));
|
||||
refresh("dest");
|
||||
assertHitCount(client().prepareSearch("dest").setSize(0).get(), 1);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected int numberOfShards() {
|
||||
return 1;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Collection<Class<? extends Plugin>> nodePlugins() {
|
||||
return CancelTestUtils.nodePlugins();
|
||||
}
|
||||
}
|
|
@ -1,53 +0,0 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.index.reindex;
|
||||
|
||||
import org.elasticsearch.plugins.Plugin;
|
||||
|
||||
import java.util.Collection;
|
||||
|
||||
import static org.elasticsearch.index.query.QueryBuilders.matchQuery;
|
||||
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
|
||||
/**
|
||||
* Tests that you can actually cancel an update-by-query request and all the plumbing works. Doesn't test all of the different cancellation
|
||||
* places - that is the responsibility of {@link AsyncBulkByScrollActionTests} which have more precise control to simulate failures but do
|
||||
* not exercise important portion of the stack like transport and task management.
|
||||
*/
|
||||
public class UpdateByQueryCancelTests extends ReindexTestCase {
|
||||
public void testCancel() throws Exception {
|
||||
BulkIndexByScrollResponse response = CancelTestUtils.testCancel(this, updateByQuery(), UpdateByQueryAction.NAME);
|
||||
|
||||
assertThat(response, matcher().updated(1).reasonCancelled(equalTo("by user request")));
|
||||
refresh("source");
|
||||
assertHitCount(client().prepareSearch("source").setSize(0).setQuery(matchQuery("giraffes", "giraffes")).get(), 1);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected int numberOfShards() {
|
||||
return 1;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Collection<Class<? extends Plugin>> nodePlugins() {
|
||||
return CancelTestUtils.nodePlugins();
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue