Java api: ActionRequestBuilder#execute to return a PlainActionFuture (#24415)

This change makes the request builder code path the same as `Client#execute`. The request builder used to return a `ListenableActionFuture` when calling `execute`, which allowed listeners to be associated with the returned future. For async execution, though, it is recommended to use the `execute` method that accepts an `ActionListener`, as users would do when using `Client#execute`.
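As a rough sketch of what usage looks like after this change (the index name, the `SearchResponse` action, and the `client` variable are illustrative, not taken from this commit):

import org.elasticsearch.action.ActionFuture;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Client;

public class RequestBuilderExecuteSketch {
    static void example(Client client) {
        // Blocking usage: execute() now returns an ActionFuture, just like Client#execute
        ActionFuture<SearchResponse> future = client.prepareSearch("my-index").execute();
        SearchResponse response = future.actionGet();

        // Async usage: pass an ActionListener to execute(...) rather than calling
        // addListener on a ListenableActionFuture, which is no longer returned
        client.prepareSearch("my-index").execute(new ActionListener<SearchResponse>() {
            @Override
            public void onResponse(SearchResponse searchResponse) {
                // handle the response
            }

            @Override
            public void onFailure(Exception e) {
                // handle the failure
            }
        });
    }
}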

Relates to #24412
Relates to #9201
Luca Cavanna 2017-05-03 11:20:53 +02:00 committed by GitHub
parent be19ccef57
commit 92bfd16c58
14 changed files with 108 additions and 120 deletions

View File

@ -19,18 +19,16 @@
package org.elasticsearch.action;
import org.elasticsearch.action.support.PlainListenableActionFuture;
import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.threadpool.ThreadPool;
import java.util.Objects;
public abstract class ActionRequestBuilder<Request extends ActionRequest, Response extends ActionResponse, RequestBuilder extends ActionRequestBuilder<Request, Response, RequestBuilder>> {
public abstract class ActionRequestBuilder<Request extends ActionRequest, Response extends ActionResponse,
RequestBuilder extends ActionRequestBuilder<Request, Response, RequestBuilder>> {
protected final Action<Request, Response, RequestBuilder> action;
protected final Request request;
private final ThreadPool threadPool;
protected final ElasticsearchClient client;
protected ActionRequestBuilder(ElasticsearchClient client, Action<Request, Response, RequestBuilder> action, Request request) {
@ -38,18 +36,14 @@ public abstract class ActionRequestBuilder<Request extends ActionRequest, Respon
this.action = action;
this.request = request;
this.client = client;
threadPool = client.threadPool();
}
public Request request() {
return this.request;
}
public ListenableActionFuture<Response> execute() {
PlainListenableActionFuture<Response> future = new PlainListenableActionFuture<>(threadPool);
execute(future);
return future;
public ActionFuture<Response> execute() {
return client.execute(action, request);
}
/**
@ -74,13 +68,6 @@ public abstract class ActionRequestBuilder<Request extends ActionRequest, Respon
}
public void execute(ActionListener<Response> listener) {
client.execute(action, beforeExecute(request), listener);
}
/**
* A callback to additionally process the request before its executed
*/
protected Request beforeExecute(Request request) {
return request;
client.execute(action, request, listener);
}
}

View File

@ -21,9 +21,9 @@ package org.elasticsearch.action.admin.cluster.node.tasks;
import org.elasticsearch.ElasticsearchTimeoutException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.action.ActionFuture;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.FailedNodeException;
import org.elasticsearch.action.ListenableActionFuture;
import org.elasticsearch.action.TaskOperationFailure;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthAction;
import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksResponse;
@ -90,7 +90,6 @@ import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFa
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertSearchResponse;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertThrows;
import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.emptyCollectionOf;
@ -466,8 +465,7 @@ public class TasksIT extends ESIntegTestCase {
public void testTasksCancellation() throws Exception {
// Start blocking test task
// Get real client (the plugin is not registered on transport nodes)
ListenableActionFuture<TestTaskPlugin.NodesResponse> future = TestTaskPlugin.TestTaskAction.INSTANCE.newRequestBuilder(client())
.execute();
ActionFuture<TestTaskPlugin.NodesResponse> future = TestTaskPlugin.TestTaskAction.INSTANCE.newRequestBuilder(client()).execute();
logger.info("--> started test tasks");
// Wait for the task to start on all nodes
@ -488,8 +486,7 @@ public class TasksIT extends ESIntegTestCase {
public void testTasksUnblocking() throws Exception {
// Start blocking test task
ListenableActionFuture<TestTaskPlugin.NodesResponse> future = TestTaskPlugin.TestTaskAction.INSTANCE.newRequestBuilder(client())
.execute();
ActionFuture<TestTaskPlugin.NodesResponse> future = TestTaskPlugin.TestTaskAction.INSTANCE.newRequestBuilder(client()).execute();
// Wait for the task to start on all nodes
assertBusy(() -> assertEquals(internalCluster().size(),
client().admin().cluster().prepareListTasks().setActions(TestTaskPlugin.TestTaskAction.NAME + "[n]").get().getTasks().size()));
@ -502,42 +499,45 @@ public class TasksIT extends ESIntegTestCase {
}
public void testListTasksWaitForCompletion() throws Exception {
waitForCompletionTestCase(randomBoolean(), id -> {
return client().admin().cluster().prepareListTasks().setActions(TestTaskPlugin.TestTaskAction.NAME)
.setWaitForCompletion(true).execute();
}, response -> {
assertThat(response.getNodeFailures(), empty());
assertThat(response.getTaskFailures(), empty());
assertThat(response.getTasks(), hasSize(1));
TaskInfo task = response.getTasks().get(0);
assertEquals(TestTaskPlugin.TestTaskAction.NAME, task.getAction());
});
waitForCompletionTestCase(randomBoolean(),
id -> client().admin().cluster().prepareListTasks().setActions(TestTaskPlugin.TestTaskAction.NAME)
.setWaitForCompletion(true).execute(),
response -> {
assertThat(response.getNodeFailures(), empty());
assertThat(response.getTaskFailures(), empty());
assertThat(response.getTasks(), hasSize(1));
TaskInfo task = response.getTasks().get(0);
assertEquals(TestTaskPlugin.TestTaskAction.NAME, task.getAction());
}
);
}
public void testGetTaskWaitForCompletionWithoutStoringResult() throws Exception {
waitForCompletionTestCase(false, id -> {
return client().admin().cluster().prepareGetTask(id).setWaitForCompletion(true).execute();
}, response -> {
assertTrue(response.getTask().isCompleted());
// We didn't store the result so it won't come back when we wait
assertNull(response.getTask().getResponse());
// But the task's details should still be there because we grabbed a reference to the task before waiting for it to complete.
assertNotNull(response.getTask().getTask());
assertEquals(TestTaskPlugin.TestTaskAction.NAME, response.getTask().getTask().getAction());
});
waitForCompletionTestCase(false,
id -> client().admin().cluster().prepareGetTask(id).setWaitForCompletion(true).execute(),
response -> {
assertTrue(response.getTask().isCompleted());
//We didn't store the result so it won't come back when we wait
assertNull(response.getTask().getResponse());
//But the task's details should still be there because we grabbed a reference to the task before waiting for it to complete
assertNotNull(response.getTask().getTask());
assertEquals(TestTaskPlugin.TestTaskAction.NAME, response.getTask().getTask().getAction());
}
);
}
public void testGetTaskWaitForCompletionWithStoringResult() throws Exception {
waitForCompletionTestCase(true, id -> {
return client().admin().cluster().prepareGetTask(id).setWaitForCompletion(true).execute();
}, response -> {
assertTrue(response.getTask().isCompleted());
// We stored the task so we should get its results
assertEquals(0, response.getTask().getResponseAsMap().get("failure_count"));
// The task's details should also be there
assertNotNull(response.getTask().getTask());
assertEquals(TestTaskPlugin.TestTaskAction.NAME, response.getTask().getTask().getAction());
});
waitForCompletionTestCase(true,
id -> client().admin().cluster().prepareGetTask(id).setWaitForCompletion(true).execute(),
response -> {
assertTrue(response.getTask().isCompleted());
// We stored the task so we should get its results
assertEquals(0, response.getTask().getResponseAsMap().get("failure_count"));
// The task's details should also be there
assertNotNull(response.getTask().getTask());
assertEquals(TestTaskPlugin.TestTaskAction.NAME, response.getTask().getTask().getAction());
}
);
}
/**
@ -546,13 +546,13 @@ public class TasksIT extends ESIntegTestCase {
* @param wait start waiting for a task. Accepts that id of the task to wait for and returns a future waiting for it.
* @param validator validate the response and return the task ids that were found
*/
private <T> void waitForCompletionTestCase(boolean storeResult, Function<TaskId, ListenableActionFuture<T>> wait, Consumer<T> validator)
private <T> void waitForCompletionTestCase(boolean storeResult, Function<TaskId, ActionFuture<T>> wait, Consumer<T> validator)
throws Exception {
// Start blocking test task
ListenableActionFuture<TestTaskPlugin.NodesResponse> future = TestTaskPlugin.TestTaskAction.INSTANCE.newRequestBuilder(client())
ActionFuture<TestTaskPlugin.NodesResponse> future = TestTaskPlugin.TestTaskAction.INSTANCE.newRequestBuilder(client())
.setShouldStoreResult(storeResult).execute();
ListenableActionFuture<T> waitResponseFuture;
ActionFuture<T> waitResponseFuture;
TaskId taskId;
try {
taskId = waitForTestTaskStartOnAllNodes();
@ -623,8 +623,7 @@ public class TasksIT extends ESIntegTestCase {
*/
private void waitForTimeoutTestCase(Function<TaskId, ? extends Iterable<? extends Throwable>> wait) throws Exception {
// Start blocking test task
ListenableActionFuture<TestTaskPlugin.NodesResponse> future = TestTaskPlugin.TestTaskAction.INSTANCE.newRequestBuilder(client())
.execute();
ActionFuture<TestTaskPlugin.NodesResponse> future = TestTaskPlugin.TestTaskAction.INSTANCE.newRequestBuilder(client()).execute();
try {
TaskId taskId = waitForTestTaskStartOnAllNodes();
@ -662,7 +661,7 @@ public class TasksIT extends ESIntegTestCase {
public void testTasksListWaitForNoTask() throws Exception {
// Spin up a request to wait for no matching tasks
ListenableActionFuture<ListTasksResponse> waitResponseFuture = client().admin().cluster().prepareListTasks()
ActionFuture<ListTasksResponse> waitResponseFuture = client().admin().cluster().prepareListTasks()
.setActions(TestTaskPlugin.TestTaskAction.NAME + "[n]").setWaitForCompletion(true).setTimeout(timeValueMillis(10))
.execute();
@ -672,12 +671,12 @@ public class TasksIT extends ESIntegTestCase {
public void testTasksGetWaitForNoTask() throws Exception {
// Spin up a request to wait for no matching tasks
ListenableActionFuture<GetTaskResponse> waitResponseFuture = client().admin().cluster().prepareGetTask("notfound:1")
ActionFuture<GetTaskResponse> waitResponseFuture = client().admin().cluster().prepareGetTask("notfound:1")
.setWaitForCompletion(true).setTimeout(timeValueMillis(10))
.execute();
// It should finish quickly and without complaint
expectNotFound(() -> waitResponseFuture.get());
expectNotFound(waitResponseFuture::get);
}
public void testTasksWaitForAllTask() throws Exception {

View File

@ -19,7 +19,7 @@
package org.elasticsearch.action.admin.indices.stats;
import org.elasticsearch.action.ListenableActionFuture;
import org.elasticsearch.action.ActionFuture;
import org.elasticsearch.action.ShardOperationFailedException;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.support.WriteRequest.RefreshPolicy;
@ -124,7 +124,7 @@ public class IndicesStatsTests extends ESSingleNodeTestCase {
createIndex("test", Settings.builder().put("refresh_interval", -1).build());
// Index a document asynchronously so the request will only return when document is refreshed
ListenableActionFuture<IndexResponse> index = client().prepareIndex("test", "test", "test").setSource("test", "test")
ActionFuture<IndexResponse> index = client().prepareIndex("test", "test", "test").setSource("test", "test")
.setRefreshPolicy(RefreshPolicy.WAIT_UNTIL).execute();
// Wait for the refresh listener to appear in the stats

View File

@ -19,7 +19,7 @@
package org.elasticsearch.action.support;
import org.elasticsearch.action.ListenableActionFuture;
import org.elasticsearch.action.ActionFuture;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.settings.Settings;
@ -140,7 +140,7 @@ public class ActiveShardsObserverIT extends ESIntegTestCase {
.build();
logger.info("--> start the index creation process");
ListenableActionFuture<CreateIndexResponse> responseListener =
ActionFuture<CreateIndexResponse> responseListener =
prepareCreate(indexName)
.setSettings(settings)
.setWaitForActiveShards(ActiveShardCount.ALL)

View File

@ -25,6 +25,7 @@ import org.elasticsearch.action.GenericAction;
import org.elasticsearch.action.admin.cluster.reroute.ClusterRerouteAction;
import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotAction;
import org.elasticsearch.action.admin.cluster.stats.ClusterStatsAction;
import org.elasticsearch.action.admin.cluster.storedscripts.DeleteStoredScriptAction;
import org.elasticsearch.action.admin.indices.cache.clear.ClearIndicesCacheAction;
import org.elasticsearch.action.admin.indices.create.CreateIndexAction;
import org.elasticsearch.action.admin.indices.flush.FlushAction;
@ -32,7 +33,6 @@ import org.elasticsearch.action.admin.indices.stats.IndicesStatsAction;
import org.elasticsearch.action.delete.DeleteAction;
import org.elasticsearch.action.get.GetAction;
import org.elasticsearch.action.index.IndexAction;
import org.elasticsearch.action.admin.cluster.storedscripts.DeleteStoredScriptAction;
import org.elasticsearch.action.search.SearchAction;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ThreadContext;
@ -101,22 +101,22 @@ public abstract class AbstractClientHeadersTestCase extends ESTestCase {
// validation in the settings??? - ugly and conceptually wrong)
// choosing arbitrary top level actions to test
client.prepareGet("idx", "type", "id").execute().addListener(new AssertingActionListener<>(GetAction.NAME, client.threadPool()));
client.prepareSearch().execute().addListener(new AssertingActionListener<>(SearchAction.NAME, client.threadPool()));
client.prepareDelete("idx", "type", "id").execute().addListener(new AssertingActionListener<>(DeleteAction.NAME, client.threadPool()));
client.admin().cluster().prepareDeleteStoredScript("lang", "id").execute().addListener(new AssertingActionListener<>(DeleteStoredScriptAction.NAME, client.threadPool()));
client.prepareIndex("idx", "type", "id").setSource("source", XContentType.JSON).execute().addListener(new AssertingActionListener<>(IndexAction.NAME, client.threadPool()));
client.prepareGet("idx", "type", "id").execute(new AssertingActionListener<>(GetAction.NAME, client.threadPool()));
client.prepareSearch().execute(new AssertingActionListener<>(SearchAction.NAME, client.threadPool()));
client.prepareDelete("idx", "type", "id").execute(new AssertingActionListener<>(DeleteAction.NAME, client.threadPool()));
client.admin().cluster().prepareDeleteStoredScript("lang", "id").execute(new AssertingActionListener<>(DeleteStoredScriptAction.NAME, client.threadPool()));
client.prepareIndex("idx", "type", "id").setSource("source", XContentType.JSON).execute(new AssertingActionListener<>(IndexAction.NAME, client.threadPool()));
// choosing arbitrary cluster admin actions to test
client.admin().cluster().prepareClusterStats().execute().addListener(new AssertingActionListener<>(ClusterStatsAction.NAME, client.threadPool()));
client.admin().cluster().prepareCreateSnapshot("repo", "bck").execute().addListener(new AssertingActionListener<>(CreateSnapshotAction.NAME, client.threadPool()));
client.admin().cluster().prepareReroute().execute().addListener(new AssertingActionListener<>(ClusterRerouteAction.NAME, client.threadPool()));
client.admin().cluster().prepareClusterStats().execute(new AssertingActionListener<>(ClusterStatsAction.NAME, client.threadPool()));
client.admin().cluster().prepareCreateSnapshot("repo", "bck").execute(new AssertingActionListener<>(CreateSnapshotAction.NAME, client.threadPool()));
client.admin().cluster().prepareReroute().execute(new AssertingActionListener<>(ClusterRerouteAction.NAME, client.threadPool()));
// choosing arbitrary indices admin actions to test
client.admin().indices().prepareCreate("idx").execute().addListener(new AssertingActionListener<>(CreateIndexAction.NAME, client.threadPool()));
client.admin().indices().prepareStats().execute().addListener(new AssertingActionListener<>(IndicesStatsAction.NAME, client.threadPool()));
client.admin().indices().prepareClearCache("idx1", "idx2").execute().addListener(new AssertingActionListener<>(ClearIndicesCacheAction.NAME, client.threadPool()));
client.admin().indices().prepareFlush().execute().addListener(new AssertingActionListener<>(FlushAction.NAME, client.threadPool()));
client.admin().indices().prepareCreate("idx").execute(new AssertingActionListener<>(CreateIndexAction.NAME, client.threadPool()));
client.admin().indices().prepareStats().execute(new AssertingActionListener<>(IndicesStatsAction.NAME, client.threadPool()));
client.admin().indices().prepareClearCache("idx1", "idx2").execute(new AssertingActionListener<>(ClearIndicesCacheAction.NAME, client.threadPool()));
client.admin().indices().prepareFlush().execute(new AssertingActionListener<>(FlushAction.NAME, client.threadPool()));
}
public void testOverrideHeader() throws Exception {
@ -126,13 +126,13 @@ public abstract class AbstractClientHeadersTestCase extends ESTestCase {
expected.put("key2", "val 2");
client.threadPool().getThreadContext().putHeader("key1", key1Val);
client.prepareGet("idx", "type", "id")
.execute().addListener(new AssertingActionListener<>(GetAction.NAME, expected, client.threadPool()));
.execute(new AssertingActionListener<>(GetAction.NAME, expected, client.threadPool()));
client.admin().cluster().prepareClusterStats()
.execute().addListener(new AssertingActionListener<>(ClusterStatsAction.NAME, expected, client.threadPool()));
.execute(new AssertingActionListener<>(ClusterStatsAction.NAME, expected, client.threadPool()));
client.admin().indices().prepareCreate("idx")
.execute().addListener(new AssertingActionListener<>(CreateIndexAction.NAME, expected, client.threadPool()));
.execute(new AssertingActionListener<>(CreateIndexAction.NAME, expected, client.threadPool()));
}
protected static void assertHeaders(Map<String, String> headers, Map<String, String> expected) {
@ -205,7 +205,5 @@ public abstract class AbstractClientHeadersTestCase extends ESTestCase {
}
return result;
}
}
}

View File

@ -19,8 +19,8 @@
package org.elasticsearch.index;
import org.elasticsearch.action.ActionFuture;
import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.ListenableActionFuture;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
@ -149,7 +149,7 @@ public class WaitUntilRefreshIT extends ESIntegTestCase {
*/
public void testNoRefreshInterval() throws InterruptedException, ExecutionException {
client().admin().indices().prepareUpdateSettings("test").setSettings(singletonMap("index.refresh_interval", -1)).get();
ListenableActionFuture<IndexResponse> index = client().prepareIndex("test", "index", "1").setSource("foo", "bar")
ActionFuture<IndexResponse> index = client().prepareIndex("test", "index", "1").setSource("foo", "bar")
.setRefreshPolicy(RefreshPolicy.WAIT_UNTIL).execute();
while (false == index.isDone()) {
client().admin().indices().prepareRefresh("test").get();

View File

@ -19,7 +19,7 @@
package org.elasticsearch.search;
import org.elasticsearch.action.ListenableActionFuture;
import org.elasticsearch.action.ActionFuture;
import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksResponse;
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
@ -128,7 +128,7 @@ public class SearchCancellationIT extends ESIntegTestCase {
assertThat(cancelTasksResponse.getTasks().get(0).getTaskId(), equalTo(searchTask.getTaskId()));
}
private SearchResponse ensureSearchWasCancelled(ListenableActionFuture<SearchResponse> searchResponse) {
private SearchResponse ensureSearchWasCancelled(ActionFuture<SearchResponse> searchResponse) {
try {
SearchResponse response = searchResponse.actionGet();
logger.info("Search response {}", response);
@ -146,7 +146,7 @@ public class SearchCancellationIT extends ESIntegTestCase {
indexTestData();
logger.info("Executing search");
ListenableActionFuture<SearchResponse> searchResponse = client().prepareSearch("test").setQuery(
ActionFuture<SearchResponse> searchResponse = client().prepareSearch("test").setQuery(
scriptQuery(new Script(
ScriptType.INLINE, "native", NativeTestScriptedBlockFactory.TEST_NATIVE_BLOCK_SCRIPT, Collections.emptyMap())))
.execute();
@ -164,7 +164,7 @@ public class SearchCancellationIT extends ESIntegTestCase {
indexTestData();
logger.info("Executing search");
ListenableActionFuture<SearchResponse> searchResponse = client().prepareSearch("test")
ActionFuture<SearchResponse> searchResponse = client().prepareSearch("test")
.addScriptField("test_field",
new Script(ScriptType.INLINE, "native", NativeTestScriptedBlockFactory.TEST_NATIVE_BLOCK_SCRIPT, Collections.emptyMap())
).execute();
@ -182,7 +182,7 @@ public class SearchCancellationIT extends ESIntegTestCase {
indexTestData();
logger.info("Executing search");
ListenableActionFuture<SearchResponse> searchResponse = client().prepareSearch("test")
ActionFuture<SearchResponse> searchResponse = client().prepareSearch("test")
.setScroll(TimeValue.timeValueSeconds(10))
.setSize(5)
.setQuery(
@ -230,7 +230,7 @@ public class SearchCancellationIT extends ESIntegTestCase {
String scrollId = searchResponse.getScrollId();
logger.info("Executing scroll with id {}", scrollId);
ListenableActionFuture<SearchResponse> scrollResponse = client().prepareSearchScroll(searchResponse.getScrollId())
ActionFuture<SearchResponse> scrollResponse = client().prepareSearchScroll(searchResponse.getScrollId())
.setScroll(keepAlive).execute();
awaitForBlock(plugins);

View File

@ -21,7 +21,7 @@ package org.elasticsearch.snapshots;
import com.carrotsearch.hppc.IntHashSet;
import com.carrotsearch.hppc.IntSet;
import org.elasticsearch.action.ListenableActionFuture;
import org.elasticsearch.action.ActionFuture;
import org.elasticsearch.action.admin.cluster.repositories.put.PutRepositoryResponse;
import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse;
import org.elasticsearch.action.admin.cluster.snapshots.delete.DeleteSnapshotResponse;
@ -412,7 +412,7 @@ public class DedicatedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTest
logger.info("--> execution was blocked on node [{}], aborting snapshot", blockedNode);
ListenableActionFuture<DeleteSnapshotResponse> deleteSnapshotResponseFuture = internalCluster().client(nodes.get(0)).admin().cluster().prepareDeleteSnapshot("test-repo", "test-snap").execute();
ActionFuture<DeleteSnapshotResponse> deleteSnapshotResponseFuture = internalCluster().client(nodes.get(0)).admin().cluster().prepareDeleteSnapshot("test-repo", "test-snap").execute();
// Make sure that abort makes some progress
Thread.sleep(100);
unblockNode("test-repo", blockedNode);

View File

@ -19,7 +19,7 @@
package org.elasticsearch.snapshots;
import org.elasticsearch.action.ListenableActionFuture;
import org.elasticsearch.action.ActionFuture;
import org.elasticsearch.action.admin.cluster.snapshots.delete.DeleteSnapshotResponse;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
@ -83,7 +83,7 @@ public class MinThreadsSnapshotRestoreIT extends AbstractSnapshotIntegTestCase {
String blockedNode = internalCluster().getMasterName();
((MockRepository)internalCluster().getInstance(RepositoriesService.class, blockedNode).repository(repo)).blockOnDataFiles(true);
logger.info("--> start deletion of first snapshot");
ListenableActionFuture<DeleteSnapshotResponse> future =
ActionFuture<DeleteSnapshotResponse> future =
client().admin().cluster().prepareDeleteSnapshot(repo, snapshot2).execute();
logger.info("--> waiting for block to kick in on node [{}]", blockedNode);
waitForBlock(blockedNode, repo, TimeValue.timeValueSeconds(10));
@ -129,8 +129,7 @@ public class MinThreadsSnapshotRestoreIT extends AbstractSnapshotIntegTestCase {
String blockedNode = internalCluster().getMasterName();
((MockRepository)internalCluster().getInstance(RepositoriesService.class, blockedNode).repository(repo)).blockOnDataFiles(true);
logger.info("--> start deletion of snapshot");
ListenableActionFuture<DeleteSnapshotResponse> future =
client().admin().cluster().prepareDeleteSnapshot(repo, snapshot1).execute();
ActionFuture<DeleteSnapshotResponse> future = client().admin().cluster().prepareDeleteSnapshot(repo, snapshot1).execute();
logger.info("--> waiting for block to kick in on node [{}]", blockedNode);
waitForBlock(blockedNode, repo, TimeValue.timeValueSeconds(10));
@ -185,8 +184,7 @@ public class MinThreadsSnapshotRestoreIT extends AbstractSnapshotIntegTestCase {
String blockedNode = internalCluster().getMasterName();
((MockRepository)internalCluster().getInstance(RepositoriesService.class, blockedNode).repository(repo)).blockOnDataFiles(true);
logger.info("--> start deletion of snapshot");
ListenableActionFuture<DeleteSnapshotResponse> future =
client().admin().cluster().prepareDeleteSnapshot(repo, snapshot2).execute();
ActionFuture<DeleteSnapshotResponse> future = client().admin().cluster().prepareDeleteSnapshot(repo, snapshot2).execute();
logger.info("--> waiting for block to kick in on node [{}]", blockedNode);
waitForBlock(blockedNode, repo, TimeValue.timeValueSeconds(10));

View File

@ -22,7 +22,7 @@ package org.elasticsearch.snapshots;
import org.apache.lucene.util.IOUtils;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.Version;
import org.elasticsearch.action.ListenableActionFuture;
import org.elasticsearch.action.ActionFuture;
import org.elasticsearch.action.admin.cluster.repositories.put.PutRepositoryResponse;
import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse;
import org.elasticsearch.action.admin.cluster.snapshots.get.GetSnapshotsResponse;
@ -56,7 +56,6 @@ import org.elasticsearch.cluster.metadata.MetaDataIndexStateService;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
@ -155,7 +154,7 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
assertHitCount(client.prepareSearch("test-idx-2").setSize(0).get(), 100L);
assertHitCount(client.prepareSearch("test-idx-3").setSize(0).get(), 100L);
ListenableActionFuture<FlushResponse> flushResponseFuture = null;
ActionFuture<FlushResponse> flushResponseFuture = null;
if (randomBoolean()) {
ArrayList<String> indicesToFlush = new ArrayList<>();
for (int i = 1; i < 4; i++) {
@ -888,7 +887,7 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
logger.info("--> delete index");
cluster().wipeIndices("test-idx");
logger.info("--> restore index after deletion");
ListenableActionFuture<RestoreSnapshotResponse> restoreSnapshotResponseFuture =
ActionFuture<RestoreSnapshotResponse> restoreSnapshotResponseFuture =
client.admin().cluster().prepareRestoreSnapshot("test-repo", "test-snap").setWaitForCompletion(true).execute();
logger.info("--> wait for the index to appear");
@ -2014,7 +2013,7 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
assertThat(client.prepareSearch("test-idx-3").setSize(0).get().getHits().getTotalHits(), equalTo(100L));
logger.info("--> snapshot allow partial {}", allowPartial);
ListenableActionFuture<CreateSnapshotResponse> future = client.admin().cluster().prepareCreateSnapshot("test-repo", "test-snap")
ActionFuture<CreateSnapshotResponse> future = client.admin().cluster().prepareCreateSnapshot("test-repo", "test-snap")
.setIndices("test-idx-*").setWaitForCompletion(true).setPartial(allowPartial).execute();
logger.info("--> wait for block to kick in");
if (initBlocking) {
@ -2109,7 +2108,7 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
blockAllDataNodes("test-repo");
logger.info("--> execution will be blocked on all data nodes");
final ListenableActionFuture<RestoreSnapshotResponse> restoreFut;
final ActionFuture<RestoreSnapshotResponse> restoreFut;
try {
logger.info("--> start restore");
restoreFut = client.admin().cluster().prepareRestoreSnapshot("test-repo", "test-snap")
@ -2174,7 +2173,7 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
logger.info("--> execution will be blocked on all data nodes");
blockAllDataNodes(repoName);
final ListenableActionFuture<RestoreSnapshotResponse> restoreFut;
final ActionFuture<RestoreSnapshotResponse> restoreFut;
try {
logger.info("--> start restore");
restoreFut = client.admin().cluster().prepareRestoreSnapshot(repoName, snapshotName)
@ -2461,7 +2460,7 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
// take initial snapshot with a block, making sure we only get 1 in-progress snapshot returned
// block a node so the create snapshot operation can remain in progress
final String initialBlockedNode = blockNodeWithIndex(repositoryName, indexName);
ListenableActionFuture<CreateSnapshotResponse> responseListener =
ActionFuture<CreateSnapshotResponse> responseListener =
client.admin().cluster().prepareCreateSnapshot(repositoryName, "snap-on-empty-repo")
.setWaitForCompletion(false)
.setIndices(indexName)

View File

@ -8,14 +8,22 @@ an object source did not require the XContentType to be specified. The auto-detection of the content
type is no longer used, so these methods now require the XContentType as an additional argument when
providing the source in bytes or as a string.
=== `DeleteByQueryRequest` requires an explicitly set query
==== `DeleteByQueryRequest` requires an explicitly set query
In previous versions of Elasticsearch, delete by query requests without an explicit query
were accepted; `match_all` was used as the default query and all documents were deleted
as a result. From version 6.0.0, a `DeleteByQueryRequest` requires an explicit query to be set.
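A minimal sketch of setting the query explicitly, assuming the reindex module's `DeleteByQueryAction` request builder; the index name and term query are illustrative:

import org.elasticsearch.action.bulk.byscroll.BulkByScrollResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.reindex.DeleteByQueryAction;

public class ExplicitDeleteByQuerySketch {
    static BulkByScrollResponse deleteUserDocs(Client client) {
        // The query must now be set explicitly; there is no implicit match_all fallback
        return DeleteByQueryAction.INSTANCE.newRequestBuilder(client)
                .filter(QueryBuilders.termQuery("user", "kimchy"))
                .source("my-index")
                .get();
    }
}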
=== `InternalStats` and `Stats` getCountAsString() method removed
==== `InternalStats` and `Stats` getCountAsString() method removed
The `count` value in the stats aggregation represents a doc count that shouldn't require a formatted
version. This method was deprecated in 5.4 in favour of just using
`String.valueOf(getCount())` if needed.
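For example, a hypothetical helper showing the suggested replacement (class and method names are illustrative):

import org.elasticsearch.search.aggregations.metrics.stats.Stats;

public class StatsCountSketch {
    static String formattedCount(Stats stats) {
        // Replacement for the removed getCountAsString(): format the raw doc count as needed
        return String.valueOf(stats.getCount());
    }
}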
==== `ActionRequestBuilder#execute` returns `ActionFuture` rather than `ListenableActionFuture`
When sending a request through the request builders, e.g. `client.prepareSearch().execute()`, it used to
be possible to call `addListener` on the returned `ListenableActionFuture`. With this change an
`ActionFuture` is returned instead, which is consistent with what the `Client` methods return; hence
it is no longer possible to associate listeners with the returned future. The `execute` method that
accepts a listener as an argument can be used instead.
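A hedged before/after sketch of the migration (index, type, id and source are illustrative, and it assumes `ActionListener.wrap` is available in this version; an anonymous `ActionListener` works equally well):

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.xcontent.XContentType;

public class ListenerMigrationSketch {
    static void indexAsync(Client client) {
        // Previously: client.prepareIndex(...).execute().addListener(listener)
        // Now: pass the listener straight to execute(...)
        client.prepareIndex("my-index", "my-type", "1")
                .setSource("{\"field\":\"value\"}", XContentType.JSON)
                .execute(ActionListener.wrap(
                        indexResponse -> { /* handle the IndexResponse */ },
                        e -> { /* handle the failure */ }));
    }
}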

View File

@ -19,7 +19,7 @@
package org.elasticsearch.index.reindex;
import org.elasticsearch.action.ListenableActionFuture;
import org.elasticsearch.action.ActionFuture;
import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksRequest;
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse;
import org.elasticsearch.action.bulk.byscroll.AbstractBulkByScrollRequestBuilder;
@ -111,7 +111,7 @@ public class CancelTests extends ReindexTestCase {
ALLOWED_OPERATIONS.release(numModifiedDocs - builder.request().getSlices());
// Now execute the reindex action...
ListenableActionFuture<? extends BulkByScrollResponse> future = builder.execute();
ActionFuture<? extends BulkByScrollResponse> future = builder.execute();
/* ... and waits for the indexing operation listeners to block. It is important to realize that some of the workers might have
* exhausted their slice while others might have quite a bit left to work on. We can't control that. */

View File

@ -19,7 +19,7 @@
package org.elasticsearch.index.reindex;
import org.elasticsearch.action.ListenableActionFuture;
import org.elasticsearch.action.ActionFuture;
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse;
import org.elasticsearch.action.admin.cluster.node.tasks.list.TaskGroup;
import org.elasticsearch.action.bulk.byscroll.AbstractBulkByScrollRequestBuilder;
@ -31,6 +31,7 @@ import org.elasticsearch.tasks.TaskId;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import static org.hamcrest.Matchers.allOf;
@ -87,7 +88,7 @@ public class RethrottleTests extends ReindexTestCase {
// Start a request that will never finish unless we rethrottle it
request.setRequestsPerSecond(.000001f); // Throttle "forever"
request.source().setSize(1); // Make sure we use multiple batches
ListenableActionFuture<? extends BulkByScrollResponse> responseListener = request.execute();
ActionFuture<? extends BulkByScrollResponse> responseListener = request.execute();
TaskGroup taskGroupToRethrottle = findTaskToRethrottle(actionName, request.request().getSlices());
TaskId taskToRethrottle = taskGroupToRethrottle.getTaskInfo().getTaskId();
@ -102,7 +103,7 @@ public class RethrottleTests extends ReindexTestCase {
assertBusy(() -> {
BulkByScrollTask.Status parent = (BulkByScrollTask.Status) client().admin().cluster().prepareGetTask(taskToRethrottle).get()
.getTask().getTask().getStatus();
long finishedSubTasks = parent.getSliceStatuses().stream().filter(s -> s != null).count();
long finishedSubTasks = parent.getSliceStatuses().stream().filter(Objects::nonNull).count();
ListTasksResponse list = client().admin().cluster().prepareListTasks().setParentTaskId(taskToRethrottle).get();
list.rethrowFailures("subtasks");
assertThat(finishedSubTasks + list.getTasks().size(), greaterThanOrEqualTo((long) request.request().getSlices()));
@ -124,7 +125,7 @@ public class RethrottleTests extends ReindexTestCase {
/* Check that at least one slice was rethrottled. We won't always rethrottle all of them because they might have completed.
* With multiple slices these numbers might not add up perfectly, thus the 1.01F. */
long unfinished = status.getSliceStatuses().stream()
.filter(slice -> slice != null)
.filter(Objects::nonNull)
.filter(slice -> slice.getStatus().getTotal() > slice.getStatus().getSuccessfullyProcessed())
.count();
float maxExpectedSliceRequestsPerSecond = newRequestsPerSecond == Float.POSITIVE_INFINITY ?

View File

@ -19,7 +19,7 @@
package org.elasticsearch.index.reindex;
import org.elasticsearch.action.ListenableActionFuture;
import org.elasticsearch.action.ActionFuture;
import org.elasticsearch.action.admin.cluster.node.info.NodeInfo;
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse;
import org.elasticsearch.action.bulk.BackoffPolicy;
@ -30,13 +30,11 @@ import org.elasticsearch.action.bulk.byscroll.AbstractBulkByScrollRequestBuilder
import org.elasticsearch.action.bulk.byscroll.BulkByScrollResponse;
import org.elasticsearch.action.bulk.byscroll.BulkByScrollTask;
import org.elasticsearch.action.bulk.byscroll.BulkIndexByScrollResponseMatcher;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.network.NetworkModule;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.reindex.remote.RemoteInfo;
import org.elasticsearch.plugins.Plugin;
@ -151,7 +149,7 @@ public class RetryTests extends ESSingleNodeTestCase {
request.source().setSize(DOC_COUNT / randomIntBetween(2, 10));
logger.info("Starting request");
ListenableActionFuture<BulkByScrollResponse> responseListener = request.execute();
ActionFuture<BulkByScrollResponse> responseListener = request.execute();
try {
logger.info("Waiting for search rejections on the initial search");