From 92bfd16c5824885ca673c5bb99acac9148c18693 Mon Sep 17 00:00:00 2001 From: Luca Cavanna Date: Wed, 3 May 2017 11:20:53 +0200 Subject: [PATCH] Java api: ActionRequestBuilder#execute to return a PlainActionFuture (#24415) This change makes the request builder code-path same as `Client#execute`. The request builder used to return a `ListenableActionFuture` when calling execute, which allows to associate listeners with the returned future. For async execution though it is recommended to use the `execute` method that accepts an `ActionListener`, like users would do when using `Client#execute`. Relates to #24412 Relates to #9201 --- .../action/ActionRequestBuilder.java | 23 ++--- .../admin/cluster/node/tasks/TasksIT.java | 87 +++++++++---------- .../indices/stats/IndicesStatsTests.java | 4 +- .../support/ActiveShardsObserverIT.java | 4 +- .../client/AbstractClientHeadersTestCase.java | 34 ++++---- .../index/WaitUntilRefreshIT.java | 4 +- .../search/SearchCancellationIT.java | 12 +-- .../DedicatedClusterSnapshotRestoreIT.java | 4 +- .../MinThreadsSnapshotRestoreIT.java | 10 +-- .../SharedClusterSnapshotRestoreIT.java | 15 ++-- .../migration/migrate_6_0/java.asciidoc | 12 ++- .../index/reindex/CancelTests.java | 4 +- .../index/reindex/RethrottleTests.java | 9 +- .../index/reindex/RetryTests.java | 6 +- 14 files changed, 108 insertions(+), 120 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/action/ActionRequestBuilder.java b/core/src/main/java/org/elasticsearch/action/ActionRequestBuilder.java index 076d4ae67f6..964568fc472 100644 --- a/core/src/main/java/org/elasticsearch/action/ActionRequestBuilder.java +++ b/core/src/main/java/org/elasticsearch/action/ActionRequestBuilder.java @@ -19,18 +19,16 @@ package org.elasticsearch.action; -import org.elasticsearch.action.support.PlainListenableActionFuture; import org.elasticsearch.client.ElasticsearchClient; import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.threadpool.ThreadPool; import java.util.Objects; -public abstract class ActionRequestBuilder> { +public abstract class ActionRequestBuilder> { protected final Action action; protected final Request request; - private final ThreadPool threadPool; protected final ElasticsearchClient client; protected ActionRequestBuilder(ElasticsearchClient client, Action action, Request request) { @@ -38,18 +36,14 @@ public abstract class ActionRequestBuilder execute() { - PlainListenableActionFuture future = new PlainListenableActionFuture<>(threadPool); - execute(future); - return future; + public ActionFuture execute() { + return client.execute(action, request); } /** @@ -74,13 +68,6 @@ public abstract class ActionRequestBuilder listener) { - client.execute(action, beforeExecute(request), listener); - } - - /** - * A callback to additionally process the request before its executed - */ - protected Request beforeExecute(Request request) { - return request; + client.execute(action, request, listener); } } diff --git a/core/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TasksIT.java b/core/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TasksIT.java index 48df23ea3ae..a2cab6b85ab 100644 --- a/core/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TasksIT.java +++ b/core/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TasksIT.java @@ -21,9 +21,9 @@ package org.elasticsearch.action.admin.cluster.node.tasks; import org.elasticsearch.ElasticsearchTimeoutException; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.ResourceNotFoundException; +import org.elasticsearch.action.ActionFuture; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.FailedNodeException; -import org.elasticsearch.action.ListenableActionFuture; import org.elasticsearch.action.TaskOperationFailure; import org.elasticsearch.action.admin.cluster.health.ClusterHealthAction; import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksResponse; @@ -90,7 +90,6 @@ import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFa import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertSearchResponse; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertThrows; import static org.hamcrest.Matchers.allOf; -import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.emptyCollectionOf; @@ -466,8 +465,7 @@ public class TasksIT extends ESIntegTestCase { public void testTasksCancellation() throws Exception { // Start blocking test task // Get real client (the plugin is not registered on transport nodes) - ListenableActionFuture future = TestTaskPlugin.TestTaskAction.INSTANCE.newRequestBuilder(client()) - .execute(); + ActionFuture future = TestTaskPlugin.TestTaskAction.INSTANCE.newRequestBuilder(client()).execute(); logger.info("--> started test tasks"); // Wait for the task to start on all nodes @@ -488,8 +486,7 @@ public class TasksIT extends ESIntegTestCase { public void testTasksUnblocking() throws Exception { // Start blocking test task - ListenableActionFuture future = TestTaskPlugin.TestTaskAction.INSTANCE.newRequestBuilder(client()) - .execute(); + ActionFuture future = TestTaskPlugin.TestTaskAction.INSTANCE.newRequestBuilder(client()).execute(); // Wait for the task to start on all nodes assertBusy(() -> assertEquals(internalCluster().size(), client().admin().cluster().prepareListTasks().setActions(TestTaskPlugin.TestTaskAction.NAME + "[n]").get().getTasks().size())); @@ -502,42 +499,45 @@ public class TasksIT extends ESIntegTestCase { } public void testListTasksWaitForCompletion() throws Exception { - waitForCompletionTestCase(randomBoolean(), id -> { - return client().admin().cluster().prepareListTasks().setActions(TestTaskPlugin.TestTaskAction.NAME) - .setWaitForCompletion(true).execute(); - }, response -> { - assertThat(response.getNodeFailures(), empty()); - assertThat(response.getTaskFailures(), empty()); - assertThat(response.getTasks(), hasSize(1)); - TaskInfo task = response.getTasks().get(0); - assertEquals(TestTaskPlugin.TestTaskAction.NAME, task.getAction()); - }); + waitForCompletionTestCase(randomBoolean(), + id -> client().admin().cluster().prepareListTasks().setActions(TestTaskPlugin.TestTaskAction.NAME) + .setWaitForCompletion(true).execute(), + response -> { + assertThat(response.getNodeFailures(), empty()); + assertThat(response.getTaskFailures(), empty()); + assertThat(response.getTasks(), hasSize(1)); + TaskInfo task = response.getTasks().get(0); + assertEquals(TestTaskPlugin.TestTaskAction.NAME, task.getAction()); + } + ); } public void testGetTaskWaitForCompletionWithoutStoringResult() throws Exception { - waitForCompletionTestCase(false, id -> { - return client().admin().cluster().prepareGetTask(id).setWaitForCompletion(true).execute(); - }, response -> { - assertTrue(response.getTask().isCompleted()); - // We didn't store the result so it won't come back when we wait - assertNull(response.getTask().getResponse()); - // But the task's details should still be there because we grabbed a reference to the task before waiting for it to complete. - assertNotNull(response.getTask().getTask()); - assertEquals(TestTaskPlugin.TestTaskAction.NAME, response.getTask().getTask().getAction()); - }); + waitForCompletionTestCase(false, + id -> client().admin().cluster().prepareGetTask(id).setWaitForCompletion(true).execute(), + response -> { + assertTrue(response.getTask().isCompleted()); + //We didn't store the result so it won't come back when we wait + assertNull(response.getTask().getResponse()); + //But the task's details should still be there because we grabbed a reference to the task before waiting for it to complete + assertNotNull(response.getTask().getTask()); + assertEquals(TestTaskPlugin.TestTaskAction.NAME, response.getTask().getTask().getAction()); + } + ); } public void testGetTaskWaitForCompletionWithStoringResult() throws Exception { - waitForCompletionTestCase(true, id -> { - return client().admin().cluster().prepareGetTask(id).setWaitForCompletion(true).execute(); - }, response -> { - assertTrue(response.getTask().isCompleted()); - // We stored the task so we should get its results - assertEquals(0, response.getTask().getResponseAsMap().get("failure_count")); - // The task's details should also be there - assertNotNull(response.getTask().getTask()); - assertEquals(TestTaskPlugin.TestTaskAction.NAME, response.getTask().getTask().getAction()); - }); + waitForCompletionTestCase(true, + id -> client().admin().cluster().prepareGetTask(id).setWaitForCompletion(true).execute(), + response -> { + assertTrue(response.getTask().isCompleted()); + // We stored the task so we should get its results + assertEquals(0, response.getTask().getResponseAsMap().get("failure_count")); + // The task's details should also be there + assertNotNull(response.getTask().getTask()); + assertEquals(TestTaskPlugin.TestTaskAction.NAME, response.getTask().getTask().getAction()); + } + ); } /** @@ -546,13 +546,13 @@ public class TasksIT extends ESIntegTestCase { * @param wait start waiting for a task. Accepts that id of the task to wait for and returns a future waiting for it. * @param validator validate the response and return the task ids that were found */ - private void waitForCompletionTestCase(boolean storeResult, Function> wait, Consumer validator) + private void waitForCompletionTestCase(boolean storeResult, Function> wait, Consumer validator) throws Exception { // Start blocking test task - ListenableActionFuture future = TestTaskPlugin.TestTaskAction.INSTANCE.newRequestBuilder(client()) + ActionFuture future = TestTaskPlugin.TestTaskAction.INSTANCE.newRequestBuilder(client()) .setShouldStoreResult(storeResult).execute(); - ListenableActionFuture waitResponseFuture; + ActionFuture waitResponseFuture; TaskId taskId; try { taskId = waitForTestTaskStartOnAllNodes(); @@ -623,8 +623,7 @@ public class TasksIT extends ESIntegTestCase { */ private void waitForTimeoutTestCase(Function> wait) throws Exception { // Start blocking test task - ListenableActionFuture future = TestTaskPlugin.TestTaskAction.INSTANCE.newRequestBuilder(client()) - .execute(); + ActionFuture future = TestTaskPlugin.TestTaskAction.INSTANCE.newRequestBuilder(client()).execute(); try { TaskId taskId = waitForTestTaskStartOnAllNodes(); @@ -662,7 +661,7 @@ public class TasksIT extends ESIntegTestCase { public void testTasksListWaitForNoTask() throws Exception { // Spin up a request to wait for no matching tasks - ListenableActionFuture waitResponseFuture = client().admin().cluster().prepareListTasks() + ActionFuture waitResponseFuture = client().admin().cluster().prepareListTasks() .setActions(TestTaskPlugin.TestTaskAction.NAME + "[n]").setWaitForCompletion(true).setTimeout(timeValueMillis(10)) .execute(); @@ -672,12 +671,12 @@ public class TasksIT extends ESIntegTestCase { public void testTasksGetWaitForNoTask() throws Exception { // Spin up a request to wait for no matching tasks - ListenableActionFuture waitResponseFuture = client().admin().cluster().prepareGetTask("notfound:1") + ActionFuture waitResponseFuture = client().admin().cluster().prepareGetTask("notfound:1") .setWaitForCompletion(true).setTimeout(timeValueMillis(10)) .execute(); // It should finish quickly and without complaint - expectNotFound(() -> waitResponseFuture.get()); + expectNotFound(waitResponseFuture::get); } public void testTasksWaitForAllTask() throws Exception { diff --git a/core/src/test/java/org/elasticsearch/action/admin/indices/stats/IndicesStatsTests.java b/core/src/test/java/org/elasticsearch/action/admin/indices/stats/IndicesStatsTests.java index 58bf897bd39..6c1f44ea69a 100644 --- a/core/src/test/java/org/elasticsearch/action/admin/indices/stats/IndicesStatsTests.java +++ b/core/src/test/java/org/elasticsearch/action/admin/indices/stats/IndicesStatsTests.java @@ -19,7 +19,7 @@ package org.elasticsearch.action.admin.indices.stats; -import org.elasticsearch.action.ListenableActionFuture; +import org.elasticsearch.action.ActionFuture; import org.elasticsearch.action.ShardOperationFailedException; import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.action.support.WriteRequest.RefreshPolicy; @@ -124,7 +124,7 @@ public class IndicesStatsTests extends ESSingleNodeTestCase { createIndex("test", Settings.builder().put("refresh_interval", -1).build()); // Index a document asynchronously so the request will only return when document is refreshed - ListenableActionFuture index = client().prepareIndex("test", "test", "test").setSource("test", "test") + ActionFuture index = client().prepareIndex("test", "test", "test").setSource("test", "test") .setRefreshPolicy(RefreshPolicy.WAIT_UNTIL).execute(); // Wait for the refresh listener to appear in the stats diff --git a/core/src/test/java/org/elasticsearch/action/support/ActiveShardsObserverIT.java b/core/src/test/java/org/elasticsearch/action/support/ActiveShardsObserverIT.java index 5f52293d7ee..f3611663b42 100644 --- a/core/src/test/java/org/elasticsearch/action/support/ActiveShardsObserverIT.java +++ b/core/src/test/java/org/elasticsearch/action/support/ActiveShardsObserverIT.java @@ -19,7 +19,7 @@ package org.elasticsearch.action.support; -import org.elasticsearch.action.ListenableActionFuture; +import org.elasticsearch.action.ActionFuture; import org.elasticsearch.action.admin.indices.create.CreateIndexResponse; import org.elasticsearch.common.Priority; import org.elasticsearch.common.settings.Settings; @@ -140,7 +140,7 @@ public class ActiveShardsObserverIT extends ESIntegTestCase { .build(); logger.info("--> start the index creation process"); - ListenableActionFuture responseListener = + ActionFuture responseListener = prepareCreate(indexName) .setSettings(settings) .setWaitForActiveShards(ActiveShardCount.ALL) diff --git a/core/src/test/java/org/elasticsearch/client/AbstractClientHeadersTestCase.java b/core/src/test/java/org/elasticsearch/client/AbstractClientHeadersTestCase.java index 43d866a47b7..67318b7b21f 100644 --- a/core/src/test/java/org/elasticsearch/client/AbstractClientHeadersTestCase.java +++ b/core/src/test/java/org/elasticsearch/client/AbstractClientHeadersTestCase.java @@ -25,6 +25,7 @@ import org.elasticsearch.action.GenericAction; import org.elasticsearch.action.admin.cluster.reroute.ClusterRerouteAction; import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotAction; import org.elasticsearch.action.admin.cluster.stats.ClusterStatsAction; +import org.elasticsearch.action.admin.cluster.storedscripts.DeleteStoredScriptAction; import org.elasticsearch.action.admin.indices.cache.clear.ClearIndicesCacheAction; import org.elasticsearch.action.admin.indices.create.CreateIndexAction; import org.elasticsearch.action.admin.indices.flush.FlushAction; @@ -32,7 +33,6 @@ import org.elasticsearch.action.admin.indices.stats.IndicesStatsAction; import org.elasticsearch.action.delete.DeleteAction; import org.elasticsearch.action.get.GetAction; import org.elasticsearch.action.index.IndexAction; -import org.elasticsearch.action.admin.cluster.storedscripts.DeleteStoredScriptAction; import org.elasticsearch.action.search.SearchAction; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.ThreadContext; @@ -101,22 +101,22 @@ public abstract class AbstractClientHeadersTestCase extends ESTestCase { // validation in the settings??? - ugly and conceptually wrong) // choosing arbitrary top level actions to test - client.prepareGet("idx", "type", "id").execute().addListener(new AssertingActionListener<>(GetAction.NAME, client.threadPool())); - client.prepareSearch().execute().addListener(new AssertingActionListener<>(SearchAction.NAME, client.threadPool())); - client.prepareDelete("idx", "type", "id").execute().addListener(new AssertingActionListener<>(DeleteAction.NAME, client.threadPool())); - client.admin().cluster().prepareDeleteStoredScript("lang", "id").execute().addListener(new AssertingActionListener<>(DeleteStoredScriptAction.NAME, client.threadPool())); - client.prepareIndex("idx", "type", "id").setSource("source", XContentType.JSON).execute().addListener(new AssertingActionListener<>(IndexAction.NAME, client.threadPool())); + client.prepareGet("idx", "type", "id").execute(new AssertingActionListener<>(GetAction.NAME, client.threadPool())); + client.prepareSearch().execute(new AssertingActionListener<>(SearchAction.NAME, client.threadPool())); + client.prepareDelete("idx", "type", "id").execute(new AssertingActionListener<>(DeleteAction.NAME, client.threadPool())); + client.admin().cluster().prepareDeleteStoredScript("lang", "id").execute(new AssertingActionListener<>(DeleteStoredScriptAction.NAME, client.threadPool())); + client.prepareIndex("idx", "type", "id").setSource("source", XContentType.JSON).execute(new AssertingActionListener<>(IndexAction.NAME, client.threadPool())); // choosing arbitrary cluster admin actions to test - client.admin().cluster().prepareClusterStats().execute().addListener(new AssertingActionListener<>(ClusterStatsAction.NAME, client.threadPool())); - client.admin().cluster().prepareCreateSnapshot("repo", "bck").execute().addListener(new AssertingActionListener<>(CreateSnapshotAction.NAME, client.threadPool())); - client.admin().cluster().prepareReroute().execute().addListener(new AssertingActionListener<>(ClusterRerouteAction.NAME, client.threadPool())); + client.admin().cluster().prepareClusterStats().execute(new AssertingActionListener<>(ClusterStatsAction.NAME, client.threadPool())); + client.admin().cluster().prepareCreateSnapshot("repo", "bck").execute(new AssertingActionListener<>(CreateSnapshotAction.NAME, client.threadPool())); + client.admin().cluster().prepareReroute().execute(new AssertingActionListener<>(ClusterRerouteAction.NAME, client.threadPool())); // choosing arbitrary indices admin actions to test - client.admin().indices().prepareCreate("idx").execute().addListener(new AssertingActionListener<>(CreateIndexAction.NAME, client.threadPool())); - client.admin().indices().prepareStats().execute().addListener(new AssertingActionListener<>(IndicesStatsAction.NAME, client.threadPool())); - client.admin().indices().prepareClearCache("idx1", "idx2").execute().addListener(new AssertingActionListener<>(ClearIndicesCacheAction.NAME, client.threadPool())); - client.admin().indices().prepareFlush().execute().addListener(new AssertingActionListener<>(FlushAction.NAME, client.threadPool())); + client.admin().indices().prepareCreate("idx").execute(new AssertingActionListener<>(CreateIndexAction.NAME, client.threadPool())); + client.admin().indices().prepareStats().execute(new AssertingActionListener<>(IndicesStatsAction.NAME, client.threadPool())); + client.admin().indices().prepareClearCache("idx1", "idx2").execute(new AssertingActionListener<>(ClearIndicesCacheAction.NAME, client.threadPool())); + client.admin().indices().prepareFlush().execute(new AssertingActionListener<>(FlushAction.NAME, client.threadPool())); } public void testOverrideHeader() throws Exception { @@ -126,13 +126,13 @@ public abstract class AbstractClientHeadersTestCase extends ESTestCase { expected.put("key2", "val 2"); client.threadPool().getThreadContext().putHeader("key1", key1Val); client.prepareGet("idx", "type", "id") - .execute().addListener(new AssertingActionListener<>(GetAction.NAME, expected, client.threadPool())); + .execute(new AssertingActionListener<>(GetAction.NAME, expected, client.threadPool())); client.admin().cluster().prepareClusterStats() - .execute().addListener(new AssertingActionListener<>(ClusterStatsAction.NAME, expected, client.threadPool())); + .execute(new AssertingActionListener<>(ClusterStatsAction.NAME, expected, client.threadPool())); client.admin().indices().prepareCreate("idx") - .execute().addListener(new AssertingActionListener<>(CreateIndexAction.NAME, expected, client.threadPool())); + .execute(new AssertingActionListener<>(CreateIndexAction.NAME, expected, client.threadPool())); } protected static void assertHeaders(Map headers, Map expected) { @@ -205,7 +205,5 @@ public abstract class AbstractClientHeadersTestCase extends ESTestCase { } return result; } - } - } diff --git a/core/src/test/java/org/elasticsearch/index/WaitUntilRefreshIT.java b/core/src/test/java/org/elasticsearch/index/WaitUntilRefreshIT.java index ca089e6eb83..cad590b3c8d 100644 --- a/core/src/test/java/org/elasticsearch/index/WaitUntilRefreshIT.java +++ b/core/src/test/java/org/elasticsearch/index/WaitUntilRefreshIT.java @@ -19,8 +19,8 @@ package org.elasticsearch.index; +import org.elasticsearch.action.ActionFuture; import org.elasticsearch.action.DocWriteResponse; -import org.elasticsearch.action.ListenableActionFuture; import org.elasticsearch.action.bulk.BulkItemResponse; import org.elasticsearch.action.bulk.BulkRequestBuilder; import org.elasticsearch.action.bulk.BulkResponse; @@ -149,7 +149,7 @@ public class WaitUntilRefreshIT extends ESIntegTestCase { */ public void testNoRefreshInterval() throws InterruptedException, ExecutionException { client().admin().indices().prepareUpdateSettings("test").setSettings(singletonMap("index.refresh_interval", -1)).get(); - ListenableActionFuture index = client().prepareIndex("test", "index", "1").setSource("foo", "bar") + ActionFuture index = client().prepareIndex("test", "index", "1").setSource("foo", "bar") .setRefreshPolicy(RefreshPolicy.WAIT_UNTIL).execute(); while (false == index.isDone()) { client().admin().indices().prepareRefresh("test").get(); diff --git a/core/src/test/java/org/elasticsearch/search/SearchCancellationIT.java b/core/src/test/java/org/elasticsearch/search/SearchCancellationIT.java index 7366834391b..02607f0c1fd 100644 --- a/core/src/test/java/org/elasticsearch/search/SearchCancellationIT.java +++ b/core/src/test/java/org/elasticsearch/search/SearchCancellationIT.java @@ -19,7 +19,7 @@ package org.elasticsearch.search; -import org.elasticsearch.action.ListenableActionFuture; +import org.elasticsearch.action.ActionFuture; import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksResponse; import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse; import org.elasticsearch.action.bulk.BulkRequestBuilder; @@ -128,7 +128,7 @@ public class SearchCancellationIT extends ESIntegTestCase { assertThat(cancelTasksResponse.getTasks().get(0).getTaskId(), equalTo(searchTask.getTaskId())); } - private SearchResponse ensureSearchWasCancelled(ListenableActionFuture searchResponse) { + private SearchResponse ensureSearchWasCancelled(ActionFuture searchResponse) { try { SearchResponse response = searchResponse.actionGet(); logger.info("Search response {}", response); @@ -146,7 +146,7 @@ public class SearchCancellationIT extends ESIntegTestCase { indexTestData(); logger.info("Executing search"); - ListenableActionFuture searchResponse = client().prepareSearch("test").setQuery( + ActionFuture searchResponse = client().prepareSearch("test").setQuery( scriptQuery(new Script( ScriptType.INLINE, "native", NativeTestScriptedBlockFactory.TEST_NATIVE_BLOCK_SCRIPT, Collections.emptyMap()))) .execute(); @@ -164,7 +164,7 @@ public class SearchCancellationIT extends ESIntegTestCase { indexTestData(); logger.info("Executing search"); - ListenableActionFuture searchResponse = client().prepareSearch("test") + ActionFuture searchResponse = client().prepareSearch("test") .addScriptField("test_field", new Script(ScriptType.INLINE, "native", NativeTestScriptedBlockFactory.TEST_NATIVE_BLOCK_SCRIPT, Collections.emptyMap()) ).execute(); @@ -182,7 +182,7 @@ public class SearchCancellationIT extends ESIntegTestCase { indexTestData(); logger.info("Executing search"); - ListenableActionFuture searchResponse = client().prepareSearch("test") + ActionFuture searchResponse = client().prepareSearch("test") .setScroll(TimeValue.timeValueSeconds(10)) .setSize(5) .setQuery( @@ -230,7 +230,7 @@ public class SearchCancellationIT extends ESIntegTestCase { String scrollId = searchResponse.getScrollId(); logger.info("Executing scroll with id {}", scrollId); - ListenableActionFuture scrollResponse = client().prepareSearchScroll(searchResponse.getScrollId()) + ActionFuture scrollResponse = client().prepareSearchScroll(searchResponse.getScrollId()) .setScroll(keepAlive).execute(); awaitForBlock(plugins); diff --git a/core/src/test/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreIT.java b/core/src/test/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreIT.java index 2503d6e157e..6c34cdf2e45 100644 --- a/core/src/test/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreIT.java +++ b/core/src/test/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreIT.java @@ -21,7 +21,7 @@ package org.elasticsearch.snapshots; import com.carrotsearch.hppc.IntHashSet; import com.carrotsearch.hppc.IntSet; -import org.elasticsearch.action.ListenableActionFuture; +import org.elasticsearch.action.ActionFuture; import org.elasticsearch.action.admin.cluster.repositories.put.PutRepositoryResponse; import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse; import org.elasticsearch.action.admin.cluster.snapshots.delete.DeleteSnapshotResponse; @@ -412,7 +412,7 @@ public class DedicatedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTest logger.info("--> execution was blocked on node [{}], aborting snapshot", blockedNode); - ListenableActionFuture deleteSnapshotResponseFuture = internalCluster().client(nodes.get(0)).admin().cluster().prepareDeleteSnapshot("test-repo", "test-snap").execute(); + ActionFuture deleteSnapshotResponseFuture = internalCluster().client(nodes.get(0)).admin().cluster().prepareDeleteSnapshot("test-repo", "test-snap").execute(); // Make sure that abort makes some progress Thread.sleep(100); unblockNode("test-repo", blockedNode); diff --git a/core/src/test/java/org/elasticsearch/snapshots/MinThreadsSnapshotRestoreIT.java b/core/src/test/java/org/elasticsearch/snapshots/MinThreadsSnapshotRestoreIT.java index 680bfea6b3b..29657c5fb8b 100644 --- a/core/src/test/java/org/elasticsearch/snapshots/MinThreadsSnapshotRestoreIT.java +++ b/core/src/test/java/org/elasticsearch/snapshots/MinThreadsSnapshotRestoreIT.java @@ -19,7 +19,7 @@ package org.elasticsearch.snapshots; -import org.elasticsearch.action.ListenableActionFuture; +import org.elasticsearch.action.ActionFuture; import org.elasticsearch.action.admin.cluster.snapshots.delete.DeleteSnapshotResponse; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; @@ -83,7 +83,7 @@ public class MinThreadsSnapshotRestoreIT extends AbstractSnapshotIntegTestCase { String blockedNode = internalCluster().getMasterName(); ((MockRepository)internalCluster().getInstance(RepositoriesService.class, blockedNode).repository(repo)).blockOnDataFiles(true); logger.info("--> start deletion of first snapshot"); - ListenableActionFuture future = + ActionFuture future = client().admin().cluster().prepareDeleteSnapshot(repo, snapshot2).execute(); logger.info("--> waiting for block to kick in on node [{}]", blockedNode); waitForBlock(blockedNode, repo, TimeValue.timeValueSeconds(10)); @@ -129,8 +129,7 @@ public class MinThreadsSnapshotRestoreIT extends AbstractSnapshotIntegTestCase { String blockedNode = internalCluster().getMasterName(); ((MockRepository)internalCluster().getInstance(RepositoriesService.class, blockedNode).repository(repo)).blockOnDataFiles(true); logger.info("--> start deletion of snapshot"); - ListenableActionFuture future = - client().admin().cluster().prepareDeleteSnapshot(repo, snapshot1).execute(); + ActionFuture future = client().admin().cluster().prepareDeleteSnapshot(repo, snapshot1).execute(); logger.info("--> waiting for block to kick in on node [{}]", blockedNode); waitForBlock(blockedNode, repo, TimeValue.timeValueSeconds(10)); @@ -185,8 +184,7 @@ public class MinThreadsSnapshotRestoreIT extends AbstractSnapshotIntegTestCase { String blockedNode = internalCluster().getMasterName(); ((MockRepository)internalCluster().getInstance(RepositoriesService.class, blockedNode).repository(repo)).blockOnDataFiles(true); logger.info("--> start deletion of snapshot"); - ListenableActionFuture future = - client().admin().cluster().prepareDeleteSnapshot(repo, snapshot2).execute(); + ActionFuture future = client().admin().cluster().prepareDeleteSnapshot(repo, snapshot2).execute(); logger.info("--> waiting for block to kick in on node [{}]", blockedNode); waitForBlock(blockedNode, repo, TimeValue.timeValueSeconds(10)); diff --git a/core/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java b/core/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java index 11194a689da..67911f3e5b0 100644 --- a/core/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java +++ b/core/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java @@ -22,7 +22,7 @@ package org.elasticsearch.snapshots; import org.apache.lucene.util.IOUtils; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.Version; -import org.elasticsearch.action.ListenableActionFuture; +import org.elasticsearch.action.ActionFuture; import org.elasticsearch.action.admin.cluster.repositories.put.PutRepositoryResponse; import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse; import org.elasticsearch.action.admin.cluster.snapshots.get.GetSnapshotsResponse; @@ -56,7 +56,6 @@ import org.elasticsearch.cluster.metadata.MetaDataIndexStateService; import org.elasticsearch.cluster.routing.IndexRoutingTable; import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider; import org.elasticsearch.cluster.service.ClusterService; -import org.elasticsearch.common.Priority; import org.elasticsearch.common.Strings; import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesReference; @@ -155,7 +154,7 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas assertHitCount(client.prepareSearch("test-idx-2").setSize(0).get(), 100L); assertHitCount(client.prepareSearch("test-idx-3").setSize(0).get(), 100L); - ListenableActionFuture flushResponseFuture = null; + ActionFuture flushResponseFuture = null; if (randomBoolean()) { ArrayList indicesToFlush = new ArrayList<>(); for (int i = 1; i < 4; i++) { @@ -888,7 +887,7 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas logger.info("--> delete index"); cluster().wipeIndices("test-idx"); logger.info("--> restore index after deletion"); - ListenableActionFuture restoreSnapshotResponseFuture = + ActionFuture restoreSnapshotResponseFuture = client.admin().cluster().prepareRestoreSnapshot("test-repo", "test-snap").setWaitForCompletion(true).execute(); logger.info("--> wait for the index to appear"); @@ -2014,7 +2013,7 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas assertThat(client.prepareSearch("test-idx-3").setSize(0).get().getHits().getTotalHits(), equalTo(100L)); logger.info("--> snapshot allow partial {}", allowPartial); - ListenableActionFuture future = client.admin().cluster().prepareCreateSnapshot("test-repo", "test-snap") + ActionFuture future = client.admin().cluster().prepareCreateSnapshot("test-repo", "test-snap") .setIndices("test-idx-*").setWaitForCompletion(true).setPartial(allowPartial).execute(); logger.info("--> wait for block to kick in"); if (initBlocking) { @@ -2109,7 +2108,7 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas blockAllDataNodes("test-repo"); logger.info("--> execution will be blocked on all data nodes"); - final ListenableActionFuture restoreFut; + final ActionFuture restoreFut; try { logger.info("--> start restore"); restoreFut = client.admin().cluster().prepareRestoreSnapshot("test-repo", "test-snap") @@ -2174,7 +2173,7 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas logger.info("--> execution will be blocked on all data nodes"); blockAllDataNodes(repoName); - final ListenableActionFuture restoreFut; + final ActionFuture restoreFut; try { logger.info("--> start restore"); restoreFut = client.admin().cluster().prepareRestoreSnapshot(repoName, snapshotName) @@ -2461,7 +2460,7 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas // take initial snapshot with a block, making sure we only get 1 in-progress snapshot returned // block a node so the create snapshot operation can remain in progress final String initialBlockedNode = blockNodeWithIndex(repositoryName, indexName); - ListenableActionFuture responseListener = + ActionFuture responseListener = client.admin().cluster().prepareCreateSnapshot(repositoryName, "snap-on-empty-repo") .setWaitForCompletion(false) .setIndices(indexName) diff --git a/docs/reference/migration/migrate_6_0/java.asciidoc b/docs/reference/migration/migrate_6_0/java.asciidoc index a8954732657..b82a46501e2 100644 --- a/docs/reference/migration/migrate_6_0/java.asciidoc +++ b/docs/reference/migration/migrate_6_0/java.asciidoc @@ -8,14 +8,22 @@ an object source did not require the XContentType to be specified. The auto-dete type is no longer used, so these methods now require the XContentType as an additional argument when providing the source in bytes or as a string. -=== `DeleteByQueryRequest` requires an explicitly set query +==== `DeleteByQueryRequest` requires an explicitly set query In previous versions of Elasticsearch, delete by query requests without an explicit query were accepted, match_all was used as the default query and all documents were deleted as a result. From version 6.0.0, a `DeleteByQueryRequest` requires an explicit query be set. -=== `InternalStats` and `Stats` getCountAsString() method removed +==== `InternalStats` and `Stats` getCountAsString() method removed The `count` value in the stats aggregation represents a doc count that shouldnn't require a formatted version. This method was deprecated in 5.4 in favour of just using `String.valueOf(getCount())` if needed + +==== `ActionRequestBuilder#execute` returns `ActionFuture` rather than `ListenableActionFuture` + +When sending a request through the request builders e.g. client.prepareSearch().execute(), it used to +be possible to call `addListener` against the returned `ListenableActionFuture`. With this change an +`ActionFuture` is returned instead, which is consistent with what the `Client` methods return, hence +it is not possible to associate the future with listeners. The `execute` method that accept a listener +as an argument can be used instead. \ No newline at end of file diff --git a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/CancelTests.java b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/CancelTests.java index 436ebec439b..3aa35fb6e19 100644 --- a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/CancelTests.java +++ b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/CancelTests.java @@ -19,7 +19,7 @@ package org.elasticsearch.index.reindex; -import org.elasticsearch.action.ListenableActionFuture; +import org.elasticsearch.action.ActionFuture; import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksRequest; import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse; import org.elasticsearch.action.bulk.byscroll.AbstractBulkByScrollRequestBuilder; @@ -111,7 +111,7 @@ public class CancelTests extends ReindexTestCase { ALLOWED_OPERATIONS.release(numModifiedDocs - builder.request().getSlices()); // Now execute the reindex action... - ListenableActionFuture future = builder.execute(); + ActionFuture future = builder.execute(); /* ... and waits for the indexing operation listeners to block. It is important to realize that some of the workers might have * exhausted their slice while others might have quite a bit left to work on. We can't control that. */ diff --git a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/RethrottleTests.java b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/RethrottleTests.java index 91c184e16a6..f40afce9a4e 100644 --- a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/RethrottleTests.java +++ b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/RethrottleTests.java @@ -19,7 +19,7 @@ package org.elasticsearch.index.reindex; -import org.elasticsearch.action.ListenableActionFuture; +import org.elasticsearch.action.ActionFuture; import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse; import org.elasticsearch.action.admin.cluster.node.tasks.list.TaskGroup; import org.elasticsearch.action.bulk.byscroll.AbstractBulkByScrollRequestBuilder; @@ -31,6 +31,7 @@ import org.elasticsearch.tasks.TaskId; import java.util.ArrayList; import java.util.List; +import java.util.Objects; import java.util.concurrent.TimeUnit; import static org.hamcrest.Matchers.allOf; @@ -87,7 +88,7 @@ public class RethrottleTests extends ReindexTestCase { // Start a request that will never finish unless we rethrottle it request.setRequestsPerSecond(.000001f); // Throttle "forever" request.source().setSize(1); // Make sure we use multiple batches - ListenableActionFuture responseListener = request.execute(); + ActionFuture responseListener = request.execute(); TaskGroup taskGroupToRethrottle = findTaskToRethrottle(actionName, request.request().getSlices()); TaskId taskToRethrottle = taskGroupToRethrottle.getTaskInfo().getTaskId(); @@ -102,7 +103,7 @@ public class RethrottleTests extends ReindexTestCase { assertBusy(() -> { BulkByScrollTask.Status parent = (BulkByScrollTask.Status) client().admin().cluster().prepareGetTask(taskToRethrottle).get() .getTask().getTask().getStatus(); - long finishedSubTasks = parent.getSliceStatuses().stream().filter(s -> s != null).count(); + long finishedSubTasks = parent.getSliceStatuses().stream().filter(Objects::nonNull).count(); ListTasksResponse list = client().admin().cluster().prepareListTasks().setParentTaskId(taskToRethrottle).get(); list.rethrowFailures("subtasks"); assertThat(finishedSubTasks + list.getTasks().size(), greaterThanOrEqualTo((long) request.request().getSlices())); @@ -124,7 +125,7 @@ public class RethrottleTests extends ReindexTestCase { /* Check that at least one slice was rethrottled. We won't always rethrottle all of them because they might have completed. * With multiple slices these numbers might not add up perfectly, thus the 1.01F. */ long unfinished = status.getSliceStatuses().stream() - .filter(slice -> slice != null) + .filter(Objects::nonNull) .filter(slice -> slice.getStatus().getTotal() > slice.getStatus().getSuccessfullyProcessed()) .count(); float maxExpectedSliceRequestsPerSecond = newRequestsPerSecond == Float.POSITIVE_INFINITY ? diff --git a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/RetryTests.java b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/RetryTests.java index be22d28daa9..86fbd47fd96 100644 --- a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/RetryTests.java +++ b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/RetryTests.java @@ -19,7 +19,7 @@ package org.elasticsearch.index.reindex; -import org.elasticsearch.action.ListenableActionFuture; +import org.elasticsearch.action.ActionFuture; import org.elasticsearch.action.admin.cluster.node.info.NodeInfo; import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse; import org.elasticsearch.action.bulk.BackoffPolicy; @@ -30,13 +30,11 @@ import org.elasticsearch.action.bulk.byscroll.AbstractBulkByScrollRequestBuilder import org.elasticsearch.action.bulk.byscroll.BulkByScrollResponse; import org.elasticsearch.action.bulk.byscroll.BulkByScrollTask; import org.elasticsearch.action.bulk.byscroll.BulkIndexByScrollResponseMatcher; -import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.network.NetworkModule; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; -import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.index.reindex.remote.RemoteInfo; import org.elasticsearch.plugins.Plugin; @@ -151,7 +149,7 @@ public class RetryTests extends ESSingleNodeTestCase { request.source().setSize(DOC_COUNT / randomIntBetween(2, 10)); logger.info("Starting request"); - ListenableActionFuture responseListener = request.execute(); + ActionFuture responseListener = request.execute(); try { logger.info("Waiting for search rejections on the initial search");