From ecd033bea62489e70c60763cfc117d0b96534fa0 Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Tue, 21 May 2019 17:20:52 +0200 Subject: [PATCH] Cleanup Various Uses of ActionListener (#40126) (#42274) * Cleanup Various Uses of ActionListener * Use shorter `map`, `runAfter` or `wrap` where functionally equivalent to anonymous class * Use ActionRunnable where functionally equivalent --- .../tasks/get/TransportGetTaskAction.java | 28 +++----- .../TransportSnapshotsStatusAction.java | 22 ++----- .../upgrade/post/TransportUpgradeAction.java | 25 ++----- .../action/bulk/BulkRequestHandler.java | 21 ++---- .../ingest/PutPipelineTransportAction.java | 25 ++----- .../support/ThreadedActionListener.java | 10 +-- .../broadcast/TransportBroadcastAction.java | 42 ++++-------- ...ransportInstanceSingleOperationAction.java | 29 +++----- .../shard/TransportSingleShardAction.java | 9 +-- .../support/tasks/TransportTasksAction.java | 18 +---- .../elasticsearch/search/SearchService.java | 48 ++------------ .../transport/TransportKeepAlive.java | 5 +- .../action/RejectionActionIT.java | 6 +- .../node/tasks/TransportTasksActionTests.java | 12 +--- .../search/ClearScrollControllerTests.java | 66 ++++++------------- .../TransportActionFilterChainTests.java | 11 ++-- .../TransportWriteActionTests.java | 12 +--- .../decider/EnableAssignmentDeciderIT.java | 14 +--- 18 files changed, 96 insertions(+), 307 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/get/TransportGetTaskAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/get/TransportGetTaskAction.java index fe07a4efe93..d1d72da5445 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/get/TransportGetTaskAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/get/TransportGetTaskAction.java @@ -203,27 +203,15 @@ public class TransportGetTaskAction extends HandledTransportAction() { - @Override - public void onResponse(GetResponse getResponse) { - try { - onGetFinishedTaskFromIndex(getResponse, listener); - } catch (Exception e) { - listener.onFailure(e); - } + client.get(get, ActionListener.wrap(r -> onGetFinishedTaskFromIndex(r, listener), e -> { + if (ExceptionsHelper.unwrap(e, IndexNotFoundException.class) != null) { + // We haven't yet created the index for the task results so it can't be found. + listener.onFailure(new ResourceNotFoundException("task [{}] isn't running and hasn't stored its results", e, + request.getTaskId())); + } else { + listener.onFailure(e); } - - @Override - public void onFailure(Exception e) { - if (ExceptionsHelper.unwrap(e, IndexNotFoundException.class) != null) { - // We haven't yet created the index for the task results so it can't be found. - listener.onFailure(new ResourceNotFoundException("task [{}] isn't running and hasn't stored its results", e, - request.getTaskId())); - } else { - listener.onFailure(e); - } - } - }); + })); } /** diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/TransportSnapshotsStatusAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/TransportSnapshotsStatusAction.java index 5dfc24d1e28..c2f0d3dd0c0 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/TransportSnapshotsStatusAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/TransportSnapshotsStatusAction.java @@ -119,23 +119,11 @@ public class TransportSnapshotsStatusAction extends TransportMasterNodeAction() { - @Override - public void onResponse(TransportNodesSnapshotsStatus.NodesSnapshotStatus nodeSnapshotStatuses) { - try { - List currentSnapshots = - snapshotsService.currentSnapshots(request.repository(), Arrays.asList(request.snapshots())); - listener.onResponse(buildResponse(request, currentSnapshots, nodeSnapshotStatuses)); - } catch (Exception e) { - listener.onFailure(e); - } - } - - @Override - public void onFailure(Exception e) { - listener.onFailure(e); - } - }); + transportNodesSnapshotsStatus.execute(nodesRequest, + ActionListener.map( + listener, nodeSnapshotStatuses -> + buildResponse(request, snapshotsService.currentSnapshots(request.repository(), Arrays.asList(request.snapshots())), + nodeSnapshotStatuses))); } else { // We don't have any in-progress shards, just return current stats listener.onResponse(buildResponse(request, currentSnapshots, null)); diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/upgrade/post/TransportUpgradeAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/upgrade/post/TransportUpgradeAction.java index f2d046f3321..b122350c3e6 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/upgrade/post/TransportUpgradeAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/upgrade/post/TransportUpgradeAction.java @@ -184,26 +184,13 @@ public class TransportUpgradeAction extends TransportBroadcastByNodeAction listener) { - ActionListener settingsUpdateListener = new ActionListener() { - @Override - public void onResponse(UpgradeResponse upgradeResponse) { - try { - if (upgradeResponse.versions().isEmpty()) { - listener.onResponse(upgradeResponse); - } else { - updateSettings(upgradeResponse, listener); - } - } catch (Exception e) { - listener.onFailure(e); - } + super.doExecute(task, request, ActionListener.wrap(upgradeResponse -> { + if (upgradeResponse.versions().isEmpty()) { + listener.onResponse(upgradeResponse); + } else { + updateSettings(upgradeResponse, listener); } - - @Override - public void onFailure(Exception e) { - listener.onFailure(e); - } - }; - super.doExecute(task, request, settingsUpdateListener); + }, listener::onFailure)); } private void updateSettings(final UpgradeResponse upgradeResponse, final ActionListener listener) { diff --git a/server/src/main/java/org/elasticsearch/action/bulk/BulkRequestHandler.java b/server/src/main/java/org/elasticsearch/action/bulk/BulkRequestHandler.java index 2f5db520088..7890fb4e83f 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/BulkRequestHandler.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/BulkRequestHandler.java @@ -59,27 +59,20 @@ public final class BulkRequestHandler { semaphore.acquire(); toRelease = semaphore::release; CountDownLatch latch = new CountDownLatch(1); - retry.withBackoff(consumer, bulkRequest, new ActionListener() { + retry.withBackoff(consumer, bulkRequest, ActionListener.runAfter(new ActionListener() { @Override public void onResponse(BulkResponse response) { - try { - listener.afterBulk(executionId, bulkRequest, response); - } finally { - semaphore.release(); - latch.countDown(); - } + listener.afterBulk(executionId, bulkRequest, response); } @Override public void onFailure(Exception e) { - try { - listener.afterBulk(executionId, bulkRequest, e); - } finally { - semaphore.release(); - latch.countDown(); - } + listener.afterBulk(executionId, bulkRequest, e); } - }); + }, () -> { + semaphore.release(); + latch.countDown(); + })); bulkRequestSetupSuccessful = true; if (concurrentRequests == 0) { latch.await(); diff --git a/server/src/main/java/org/elasticsearch/action/ingest/PutPipelineTransportAction.java b/server/src/main/java/org/elasticsearch/action/ingest/PutPipelineTransportAction.java index 97f13bf71d1..be1528a354b 100644 --- a/server/src/main/java/org/elasticsearch/action/ingest/PutPipelineTransportAction.java +++ b/server/src/main/java/org/elasticsearch/action/ingest/PutPipelineTransportAction.java @@ -22,7 +22,6 @@ package org.elasticsearch.action.ingest; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.cluster.node.info.NodeInfo; import org.elasticsearch.action.admin.cluster.node.info.NodesInfoRequest; -import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.action.support.master.TransportMasterNodeAction; @@ -74,25 +73,13 @@ public class PutPipelineTransportAction extends TransportMasterNodeAction() { - @Override - public void onResponse(NodesInfoResponse nodeInfos) { - try { - Map ingestInfos = new HashMap<>(); - for (NodeInfo nodeInfo : nodeInfos.getNodes()) { - ingestInfos.put(nodeInfo.getNode(), nodeInfo.getIngest()); - } - ingestService.putPipeline(ingestInfos, request, listener); - } catch (Exception e) { - onFailure(e); - } + client.admin().cluster().nodesInfo(nodesInfoRequest, ActionListener.wrap(nodeInfos -> { + Map ingestInfos = new HashMap<>(); + for (NodeInfo nodeInfo : nodeInfos.getNodes()) { + ingestInfos.put(nodeInfo.getNode(), nodeInfo.getIngest()); } - - @Override - public void onFailure(Exception e) { - listener.onFailure(e); - } - }); + ingestService.putPipeline(ingestInfos, request, listener); + }, listener::onFailure)); } @Override diff --git a/server/src/main/java/org/elasticsearch/action/support/ThreadedActionListener.java b/server/src/main/java/org/elasticsearch/action/support/ThreadedActionListener.java index dfcf6445abf..9af3e9a3315 100644 --- a/server/src/main/java/org/elasticsearch/action/support/ThreadedActionListener.java +++ b/server/src/main/java/org/elasticsearch/action/support/ThreadedActionListener.java @@ -22,6 +22,7 @@ package org.elasticsearch.action.support; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionRunnable; import org.elasticsearch.client.Client; import org.elasticsearch.client.transport.TransportClient; import org.elasticsearch.common.settings.Settings; @@ -86,21 +87,16 @@ public final class ThreadedActionListener implements ActionListener(listener) { @Override public boolean isForceExecution() { return forceExecution; } @Override - protected void doRun() throws Exception { + protected void doRun() { listener.onResponse(response); } - - @Override - public void onFailure(Exception e) { - listener.onFailure(e); - } }); } diff --git a/server/src/main/java/org/elasticsearch/action/support/broadcast/TransportBroadcastAction.java b/server/src/main/java/org/elasticsearch/action/support/broadcast/TransportBroadcastAction.java index 87c9e153241..15daaf786b6 100644 --- a/server/src/main/java/org/elasticsearch/action/support/broadcast/TransportBroadcastAction.java +++ b/server/src/main/java/org/elasticsearch/action/support/broadcast/TransportBroadcastAction.java @@ -21,6 +21,7 @@ package org.elasticsearch.action.support.broadcast; import org.apache.logging.log4j.message.ParameterizedMessage; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionRunnable; import org.elasticsearch.action.NoShardAvailableActionException; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.HandledTransportAction; @@ -36,7 +37,6 @@ import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.io.stream.StreamInput; -import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportChannel; @@ -287,45 +287,25 @@ public abstract class TransportBroadcastAction< @Override public void messageReceived(ShardRequest request, TransportChannel channel, Task task) throws Exception { - asyncShardOperation(request, task, new ActionListener() { - @Override - public void onResponse(ShardResponse response) { - try { - channel.sendResponse(response); - } catch (Exception e) { - onFailure(e); + asyncShardOperation(request, task, + ActionListener.wrap(channel::sendResponse, e -> { + try { + channel.sendResponse(e); + } catch (Exception e1) { + logger.warn(() -> new ParameterizedMessage( + "Failed to send error response for action [{}] and request [{}]", actionName, request), e1); + } } - } - - @Override - public void onFailure(Exception e) { - try { - channel.sendResponse(e); - } catch (Exception e1) { - logger.warn(() -> new ParameterizedMessage( - "Failed to send error response for action [{}] and request [{}]", actionName, request), e1); - } - } - }); + )); } } protected void asyncShardOperation(ShardRequest request, Task task, ActionListener listener) { - transportService.getThreadPool().executor(getExecutor(request)).execute(new AbstractRunnable() { - @Override - public void onFailure(Exception e) { - listener.onFailure(e); - } - + transportService.getThreadPool().executor(shardExecutor).execute(new ActionRunnable(listener) { @Override protected void doRun() throws Exception { listener.onResponse(shardOperation(request, task)); } }); } - - protected String getExecutor(ShardRequest request) { - return shardExecutor; - } - } diff --git a/server/src/main/java/org/elasticsearch/action/support/single/instance/TransportInstanceSingleOperationAction.java b/server/src/main/java/org/elasticsearch/action/support/single/instance/TransportInstanceSingleOperationAction.java index c575c3b2338..d1d7b6ffac5 100644 --- a/server/src/main/java/org/elasticsearch/action/support/single/instance/TransportInstanceSingleOperationAction.java +++ b/server/src/main/java/org/elasticsearch/action/support/single/instance/TransportInstanceSingleOperationAction.java @@ -254,27 +254,16 @@ public abstract class TransportInstanceSingleOperationAction< @Override public void messageReceived(final Request request, final TransportChannel channel, Task task) throws Exception { - shardOperation(request, new ActionListener() { - @Override - public void onResponse(Response response) { - try { - channel.sendResponse(response); - } catch (Exception e) { - onFailure(e); + shardOperation(request, + ActionListener.wrap(channel::sendResponse, e -> { + try { + channel.sendResponse(e); + } catch (Exception inner) { + inner.addSuppressed(e); + logger.warn("failed to send response for get", inner); + } } - } - - @Override - public void onFailure(Exception e) { - try { - channel.sendResponse(e); - } catch (Exception inner) { - inner.addSuppressed(e); - logger.warn("failed to send response for get", inner); - } - } - }); - + )); } } } diff --git a/server/src/main/java/org/elasticsearch/action/support/single/shard/TransportSingleShardAction.java b/server/src/main/java/org/elasticsearch/action/support/single/shard/TransportSingleShardAction.java index 3c2e7f9a49e..81763c88a6b 100644 --- a/server/src/main/java/org/elasticsearch/action/support/single/shard/TransportSingleShardAction.java +++ b/server/src/main/java/org/elasticsearch/action/support/single/shard/TransportSingleShardAction.java @@ -22,6 +22,7 @@ package org.elasticsearch.action.support.single.shard; import org.apache.logging.log4j.message.ParameterizedMessage; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.action.ActionRunnable; import org.elasticsearch.action.NoShardAvailableActionException; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.ChannelActionListener; @@ -40,7 +41,6 @@ import org.elasticsearch.common.Nullable; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.logging.LoggerMessageFormat; -import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; @@ -107,12 +107,7 @@ public abstract class TransportSingleShardAction listener) throws IOException { - threadPool.executor(getExecutor(request, shardId)).execute(new AbstractRunnable() { - @Override - public void onFailure(Exception e) { - listener.onFailure(e); - } - + threadPool.executor(getExecutor(request, shardId)).execute(new ActionRunnable(listener) { @Override protected void doRun() throws Exception { listener.onResponse(shardOperation(request, shardId)); diff --git a/server/src/main/java/org/elasticsearch/action/support/tasks/TransportTasksAction.java b/server/src/main/java/org/elasticsearch/action/support/tasks/TransportTasksAction.java index c2f9872ca5c..8d80a15beb1 100644 --- a/server/src/main/java/org/elasticsearch/action/support/tasks/TransportTasksAction.java +++ b/server/src/main/java/org/elasticsearch/action/support/tasks/TransportTasksAction.java @@ -329,19 +329,8 @@ public abstract class TransportTasksAction< @Override public void messageReceived(final NodeTaskRequest request, final TransportChannel channel, Task task) throws Exception { - nodeOperation(request, new ActionListener() { - @Override - public void onResponse( - TransportTasksAction.NodeTasksResponse response) { - try { - channel.sendResponse(response); - } catch (Exception e) { - onFailure(e); - } - } - - @Override - public void onFailure(Exception e) { + nodeOperation(request, ActionListener.wrap(channel::sendResponse, + e -> { try { channel.sendResponse(e); } catch (IOException e1) { @@ -349,11 +338,10 @@ public abstract class TransportTasksAction< logger.warn("Failed to send failure", e1); } } - }); + )); } } - private class NodeTaskRequest extends TransportRequest { private TasksRequest tasksRequest; diff --git a/server/src/main/java/org/elasticsearch/search/SearchService.java b/server/src/main/java/org/elasticsearch/search/SearchService.java index 0b22f5d6606..bf950ac23df 100644 --- a/server/src/main/java/org/elasticsearch/search/SearchService.java +++ b/server/src/main/java/org/elasticsearch/search/SearchService.java @@ -25,6 +25,7 @@ import org.apache.lucene.search.FieldDoc; import org.apache.lucene.search.TopDocs; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionRunnable; import org.elasticsearch.action.OriginalIndices; import org.elasticsearch.action.search.SearchTask; import org.elasticsearch.action.search.SearchType; @@ -39,7 +40,6 @@ import org.elasticsearch.common.settings.Setting.Property; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.BigArrays; -import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.common.util.concurrent.ConcurrentMapLong; import org.elasticsearch.core.internal.io.IOUtils; @@ -302,21 +302,7 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv } public void executeDfsPhase(ShardSearchRequest request, SearchTask task, ActionListener listener) { - rewriteShardRequest(request, new ActionListener() { - @Override - public void onResponse(ShardSearchRequest request) { - try { - listener.onResponse(executeDfsPhase(request, task)); - } catch (Exception e) { - onFailure(e); - } - } - - @Override - public void onFailure(Exception e) { - listener.onFailure(e); - } - }); + rewriteShardRequest(request, ActionListener.map(listener, r -> executeDfsPhase(r, task))); } private DfsSearchResult executeDfsPhase(ShardSearchRequest request, SearchTask task) throws IOException { @@ -351,30 +337,11 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv } public void executeQueryPhase(ShardSearchRequest request, SearchTask task, ActionListener listener) { - rewriteShardRequest(request, new ActionListener() { - @Override - public void onResponse(ShardSearchRequest request) { - try { - listener.onResponse(executeQueryPhase(request, task)); - } catch (Exception e) { - onFailure(e); - } - } - - @Override - public void onFailure(Exception e) { - listener.onFailure(e); - } - }); + rewriteShardRequest(request, ActionListener.map(listener, r -> executeQueryPhase(r, task))); } private void runAsync(long id, Supplier executable, ActionListener listener) { - getExecutor(id).execute(new AbstractRunnable() { - @Override - public void onFailure(Exception e) { - listener.onFailure(e); - } - + getExecutor(id).execute(new ActionRunnable(listener) { @Override protected void doRun() { listener.onResponse(executable.get()); @@ -1058,12 +1025,7 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv ActionListener actionListener = ActionListener.wrap(r -> // now we need to check if there is a pending refresh and register shard.awaitShardSearchActive(b -> - executor.execute(new AbstractRunnable() { - @Override - public void onFailure(Exception e) { - listener.onFailure(e); - } - + executor.execute(new ActionRunnable(listener) { @Override protected void doRun() { listener.onResponse(request); diff --git a/server/src/main/java/org/elasticsearch/transport/TransportKeepAlive.java b/server/src/main/java/org/elasticsearch/transport/TransportKeepAlive.java index fc7ebe4b964..9e49d06f2b0 100644 --- a/server/src/main/java/org/elasticsearch/transport/TransportKeepAlive.java +++ b/server/src/main/java/org/elasticsearch/transport/TransportKeepAlive.java @@ -84,10 +84,7 @@ final class TransportKeepAlive implements Closeable { for (TcpChannel channel : nodeChannels) { scheduledPing.addChannel(channel); - - channel.addCloseListener(ActionListener.wrap(() -> { - scheduledPing.removeChannel(channel); - })); + channel.addCloseListener(ActionListener.wrap(() -> scheduledPing.removeChannel(channel))); } } diff --git a/server/src/test/java/org/elasticsearch/action/RejectionActionIT.java b/server/src/test/java/org/elasticsearch/action/RejectionActionIT.java index ad2447cb7b3..e0ef29bf7f4 100644 --- a/server/src/test/java/org/elasticsearch/action/RejectionActionIT.java +++ b/server/src/test/java/org/elasticsearch/action/RejectionActionIT.java @@ -65,19 +65,17 @@ public class RejectionActionIT extends ESIntegTestCase { client().prepareSearch("test") .setSearchType(SearchType.QUERY_THEN_FETCH) .setQuery(QueryBuilders.matchQuery("field", "1")) - .execute(new ActionListener() { + .execute(new LatchedActionListener<>(new ActionListener() { @Override public void onResponse(SearchResponse searchResponse) { responses.add(searchResponse); - latch.countDown(); } @Override public void onFailure(Exception e) { responses.add(e); - latch.countDown(); } - }); + }, latch)); } latch.await(); diff --git a/server/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TransportTasksActionTests.java b/server/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TransportTasksActionTests.java index ec8af36dbf2..190f85d635b 100644 --- a/server/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TransportTasksActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TransportTasksActionTests.java @@ -470,17 +470,7 @@ public class TransportTasksActionTests extends TaskManagerTestCase { connectNodes(testNodes); CountDownLatch checkLatch = new CountDownLatch(1); CountDownLatch responseLatch = new CountDownLatch(1); - Task task = startBlockingTestNodesAction(checkLatch, new ActionListener() { - @Override - public void onResponse(NodesResponse nodeResponses) { - responseLatch.countDown(); - } - - @Override - public void onFailure(Exception e) { - responseLatch.countDown(); - } - }); + Task task = startBlockingTestNodesAction(checkLatch, ActionListener.wrap(responseLatch::countDown)); String actionName = "internal:testAction"; // only pick the main action // Try to cancel main task using action name diff --git a/server/src/test/java/org/elasticsearch/action/search/ClearScrollControllerTests.java b/server/src/test/java/org/elasticsearch/action/search/ClearScrollControllerTests.java index 55c39f735ce..bcb4a1200b7 100644 --- a/server/src/test/java/org/elasticsearch/action/search/ClearScrollControllerTests.java +++ b/server/src/test/java/org/elasticsearch/action/search/ClearScrollControllerTests.java @@ -20,6 +20,7 @@ package org.elasticsearch.action.search; import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.LatchedActionListener; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.common.util.concurrent.AtomicArray; @@ -42,32 +43,24 @@ import java.util.concurrent.atomic.AtomicInteger; public class ClearScrollControllerTests extends ESTestCase { - public void testClearAll() throws IOException, InterruptedException { + public void testClearAll() throws InterruptedException { DiscoveryNode node1 = new DiscoveryNode("node_1", buildNewFakeTransportAddress(), Version.CURRENT); DiscoveryNode node2 = new DiscoveryNode("node_2", buildNewFakeTransportAddress(), Version.CURRENT); DiscoveryNode node3 = new DiscoveryNode("node_3", buildNewFakeTransportAddress(), Version.CURRENT); DiscoveryNodes nodes = DiscoveryNodes.builder().add(node1).add(node2).add(node3).build(); CountDownLatch latch = new CountDownLatch(1); - ActionListener listener = new ActionListener() { + ActionListener listener = new LatchedActionListener<>(new ActionListener() { @Override public void onResponse(ClearScrollResponse clearScrollResponse) { - try { - assertEquals(3, clearScrollResponse.getNumFreed()); - assertTrue(clearScrollResponse.isSucceeded()); - } finally { - latch.countDown(); - } + assertEquals(3, clearScrollResponse.getNumFreed()); + assertTrue(clearScrollResponse.isSucceeded()); } @Override public void onFailure(Exception e) { - try { - throw new AssertionError(e); - } finally { - latch.countDown(); - } + throw new AssertionError(e); } - }; + }, latch); List nodesInvoked = new CopyOnWriteArrayList<>(); SearchTransportService searchTransportService = new SearchTransportService(null, null) { @Override @@ -112,27 +105,18 @@ public class ClearScrollControllerTests extends ESTestCase { String scrollId = TransportSearchHelper.buildScrollId(array); DiscoveryNodes nodes = DiscoveryNodes.builder().add(node1).add(node2).add(node3).build(); CountDownLatch latch = new CountDownLatch(1); - ActionListener listener = new ActionListener() { + ActionListener listener = new LatchedActionListener<>(new ActionListener() { @Override public void onResponse(ClearScrollResponse clearScrollResponse) { - try { - assertEquals(numFreed.get(), clearScrollResponse.getNumFreed()); - assertTrue(clearScrollResponse.isSucceeded()); - } finally { - latch.countDown(); - } - + assertEquals(numFreed.get(), clearScrollResponse.getNumFreed()); + assertTrue(clearScrollResponse.isSucceeded()); } @Override public void onFailure(Exception e) { - try { - throw new AssertionError(e); - } finally { - latch.countDown(); - } + throw new AssertionError(e); } - }; + }, latch); List nodesInvoked = new CopyOnWriteArrayList<>(); SearchTransportService searchTransportService = new SearchTransportService(null, null) { @@ -185,32 +169,22 @@ public class ClearScrollControllerTests extends ESTestCase { DiscoveryNodes nodes = DiscoveryNodes.builder().add(node1).add(node2).add(node3).build(); CountDownLatch latch = new CountDownLatch(1); - ActionListener listener = new ActionListener() { + ActionListener listener = new LatchedActionListener<>(new ActionListener() { @Override public void onResponse(ClearScrollResponse clearScrollResponse) { - try { - assertEquals(numFreed.get(), clearScrollResponse.getNumFreed()); - if (numFailures.get() > 0) { - assertFalse(clearScrollResponse.isSucceeded()); - } else { - assertTrue(clearScrollResponse.isSucceeded()); - } - - } finally { - latch.countDown(); + assertEquals(numFreed.get(), clearScrollResponse.getNumFreed()); + if (numFailures.get() > 0) { + assertFalse(clearScrollResponse.isSucceeded()); + } else { + assertTrue(clearScrollResponse.isSucceeded()); } - } @Override public void onFailure(Exception e) { - try { - throw new AssertionError(e); - } finally { - latch.countDown(); - } + throw new AssertionError(e); } - }; + }, latch); List nodesInvoked = new CopyOnWriteArrayList<>(); SearchTransportService searchTransportService = new SearchTransportService(null, null) { diff --git a/server/src/test/java/org/elasticsearch/action/support/TransportActionFilterChainTests.java b/server/src/test/java/org/elasticsearch/action/support/TransportActionFilterChainTests.java index f222bcc015c..96d057f50c4 100644 --- a/server/src/test/java/org/elasticsearch/action/support/TransportActionFilterChainTests.java +++ b/server/src/test/java/org/elasticsearch/action/support/TransportActionFilterChainTests.java @@ -24,6 +24,7 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionRequest; import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.action.LatchedActionListener; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.node.Node; import org.elasticsearch.tasks.Task; @@ -65,7 +66,7 @@ public class TransportActionFilterChainTests extends ESTestCase { terminate(threadPool); } - public void testActionFiltersRequest() throws ExecutionException, InterruptedException { + public void testActionFiltersRequest() throws InterruptedException { int numFilters = randomInt(10); Set orders = new HashSet<>(numFilters); while (orders.size() < numFilters) { @@ -139,7 +140,7 @@ public class TransportActionFilterChainTests extends ESTestCase { } } - public void testTooManyContinueProcessingRequest() throws ExecutionException, InterruptedException { + public void testTooManyContinueProcessingRequest() throws InterruptedException { final int additionalContinueCount = randomInt(10); RequestTestFilter testFilter = new RequestTestFilter(randomInt(), new RequestCallback() { @@ -169,19 +170,17 @@ public class TransportActionFilterChainTests extends ESTestCase { final AtomicInteger responses = new AtomicInteger(); final List failures = new CopyOnWriteArrayList<>(); - transportAction.execute(new TestRequest(), new ActionListener() { + transportAction.execute(new TestRequest(), new LatchedActionListener<>(new ActionListener() { @Override public void onResponse(TestResponse testResponse) { responses.incrementAndGet(); - latch.countDown(); } @Override public void onFailure(Exception e) { failures.add(e); - latch.countDown(); } - }); + }, latch)); if (!latch.await(10, TimeUnit.SECONDS)) { fail("timeout waiting for the filter to notify the listener as many times as expected"); diff --git a/server/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java b/server/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java index 1a7e5a73e75..57b30d3484b 100644 --- a/server/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java @@ -369,17 +369,7 @@ public class TransportWriteActionTests extends ESTestCase { CountDownLatch completionLatch = new CountDownLatch(1); threadPool.generic().execute(() -> { waitForBarrier.run(); - replicaResult.respond(new ActionListener() { - @Override - public void onResponse(TransportResponse.Empty empty) { - completionLatch.countDown(); - } - - @Override - public void onFailure(Exception e) { - completionLatch.countDown(); - } - }); + replicaResult.respond(ActionListener.wrap(completionLatch::countDown)); }); if (randomBoolean()) { threadPool.generic().execute(() -> { diff --git a/server/src/test/java/org/elasticsearch/persistent/decider/EnableAssignmentDeciderIT.java b/server/src/test/java/org/elasticsearch/persistent/decider/EnableAssignmentDeciderIT.java index aeb4d9b3a9b..2ea6567c9f8 100644 --- a/server/src/test/java/org/elasticsearch/persistent/decider/EnableAssignmentDeciderIT.java +++ b/server/src/test/java/org/elasticsearch/persistent/decider/EnableAssignmentDeciderIT.java @@ -23,9 +23,7 @@ import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse; import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.persistent.PersistentTaskParams; import org.elasticsearch.persistent.PersistentTasksCustomMetaData; -import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask; import org.elasticsearch.persistent.PersistentTasksService; import org.elasticsearch.persistent.TestPersistentTasksPlugin; import org.elasticsearch.persistent.TestPersistentTasksPlugin.TestParams; @@ -72,17 +70,7 @@ public class EnableAssignmentDeciderIT extends ESIntegTestCase { for (int i = 0; i < numberOfTasks; i++) { PersistentTasksService service = internalCluster().getInstance(PersistentTasksService.class); service.sendStartRequest("task_" + i, TestPersistentTasksExecutor.NAME, new TestParams(randomAlphaOfLength(10)), - new ActionListener>() { - @Override - public void onResponse(PersistentTask task) { - latch.countDown(); - } - - @Override - public void onFailure(Exception e) { - latch.countDown(); - } - }); + ActionListener.wrap(latch::countDown)); } latch.await();