From b0b503035ab35e8c2d2c4dcdc936e76b509c6ff3 Mon Sep 17 00:00:00 2001 From: Tanguy Leroux Date: Fri, 20 May 2016 21:11:15 +0200 Subject: [PATCH] Update reindex cancel tests --- .../AbstractAsyncBulkByScrollAction.java | 11 +- .../reindex/UpdateByQueryRequestBuilder.java | 5 + .../reindex/AsyncBulkByScrollActionTests.java | 3 +- .../index/reindex/CancelTestUtils.java | 166 ------------ .../index/reindex/CancelTests.java | 240 ++++++++++++++++++ .../reindex/DeleteByQueryCancelTests.java | 184 -------------- .../index/reindex/ReindexCancelTests.java | 52 ---- .../reindex/UpdateByQueryCancelTests.java | 53 ---- 8 files changed, 254 insertions(+), 460 deletions(-) delete mode 100644 modules/reindex/src/test/java/org/elasticsearch/index/reindex/CancelTestUtils.java create mode 100644 modules/reindex/src/test/java/org/elasticsearch/index/reindex/CancelTests.java delete mode 100644 modules/reindex/src/test/java/org/elasticsearch/index/reindex/DeleteByQueryCancelTests.java delete mode 100644 modules/reindex/src/test/java/org/elasticsearch/index/reindex/ReindexCancelTests.java delete mode 100644 modules/reindex/src/test/java/org/elasticsearch/index/reindex/UpdateByQueryCancelTests.java diff --git a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/AbstractAsyncBulkByScrollAction.java b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/AbstractAsyncBulkByScrollAction.java index ee2f5484737..699c6340095 100644 --- a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/AbstractAsyncBulkByScrollAction.java +++ b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/AbstractAsyncBulkByScrollAction.java @@ -259,10 +259,6 @@ public abstract class AbstractAsyncBulkByScrollAction failures = new ArrayList(); Set destinationIndicesThisBatch = new HashSet<>(); @@ -291,6 +287,12 @@ public abstract class AbstractAsyncBulkByScrollAction action.onBulkResponse(null)); + cancelTaskCase((DummyAbstractAsyncBulkByScrollAction action) -> + action.onBulkResponse(new BulkResponse(new BulkItemResponse[0], 0))); } public void testCancelBeforeStartNextScroll() throws Exception { diff --git a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/CancelTestUtils.java b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/CancelTestUtils.java deleted file mode 100644 index fc6af19e6b3..00000000000 --- a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/CancelTestUtils.java +++ /dev/null @@ -1,166 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.index.reindex; - -import org.elasticsearch.action.ListenableActionFuture; -import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksRequest; -import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse; -import org.elasticsearch.action.admin.cluster.node.tasks.list.TaskInfo; -import org.elasticsearch.index.reindex.BulkByScrollTask.Status; -import org.elasticsearch.plugins.Plugin; -import org.elasticsearch.script.ExecutableScript; -import org.elasticsearch.script.NativeScriptFactory; -import org.elasticsearch.script.Script; -import org.elasticsearch.script.ScriptModule; -import org.elasticsearch.script.ScriptService.ScriptType; -import org.elasticsearch.test.ESIntegTestCase; - -import java.util.Arrays; -import java.util.Collection; -import java.util.List; -import java.util.Map; -import java.util.concurrent.BrokenBarrierException; -import java.util.concurrent.CyclicBarrier; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; - -import static java.util.Collections.emptyMap; -import static org.elasticsearch.test.ESIntegTestCase.client; -import static org.hamcrest.Matchers.empty; -import static org.hamcrest.Matchers.hasSize; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertThat; - -/** - * Utilities for testing reindex and update-by-query cancellation. This whole class isn't thread safe. Luckily we run our tests in separate - * jvms. - */ -public class CancelTestUtils { - public static Collection> nodePlugins() { - return Arrays.asList(ReindexPlugin.class, StickyScriptPlugin.class); - } - - private static final CyclicBarrier barrier = new CyclicBarrier(2); - - public static , - Builder extends AbstractBulkIndexByScrollRequestBuilder> - BulkIndexByScrollResponse testCancel(ESIntegTestCase test, Builder request, String actionToCancel) throws Exception { - - test.indexRandom(true, client().prepareIndex("source", "test", "1").setSource("foo", "a"), - client().prepareIndex("source", "test", "2").setSource("foo", "a")); - - request.source("source").script(new Script("sticky", ScriptType.INLINE, "native", emptyMap())); - request.source().setSize(1); - ListenableActionFuture response = request.execute(); - - // Wait until the script is on the first document. - barrier.await(30, TimeUnit.SECONDS); - - // Let just one document through. - barrier.await(30, TimeUnit.SECONDS); - - // Wait until the script is on the second document. - barrier.await(30, TimeUnit.SECONDS); - - // Status should show running - ListTasksResponse tasksList = client().admin().cluster().prepareListTasks().setActions(actionToCancel).setDetailed(true).get(); - assertThat(tasksList.getNodeFailures(), empty()); - assertThat(tasksList.getTaskFailures(), empty()); - assertThat(tasksList.getTasks(), hasSize(1)); - BulkByScrollTask.Status status = (Status) tasksList.getTasks().get(0).getStatus(); - assertNull(status.getReasonCancelled()); - - // Cancel the request while the script is running. This will prevent the request from being sent at all. - List cancelledTasks = client().admin().cluster().prepareCancelTasks().setActions(actionToCancel).get().getTasks(); - assertThat(cancelledTasks, hasSize(1)); - - // The status should now show canceled. The request will still be in the list because the script is still blocked. - tasksList = client().admin().cluster().prepareListTasks().setActions(actionToCancel).setDetailed(true).get(); - assertThat(tasksList.getNodeFailures(), empty()); - assertThat(tasksList.getTaskFailures(), empty()); - assertThat(tasksList.getTasks(), hasSize(1)); - status = (Status) tasksList.getTasks().get(0).getStatus(); - assertEquals(CancelTasksRequest.DEFAULT_REASON, status.getReasonCancelled()); - - // Now let the next document through. It won't be sent because the request is cancelled but we need to unblock the script. - barrier.await(); - - // Now we can just wait on the request and make sure it was actually cancelled half way through. - return response.get(); - } - - public static class StickyScriptPlugin extends Plugin { - @Override - public String name() { - return "sticky-script"; - } - - @Override - public String description() { - return "installs a script that \"sticks\" when it runs for testing reindex"; - } - - public void onModule(ScriptModule module) { - module.registerScript("sticky", StickyScriptFactory.class); - } - } - - public static class StickyScriptFactory implements NativeScriptFactory { - @Override - public ExecutableScript newScript(Map params) { - return new ExecutableScript() { - private Map source; - @Override - @SuppressWarnings("unchecked") // Safe because _ctx always has this shape - public void setNextVar(String name, Object value) { - if ("ctx".equals(name)) { - Map ctx = (Map) value; - source = (Map) ctx.get("_source"); - } else { - throw new IllegalArgumentException("Unexpected var: " + name); - } - } - - @Override - public Object run() { - try { - // Tell the test we've started a document. - barrier.await(30, TimeUnit.SECONDS); - - // Wait for the test to tell us to proceed. - barrier.await(30, TimeUnit.SECONDS); - - // Make some change to the source so that update-by-query tests can make sure only one document was changed. - source.put("giraffes", "giraffes"); - return null; - } catch (InterruptedException | BrokenBarrierException | TimeoutException e) { - throw new RuntimeException(e); - } - } - }; - } - - @Override - public boolean needsScores() { - return false; - } - } -} diff --git a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/CancelTests.java b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/CancelTests.java new file mode 100644 index 00000000000..5a778d0b0fa --- /dev/null +++ b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/CancelTests.java @@ -0,0 +1,240 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.index.reindex; + +import org.elasticsearch.action.ListenableActionFuture; +import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksRequest; +import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse; +import org.elasticsearch.action.admin.cluster.node.tasks.list.TaskInfo; +import org.elasticsearch.action.ingest.DeletePipelineRequest; +import org.elasticsearch.common.bytes.BytesArray; +import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.index.IndexModule; +import org.elasticsearch.index.engine.Engine; +import org.elasticsearch.index.engine.Engine.Operation.Origin; +import org.elasticsearch.index.shard.IndexingOperationListener; +import org.elasticsearch.plugins.Plugin; +import org.junit.BeforeClass; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import static org.elasticsearch.index.query.QueryBuilders.termQuery; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount; +import static org.hamcrest.Matchers.empty; +import static org.hamcrest.Matchers.emptyIterable; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasSize; + +/** + * Test that you can actually cancel a reindex/update-by-query/delete-by-query request and all the plumbing works. Doesn't test all of the + * different cancellation places - that is the responsibility of {@link AsyncBulkByScrollActionTests} which have more precise control to + * simulate failures but do not exercise important portion of the stack like transport and task management. + */ +public class CancelTests extends ReindexTestCase { + + protected static final String INDEX = "reindex-cancel-index"; + protected static final String TYPE = "reindex-cancel-type"; + + private static final int MIN_OPERATIONS = 2; + private static final int BLOCKING_OPERATIONS = 1; + + // Semaphore used to allow & block indexing operations during the test + private static final Semaphore ALLOWED_OPERATIONS = new Semaphore(0); + + @Override + protected Collection> nodePlugins() { + Collection> plugins = new ArrayList<>(super.nodePlugins()); + plugins.add(ReindexCancellationPlugin.class); + return plugins; + } + + @BeforeClass + public static void clearAllowedOperations() { + ALLOWED_OPERATIONS.drainPermits(); + } + + /** + * Executes the cancellation test + */ + private void testCancel(String action, + AbstractBulkByScrollRequestBuilder builder, + CancelAssertion assertion) throws Exception { + + createIndex(INDEX); + + // Total number of documents created for this test (~10 per primary shard) + int numDocs = getNumShards(INDEX).numPrimaries * 10; + ALLOWED_OPERATIONS.release(numDocs); + + indexRandom(true, false, true, IntStream.range(0, numDocs) + .mapToObj(i -> client().prepareIndex(INDEX, TYPE, String.valueOf(i)).setSource("n", i)) + .collect(Collectors.toList())); + + // Checks that the all documents have been indexed and correctly counted + assertHitCount(client().prepareSearch(INDEX).setSize(0).get(), numDocs); + assertThat(ALLOWED_OPERATIONS.drainPermits(), equalTo(0)); + + // Scroll 1 by 1 so that cancellation is easier to control + builder.source().setSize(1); + + // Allow a random number of the documents minus 1 + // to be modified by the reindex action + int numModifiedDocs = randomIntBetween(MIN_OPERATIONS, numDocs); + ALLOWED_OPERATIONS.release(numModifiedDocs - BLOCKING_OPERATIONS); + + // Now execute the reindex action... + ListenableActionFuture future = builder.execute(); + + // ... and waits for the indexing operation listeners to block + awaitBusy(() -> ALLOWED_OPERATIONS.hasQueuedThreads() && ALLOWED_OPERATIONS.availablePermits() == 0); + + // Status should show the task running + ListTasksResponse tasksList = client().admin().cluster().prepareListTasks().setActions(action).setDetailed(true).get(); + assertThat(tasksList.getNodeFailures(), empty()); + assertThat(tasksList.getTaskFailures(), empty()); + assertThat(tasksList.getTasks(), hasSize(1)); + BulkByScrollTask.Status status = (BulkByScrollTask.Status) tasksList.getTasks().get(0).getStatus(); + assertNull(status.getReasonCancelled()); + + // Cancel the request while the reindex action is blocked by the indexing operation listeners. + // This will prevent further requests from being sent. + List cancelledTasks = client().admin().cluster().prepareCancelTasks().setActions(action).get().getTasks(); + assertThat(cancelledTasks, hasSize(1)); + + // The status should now show canceled. The request will still be in the list because it is still blocked. + tasksList = client().admin().cluster().prepareListTasks().setActions(action).setDetailed(true).get(); + assertThat(tasksList.getNodeFailures(), empty()); + assertThat(tasksList.getTaskFailures(), empty()); + assertThat(tasksList.getTasks(), hasSize(1)); + status = (BulkByScrollTask.Status) tasksList.getTasks().get(0).getStatus(); + assertEquals(CancelTasksRequest.DEFAULT_REASON, status.getReasonCancelled()); + + // Unblock the last operation + ALLOWED_OPERATIONS.release(BLOCKING_OPERATIONS); + + // Checks that no more operations are executed + assertBusy(() -> ALLOWED_OPERATIONS.availablePermits() == 0 && ALLOWED_OPERATIONS.getQueueLength() == 0); + + // And check the status of the response + BulkIndexByScrollResponse response = future.get(); + assertThat(response.getReasonCancelled(), equalTo("by user request")); + assertThat(response.getIndexingFailures(), emptyIterable()); + assertThat(response.getSearchFailures(), emptyIterable()); + + flushAndRefresh(INDEX); + assertion.assertThat(response, numDocs, numModifiedDocs); + } + + public void testReindexCancel() throws Exception { + testCancel(ReindexAction.NAME, reindex().source(INDEX).destination("dest", TYPE), (response, total, modified) -> { + assertThat(response, matcher().created(modified).reasonCancelled(equalTo("by user request"))); + + refresh("dest"); + assertHitCount(client().prepareSearch("dest").setTypes(TYPE).setSize(0).get(), modified); + }); + } + + public void testUpdateByQueryCancel() throws Exception { + BytesReference pipeline = new BytesArray("{\n" + + " \"description\" : \"sets updated to true\",\n" + + " \"processors\" : [ {\n" + + " \"set\" : {\n" + + " \"field\": \"updated\",\n" + + " \"value\": true" + + " }\n" + + " } ]\n" + + "}"); + assertAcked(client().admin().cluster().preparePutPipeline("set-foo", pipeline).get()); + + testCancel(UpdateByQueryAction.NAME, updateByQuery().setPipeline("set-foo").source(INDEX), (response, total, modified) -> { + assertThat(response, matcher().updated(modified).reasonCancelled(equalTo("by user request"))); + assertHitCount(client().prepareSearch(INDEX).setSize(0).setQuery(termQuery("updated", true)).get(), modified); + }); + + assertAcked(client().admin().cluster().deletePipeline(new DeletePipelineRequest("set-foo")).get()); + } + + public void testDeleteByQueryCancel() throws Exception { + testCancel(DeleteByQueryAction.NAME, deleteByQuery().source(INDEX), (response, total, modified) -> { + assertThat(response, matcher().deleted(modified).reasonCancelled(equalTo("by user request"))); + assertHitCount(client().prepareSearch(INDEX).setSize(0).get(), total - modified); + }); + } + + /** + * {@link CancelAssertion} is used to check the result of the cancel test. + */ + private interface CancelAssertion { + void assertThat(BulkIndexByScrollResponse response, int total, int modified); + } + + public static class ReindexCancellationPlugin extends Plugin { + + @Override + public String name() { + return "reindex-cancellation"; + } + + @Override + public String description() { + return "See " + CancelTests.class.getName() + " documentation"; + } + + @Override + public void onIndexModule(IndexModule indexModule) { + indexModule.addIndexOperationListener(new BlockingDeleteListener()); + } + } + + public static class BlockingDeleteListener implements IndexingOperationListener { + + @Override + public Engine.Index preIndex(Engine.Index index) { + return preCheck(index, index.type()); + } + + @Override + public Engine.Delete preDelete(Engine.Delete delete) { + return preCheck(delete, delete.type()); + } + + private T preCheck(T operation, String type) { + if ((TYPE.equals(type) == false) || (operation.origin() != Origin.PRIMARY)) { + return operation; + } + + try { + if (ALLOWED_OPERATIONS.tryAcquire(30, TimeUnit.SECONDS)) { + return operation; + } + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + throw new IllegalStateException("Something went wrong"); + } + } +} diff --git a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/DeleteByQueryCancelTests.java b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/DeleteByQueryCancelTests.java deleted file mode 100644 index 6007b646429..00000000000 --- a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/DeleteByQueryCancelTests.java +++ /dev/null @@ -1,184 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.index.reindex; - -import org.elasticsearch.action.ListenableActionFuture; -import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksRequest; -import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse; -import org.elasticsearch.action.admin.cluster.node.tasks.list.TaskInfo; -import org.elasticsearch.common.util.concurrent.CountDown; -import org.elasticsearch.index.IndexModule; -import org.elasticsearch.index.engine.Engine; -import org.elasticsearch.index.shard.IndexingOperationListener; -import org.elasticsearch.plugins.Plugin; - -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; -import java.util.concurrent.BrokenBarrierException; -import java.util.concurrent.CyclicBarrier; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; - -import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount; -import static org.hamcrest.Matchers.empty; -import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.hasSize; -import static org.hamcrest.Matchers.lessThanOrEqualTo; - -/** - * Tests that you can actually cancel a delete-by-query request and all the plumbing works. Doesn't test all of the different cancellation - * places - that is the responsibility of {@link AsyncBulkByScrollActionTests} which have more precise control to simulate failures but do - * not exercise important portion of the stack like transport and task management. - */ -public class DeleteByQueryCancelTests extends ReindexTestCase { - - private static final String INDEX = "test-delete-by-query"; - private static final String TYPE = "test"; - - private static final int MAX_DELETIONS = 10; - private static final CyclicBarrier barrier = new CyclicBarrier(2); - - @Override - protected int numberOfShards() { - // Only 1 shard and no replica so that test execution - // can be easily controlled within a {@link IndexingOperationListener#preDelete} - return 1; - } - - @Override - protected int numberOfReplicas() { - // Only 1 shard and no replica so that test execution - // can be easily controlled within a {@link IndexingOperationListener#preDelete} - return 0; - } - - @Override - protected Collection> nodePlugins() { - Collection> plugins = new ArrayList<>(super.nodePlugins()); - plugins.add(DeleteByQueryCancellationPlugin.class); - return plugins; - } - - public void testCancel() throws Exception { - createIndex(INDEX); - - int totalNumShards = getNumShards(INDEX).totalNumShards; - - // Number of documents to be deleted in this test - final int nbDocsToDelete = totalNumShards * MAX_DELETIONS; - - // Total number of documents that will be created in this test - final int nbDocs = nbDocsToDelete * randomIntBetween(1, 5); - for (int i = 0; i < nbDocs; i++) { - indexRandom(false, client().prepareIndex(INDEX, TYPE, String.valueOf(i)).setSource("n", i)); - } - - refresh(INDEX); - assertHitCount(client().prepareSearch(INDEX).setSize(0).get(), nbDocs); - - // Executes the delete by query; each shard will block after MAX_DELETIONS - DeleteByQueryRequestBuilder deleteByQuery = deleteByQuery().source("_all"); - deleteByQuery.source().setSize(1); - - ListenableActionFuture future = deleteByQuery.execute(); - - // Waits for the indexing operation listener to block - barrier.await(30, TimeUnit.SECONDS); - - // Status should show running - ListTasksResponse tasksList = client().admin().cluster().prepareListTasks() - .setActions(DeleteByQueryAction.NAME).setDetailed(true).get(); - assertThat(tasksList.getNodeFailures(), empty()); - assertThat(tasksList.getTaskFailures(), empty()); - assertThat(tasksList.getTasks(), hasSize(1)); - BulkByScrollTask.Status status = (BulkByScrollTask.Status) tasksList.getTasks().get(0).getStatus(); - assertNull(status.getReasonCancelled()); - - // Cancel the request while the deletions are blocked. This will prevent further deletions requests from being sent. - List cancelledTasks = client().admin().cluster().prepareCancelTasks() - .setActions(DeleteByQueryAction.NAME).get().getTasks(); - assertThat(cancelledTasks, hasSize(1)); - - // The status should now show canceled. The request will still be in the list because the script is still blocked. - tasksList = client().admin().cluster().prepareListTasks().setActions(DeleteByQueryAction.NAME).setDetailed(true).get(); - assertThat(tasksList.getNodeFailures(), empty()); - assertThat(tasksList.getTaskFailures(), empty()); - assertThat(tasksList.getTasks(), hasSize(1)); - status = (BulkByScrollTask.Status) tasksList.getTasks().get(0).getStatus(); - assertEquals(CancelTasksRequest.DEFAULT_REASON, status.getReasonCancelled()); - - // Now unblock the listener so that it can proceed - barrier.await(); - - // And check the status of the response - BulkIndexByScrollResponse response = future.get(); - assertThat(response, matcher() - .deleted(lessThanOrEqualTo((long) MAX_DELETIONS)).batches(MAX_DELETIONS).reasonCancelled(equalTo("by user request"))); - } - - - public static class DeleteByQueryCancellationPlugin extends Plugin { - - @Override - public String name() { - return "delete-by-query-cancellation"; - } - - @Override - public String description() { - return "See " + DeleteByQueryCancellationPlugin.class.getName(); - } - - @Override - public void onIndexModule(IndexModule indexModule) { - indexModule.addIndexOperationListener(new BlockingDeleteListener()); - } - } - - /** - * A {@link IndexingOperationListener} that allows a given number of documents to be deleted - * and then blocks until it is notified to proceed. - */ - public static class BlockingDeleteListener implements IndexingOperationListener { - - private final CountDown blockAfter = new CountDown(MAX_DELETIONS); - - @Override - public Engine.Delete preDelete(Engine.Delete delete) { - if (blockAfter.isCountedDown() || (TYPE.equals(delete.type()) == false)) { - return delete; - } - - if (blockAfter.countDown()) { - try { - // Tell the test we've deleted enough documents. - barrier.await(30, TimeUnit.SECONDS); - - // Wait for the test to tell us to proceed. - barrier.await(30, TimeUnit.SECONDS); - } catch (InterruptedException | BrokenBarrierException | TimeoutException e) { - throw new RuntimeException(e); - } - } - return delete; - } - } -} diff --git a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/ReindexCancelTests.java b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/ReindexCancelTests.java deleted file mode 100644 index f9dde6045a6..00000000000 --- a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/ReindexCancelTests.java +++ /dev/null @@ -1,52 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.index.reindex; - -import org.elasticsearch.plugins.Plugin; - -import java.util.Collection; - -import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount; -import static org.hamcrest.Matchers.equalTo; - -/** - * Tests that you can actually cancel a reindex request and all the plumbing works. Doesn't test all of the different cancellation places - - * that is the responsibility of {@link AsyncBulkByScrollActionTests} which have more precise control to simulate failures but do not - * exercise important portion of the stack like transport and task management. - */ -public class ReindexCancelTests extends ReindexTestCase { - public void testCancel() throws Exception { - BulkIndexByScrollResponse response = CancelTestUtils.testCancel(this, reindex().destination("dest", "test"), ReindexAction.NAME); - - assertThat(response, matcher().created(1).reasonCancelled(equalTo("by user request"))); - refresh("dest"); - assertHitCount(client().prepareSearch("dest").setSize(0).get(), 1); - } - - @Override - protected int numberOfShards() { - return 1; - } - - @Override - protected Collection> nodePlugins() { - return CancelTestUtils.nodePlugins(); - } -} diff --git a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/UpdateByQueryCancelTests.java b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/UpdateByQueryCancelTests.java deleted file mode 100644 index 4cb859c0017..00000000000 --- a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/UpdateByQueryCancelTests.java +++ /dev/null @@ -1,53 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.index.reindex; - -import org.elasticsearch.plugins.Plugin; - -import java.util.Collection; - -import static org.elasticsearch.index.query.QueryBuilders.matchQuery; -import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount; -import static org.hamcrest.Matchers.equalTo; - -/** - * Tests that you can actually cancel an update-by-query request and all the plumbing works. Doesn't test all of the different cancellation - * places - that is the responsibility of {@link AsyncBulkByScrollActionTests} which have more precise control to simulate failures but do - * not exercise important portion of the stack like transport and task management. - */ -public class UpdateByQueryCancelTests extends ReindexTestCase { - public void testCancel() throws Exception { - BulkIndexByScrollResponse response = CancelTestUtils.testCancel(this, updateByQuery(), UpdateByQueryAction.NAME); - - assertThat(response, matcher().updated(1).reasonCancelled(equalTo("by user request"))); - refresh("source"); - assertHitCount(client().prepareSearch("source").setSize(0).setQuery(matchQuery("giraffes", "giraffes")).get(), 1); - } - - @Override - protected int numberOfShards() { - return 1; - } - - @Override - protected Collection> nodePlugins() { - return CancelTestUtils.nodePlugins(); - } -}