From 547af21a12660ae9d471f87efe7da16c4c0a6666 Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Mon, 4 Mar 2019 12:56:46 +0100 Subject: [PATCH] Introduce Mapping ActionListener (#39538) (#39636) * Introduce Safer Chaining of Listeners * The motivation here is to make reasoning about chains of `ActionListener` a little easier, by providing a safe method for nesting `ActionListener` that guarantees that a response is never dropped. Also, it dries up the code a little by removing the need to repeat `listener::onFailure` and `listener.onResponse` over and over. * Refactored a number of obvious/easy spots to use the new listener constructor --- .../org/elasticsearch/action/ActionListener.java | 15 +++++++++++++++ .../reroute/TransportClusterRerouteAction.java | 6 +++--- .../create/TransportCreateSnapshotAction.java | 6 ++---- .../delete/TransportDeleteSnapshotAction.java | 2 +- .../create/TransportCreateIndexAction.java | 5 ++--- .../indices/shrink/TransportResizeAction.java | 7 ++----- .../action/bulk/TransportBulkAction.java | 6 ++---- .../action/search/SearchScrollAsyncAction.java | 12 ++---------- .../index/query/TermsQueryBuilder.java | 12 +++++------- .../index/seqno/RetentionLeaseActions.java | 10 +++------- .../recovery/PeerRecoveryTargetService.java | 7 +++---- .../indices/recovery/RecoverySourceHandler.java | 7 +++---- .../recovery/RemoteRecoveryTargetHandler.java | 8 ++++---- .../persistent/PersistentTasksService.java | 6 ++---- .../index/shard/IndexShardTests.java | 10 ++++------ 15 files changed, 53 insertions(+), 66 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/ActionListener.java b/server/src/main/java/org/elasticsearch/action/ActionListener.java index 34d90035ad6..158f8aa61fa 100644 --- a/server/src/main/java/org/elasticsearch/action/ActionListener.java +++ b/server/src/main/java/org/elasticsearch/action/ActionListener.java @@ -21,6 +21,7 @@ package org.elasticsearch.action; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.common.CheckedConsumer; +import org.elasticsearch.common.CheckedFunction; import org.elasticsearch.common.CheckedSupplier; import java.util.ArrayList; @@ -83,6 +84,20 @@ public interface ActionListener { return wrap(r -> runnable.run(), e -> runnable.run()); } + /** + * Creates a listener that wraps another listener, mapping response values via the given mapping function and passing along + * exceptions to the delegate. + * + * @param listener Listener to delegate to + * @param fn Function to apply to listener response + * @param Response type of the new listener + * @param Response type of the wrapped listener + * @return a listener that maps the received response and then passes it to its delegate listener + */ + static ActionListener map(ActionListener listener, CheckedFunction fn) { + return wrap(r -> listener.onResponse(fn.apply(r)), listener::onFailure); + } + /** * Converts a listener to a {@link BiConsumer} for compatibility with the {@link java.util.concurrent.CompletableFuture} * api. diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/reroute/TransportClusterRerouteAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/reroute/TransportClusterRerouteAction.java index 7f29f0bb6db..4b51145204a 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/reroute/TransportClusterRerouteAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/reroute/TransportClusterRerouteAction.java @@ -148,13 +148,13 @@ public class TransportClusterRerouteAction extends TransportMasterNodeAction listener) { clusterService.submitStateUpdateTask("cluster_reroute (api)", new ClusterRerouteResponseAckedClusterStateUpdateTask(logger, allocationService, request, - ActionListener.wrap( + ActionListener.map(listener, response -> { if (request.dryRun() == false) { response.getExplanations().getYesDecisionMessages().forEach(logger::info); } - listener.onResponse(response); - }, listener::onFailure))); + return response; + }))); } static class ClusterRerouteResponseAckedClusterStateUpdateTask extends AckedClusterStateUpdateTask { diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/create/TransportCreateSnapshotAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/create/TransportCreateSnapshotAction.java index fe4089df945..7fd630e8b2f 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/create/TransportCreateSnapshotAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/create/TransportCreateSnapshotAction.java @@ -72,11 +72,9 @@ public class TransportCreateSnapshotAction extends TransportMasterNodeAction listener) { if (request.waitForCompletion()) { - snapshotsService.executeSnapshot(request, - ActionListener.wrap(snapshotInfo-> listener.onResponse(new CreateSnapshotResponse(snapshotInfo)), listener::onFailure)); + snapshotsService.executeSnapshot(request, ActionListener.map(listener, CreateSnapshotResponse::new)); } else { - snapshotsService.createSnapshot(request, - ActionListener.wrap(snapshot -> listener.onResponse(new CreateSnapshotResponse()), listener::onFailure)); + snapshotsService.createSnapshot(request, ActionListener.map(listener, snapshot -> new CreateSnapshotResponse())); } } } diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/delete/TransportDeleteSnapshotAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/delete/TransportDeleteSnapshotAction.java index dfb38aaed21..140d480242d 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/delete/TransportDeleteSnapshotAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/delete/TransportDeleteSnapshotAction.java @@ -68,6 +68,6 @@ public class TransportDeleteSnapshotAction extends TransportMasterNodeAction listener) { snapshotsService.deleteSnapshot(request.repository(), request.snapshot(), - ActionListener.wrap(v -> listener.onResponse(new AcknowledgedResponse(true)), listener::onFailure), false); + ActionListener.map(listener, v -> new AcknowledgedResponse(true)), false); } } diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/create/TransportCreateIndexAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/create/TransportCreateIndexAction.java index ad11ea7e596..03bbd0903a6 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/create/TransportCreateIndexAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/create/TransportCreateIndexAction.java @@ -80,9 +80,8 @@ public class TransportCreateIndexAction extends TransportMasterNodeAction - listener.onResponse(new CreateIndexResponse(response.isAcknowledged(), response.isShardsAcknowledged(), indexName)), - listener::onFailure)); + createIndexService.createIndex(updateRequest, ActionListener.map(listener, response -> + new CreateIndexResponse(response.isAcknowledged(), response.isShardsAcknowledged(), indexName))); } } diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/shrink/TransportResizeAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/shrink/TransportResizeAction.java index a4b9fa8e66d..67c955b4c67 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/shrink/TransportResizeAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/shrink/TransportResizeAction.java @@ -107,11 +107,8 @@ public class TransportResizeAction extends TransportMasterNodeAction - listener.onResponse(new ResizeResponse(response.isAcknowledged(), response.isShardsAcknowledged(), - updateRequest.index())), listener::onFailure - ) + updateRequest, ActionListener.map(listener, + response -> new ResizeResponse(response.isAcknowledged(), response.isShardsAcknowledged(), updateRequest.index())) ); } diff --git a/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java b/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java index 567b7fb8080..21bb539676d 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java @@ -635,10 +635,8 @@ public class TransportBulkAction extends HandledTransportAction wrapActionListenerIfNeeded(long ingestTookInMillis, ActionListener actionListener) { if (itemResponses.isEmpty()) { - return ActionListener.wrap( - response -> actionListener.onResponse(new BulkResponse(response.getItems(), - response.getTook().getMillis(), ingestTookInMillis)), - actionListener::onFailure); + return ActionListener.map(actionListener, + response -> new BulkResponse(response.getItems(), response.getTook().getMillis(), ingestTookInMillis)); } else { return new IngestBulkResponseListener(ingestTookInMillis, originalSlots, itemResponses, actionListener); } diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchScrollAsyncAction.java b/server/src/main/java/org/elasticsearch/action/search/SearchScrollAsyncAction.java index 81d7e66d19e..46de6de8f87 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchScrollAsyncAction.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchScrollAsyncAction.java @@ -129,16 +129,8 @@ abstract class SearchScrollAsyncAction implements R listener.onResponse((cluster, node) -> nodes.get(node)); } else { RemoteClusterService remoteClusterService = searchTransportService.getRemoteClusterService(); - remoteClusterService.collectNodes(clusters, ActionListener.wrap(nodeFunction -> { - final BiFunction clusterNodeLookup = (clusterAlias, node) -> { - if (clusterAlias == null) { - return nodes.get(node); - } else { - return nodeFunction.apply(clusterAlias, node); - } - }; - listener.onResponse(clusterNodeLookup); - }, listener::onFailure)); + remoteClusterService.collectNodes(clusters, ActionListener.map(listener, + nodeFunction -> (clusterAlias, node) -> clusterAlias == null ? nodes.get(node) : nodeFunction.apply(clusterAlias, node))); } } diff --git a/server/src/main/java/org/elasticsearch/index/query/TermsQueryBuilder.java b/server/src/main/java/org/elasticsearch/index/query/TermsQueryBuilder.java index ae7bbae6301..6fdedb8d6a9 100644 --- a/server/src/main/java/org/elasticsearch/index/query/TermsQueryBuilder.java +++ b/server/src/main/java/org/elasticsearch/index/query/TermsQueryBuilder.java @@ -501,13 +501,11 @@ public class TermsQueryBuilder extends AbstractQueryBuilder { return supplier.get() == null ? this : new TermsQueryBuilder(this.fieldName, supplier.get()); } else if (this.termsLookup != null) { SetOnce> supplier = new SetOnce<>(); - queryRewriteContext.registerAsyncAction((client, listener) -> { - fetch(termsLookup, client, ActionListener.wrap(list -> { - supplier.set(list); - listener.onResponse(null); - }, listener::onFailure)); - - }); + queryRewriteContext.registerAsyncAction((client, listener) -> + fetch(termsLookup, client, ActionListener.map(listener, list -> { + supplier.set(list); + return null; + }))); return new TermsQueryBuilder(this.fieldName, supplier::get); } return this; diff --git a/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseActions.java b/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseActions.java index 6fa1fd7fb3f..3493271e8d7 100644 --- a/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseActions.java +++ b/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseActions.java @@ -169,9 +169,7 @@ public class RetentionLeaseActions { request.getId(), request.getRetainingSequenceNumber(), request.getSource(), - ActionListener.wrap( - r -> listener.onResponse(new Response()), - listener::onFailure)); + ActionListener.map(listener, r -> new Response())); } } @@ -264,9 +262,7 @@ public class RetentionLeaseActions { void doRetentionLeaseAction(final IndexShard indexShard, final RemoveRequest request, final ActionListener listener) { indexShard.removeRetentionLease( request.getId(), - ActionListener.wrap( - r -> listener.onResponse(new Response()), - listener::onFailure)); + ActionListener.map(listener, r -> new Response())); } } @@ -401,4 +397,4 @@ public class RetentionLeaseActions { } -} \ No newline at end of file +} diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java b/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java index 068b92991db..fc8e0c765b5 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java @@ -435,7 +435,7 @@ public class PeerRecoveryTargetService implements IndexEventListener { try (RecoveryRef recoveryRef = onGoingRecoveries.getRecoverySafe(request.recoveryId(), request.shardId())) { final ActionListener listener = new ChannelActionListener<>(channel, Actions.PREPARE_TRANSLOG, request); recoveryRef.target().prepareForTranslogOperations(request.isFileBasedRecovery(), request.totalTranslogOps(), - ActionListener.wrap(nullVal -> listener.onResponse(TransportResponse.Empty.INSTANCE), listener::onFailure)); + ActionListener.map(listener, nullVal -> TransportResponse.Empty.INSTANCE)); } } } @@ -447,7 +447,7 @@ public class PeerRecoveryTargetService implements IndexEventListener { try (RecoveryRef recoveryRef = onGoingRecoveries.getRecoverySafe(request.recoveryId(), request.shardId())) { final ActionListener listener = new ChannelActionListener<>(channel, Actions.FINALIZE, request); recoveryRef.target().finalizeRecovery(request.globalCheckpoint(), - ActionListener.wrap(nullVal -> listener.onResponse(TransportResponse.Empty.INSTANCE), listener::onFailure)); + ActionListener.map(listener, nullVal -> TransportResponse.Empty.INSTANCE)); } } } @@ -630,8 +630,7 @@ public class PeerRecoveryTargetService implements IndexEventListener { } final ActionListener listener = new ChannelActionListener<>(channel, Actions.FILE_CHUNK, request); recoveryTarget.writeFileChunk(request.metadata(), request.position(), request.content(), request.lastChunk(), - request.totalTranslogOps(), - ActionListener.wrap(nullVal -> listener.onResponse(TransportResponse.Empty.INSTANCE), listener::onFailure)); + request.totalTranslogOps(), ActionListener.map(listener, nullVal -> TransportResponse.Empty.INSTANCE)); } } } diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java b/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java index 6bca848a361..7cf4d28d428 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java @@ -552,7 +552,7 @@ public class RecoverySourceHandler { }; final StopWatch stopWatch = new StopWatch().start(); - final ActionListener batchedListener = ActionListener.wrap( + final ActionListener batchedListener = ActionListener.map(listener, targetLocalCheckpoint -> { assert snapshot.totalOperations() == snapshot.skippedOperations() + skippedOps.get() + totalSentOps.get() : String.format(Locale.ROOT, "expected total [%d], overridden [%d], skipped [%d], total sent [%d]", @@ -560,9 +560,8 @@ public class RecoverySourceHandler { stopWatch.stop(); final TimeValue tookTime = stopWatch.totalTime(); logger.trace("recovery [phase2]: took [{}]", tookTime); - listener.onResponse(new SendSnapshotResult(targetLocalCheckpoint, totalSentOps.get(), tookTime)); - }, - listener::onFailure + return new SendSnapshotResult(targetLocalCheckpoint, totalSentOps.get(), tookTime); + } ); sendBatch( diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RemoteRecoveryTargetHandler.java b/server/src/main/java/org/elasticsearch/indices/recovery/RemoteRecoveryTargetHandler.java index 436d59dba85..fcedf0a000a 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/RemoteRecoveryTargetHandler.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/RemoteRecoveryTargetHandler.java @@ -82,7 +82,7 @@ public class RemoteRecoveryTargetHandler implements RecoveryTargetHandler { transportService.submitRequest(targetNode, PeerRecoveryTargetService.Actions.PREPARE_TRANSLOG, new RecoveryPrepareForTranslogOperationsRequest(recoveryId, shardId, totalTranslogOps, fileBasedRecovery), TransportRequestOptions.builder().withTimeout(recoverySettings.internalActionTimeout()).build(), - new ActionListenerResponseHandler<>(ActionListener.wrap(r -> listener.onResponse(null), listener::onFailure), + new ActionListenerResponseHandler<>(ActionListener.map(listener, r -> null), in -> TransportResponse.Empty.INSTANCE, ThreadPool.Names.GENERIC)); } @@ -91,7 +91,7 @@ public class RemoteRecoveryTargetHandler implements RecoveryTargetHandler { transportService.submitRequest(targetNode, PeerRecoveryTargetService.Actions.FINALIZE, new RecoveryFinalizeRecoveryRequest(recoveryId, shardId, globalCheckpoint), TransportRequestOptions.builder().withTimeout(recoverySettings.internalActionLongTimeout()).build(), - new ActionListenerResponseHandler<>(ActionListener.wrap(r -> listener.onResponse(null), listener::onFailure), + new ActionListenerResponseHandler<>(ActionListener.map(listener, r -> null), in -> TransportResponse.Empty.INSTANCE, ThreadPool.Names.GENERIC)); } @@ -130,7 +130,7 @@ public class RemoteRecoveryTargetHandler implements RecoveryTargetHandler { maxSeqNoOfDeletesOrUpdatesOnPrimary, retentionLeases); transportService.submitRequest(targetNode, PeerRecoveryTargetService.Actions.TRANSLOG_OPS, request, translogOpsRequestOptions, - new ActionListenerResponseHandler<>(ActionListener.wrap(r -> listener.onResponse(r.localCheckpoint), listener::onFailure), + new ActionListenerResponseHandler<>(ActionListener.map(listener, r -> r.localCheckpoint), RecoveryTranslogOperationsResponse::new, ThreadPool.Names.GENERIC)); } @@ -187,7 +187,7 @@ public class RemoteRecoveryTargetHandler implements RecoveryTargetHandler { * would be in to restart file copy again (new deltas) if we have too many translog ops are piling up. */ throttleTimeInNanos), fileChunkRequestOptions, new ActionListenerResponseHandler<>( - ActionListener.wrap(r -> listener.onResponse(null), listener::onFailure), in -> TransportResponse.Empty.INSTANCE)); + ActionListener.map(listener, r -> null), in -> TransportResponse.Empty.INSTANCE)); } } diff --git a/server/src/main/java/org/elasticsearch/persistent/PersistentTasksService.java b/server/src/main/java/org/elasticsearch/persistent/PersistentTasksService.java index ef467200001..9664be255da 100644 --- a/server/src/main/java/org/elasticsearch/persistent/PersistentTasksService.java +++ b/server/src/main/java/org/elasticsearch/persistent/PersistentTasksService.java @@ -68,8 +68,7 @@ public class PersistentTasksService { final Params taskParams, final ActionListener> listener) { @SuppressWarnings("unchecked") - final ActionListener> wrappedListener = - ActionListener.wrap(t -> listener.onResponse((PersistentTask) t), listener::onFailure); + final ActionListener> wrappedListener = ActionListener.map(listener, t -> (PersistentTask) t); StartPersistentTaskAction.Request request = new StartPersistentTaskAction.Request(taskId, taskName, taskParams); execute(request, StartPersistentTaskAction.INSTANCE, wrappedListener); } @@ -132,8 +131,7 @@ public class PersistentTasksService { private void execute(final Req request, final Action action, final ActionListener> listener) { try { - client.execute(action, request, - ActionListener.wrap(r -> listener.onResponse(r.getTask()), listener::onFailure)); + client.execute(action, request, ActionListener.map(listener, PersistentTaskResponse::getTask)); } catch (Exception e) { listener.onFailure(e); } diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index 4b2fc5205fa..e02fdfce6f7 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -2622,12 +2622,10 @@ public class IndexShardTests extends IndexShardTestCase { maxAutoIdTimestamp, maxSeqNoOfUpdatesOrDeletes, retentionLeases, - ActionListener.wrap( - checkpoint -> { - assertListenerCalled.accept(replica); - listener.onResponse(checkpoint); - }, - listener::onFailure)); + ActionListener.map(listener, checkpoint -> { + assertListenerCalled.accept(replica); + return checkpoint; + })); } @Override