Introduce Mapping ActionListener (#39538) (#39636)

* Introduce Safer Chaining of Listeners

* The motivation here is to make reasoning about chains of `ActionListener` a little easier, by providing a safe method for nesting `ActionListener` that guarantees that a response is never dropped. Also, it dries up the code a little by removing the need to repeat `listener::onFailure` and `listener.onResponse` over and over.
* Refactored a number of obvious/easy spots to use the new listener constructor
This commit is contained in:
Armin Braun 2019-03-04 12:56:46 +01:00 committed by GitHub
parent c7a2910cc1
commit 547af21a12
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 53 additions and 66 deletions

View File

@ -21,6 +21,7 @@ package org.elasticsearch.action;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.common.CheckedConsumer;
import org.elasticsearch.common.CheckedFunction;
import org.elasticsearch.common.CheckedSupplier;
import java.util.ArrayList;
@ -83,6 +84,20 @@ public interface ActionListener<Response> {
return wrap(r -> runnable.run(), e -> runnable.run());
}
/**
* Creates a listener that wraps another listener, mapping response values via the given mapping function and passing along
* exceptions to the delegate.
*
* @param listener Listener to delegate to
* @param fn Function to apply to listener response
* @param <Response> Response type of the new listener
* @param <T> Response type of the wrapped listener
* @return a listener that maps the received response and then passes it to its delegate listener
*/
static <T, Response> ActionListener<Response> map(ActionListener<T> listener, CheckedFunction<Response, T, Exception> fn) {
return wrap(r -> listener.onResponse(fn.apply(r)), listener::onFailure);
}
/**
* Converts a listener to a {@link BiConsumer} for compatibility with the {@link java.util.concurrent.CompletableFuture}
* api.

View File

@ -148,13 +148,13 @@ public class TransportClusterRerouteAction extends TransportMasterNodeAction<Clu
private void submitStateUpdate(final ClusterRerouteRequest request, final ActionListener<ClusterRerouteResponse> listener) {
clusterService.submitStateUpdateTask("cluster_reroute (api)",
new ClusterRerouteResponseAckedClusterStateUpdateTask(logger, allocationService, request,
ActionListener.wrap(
ActionListener.map(listener,
response -> {
if (request.dryRun() == false) {
response.getExplanations().getYesDecisionMessages().forEach(logger::info);
}
listener.onResponse(response);
}, listener::onFailure)));
return response;
})));
}
static class ClusterRerouteResponseAckedClusterStateUpdateTask extends AckedClusterStateUpdateTask<ClusterRerouteResponse> {

View File

@ -72,11 +72,9 @@ public class TransportCreateSnapshotAction extends TransportMasterNodeAction<Cre
protected void masterOperation(final CreateSnapshotRequest request, ClusterState state,
final ActionListener<CreateSnapshotResponse> listener) {
if (request.waitForCompletion()) {
snapshotsService.executeSnapshot(request,
ActionListener.wrap(snapshotInfo-> listener.onResponse(new CreateSnapshotResponse(snapshotInfo)), listener::onFailure));
snapshotsService.executeSnapshot(request, ActionListener.map(listener, CreateSnapshotResponse::new));
} else {
snapshotsService.createSnapshot(request,
ActionListener.wrap(snapshot -> listener.onResponse(new CreateSnapshotResponse()), listener::onFailure));
snapshotsService.createSnapshot(request, ActionListener.map(listener, snapshot -> new CreateSnapshotResponse()));
}
}
}

View File

@ -68,6 +68,6 @@ public class TransportDeleteSnapshotAction extends TransportMasterNodeAction<Del
protected void masterOperation(final DeleteSnapshotRequest request, ClusterState state,
final ActionListener<AcknowledgedResponse> listener) {
snapshotsService.deleteSnapshot(request.repository(), request.snapshot(),
ActionListener.wrap(v -> listener.onResponse(new AcknowledgedResponse(true)), listener::onFailure), false);
ActionListener.map(listener, v -> new AcknowledgedResponse(true)), false);
}
}

View File

@ -80,9 +80,8 @@ public class TransportCreateIndexAction extends TransportMasterNodeAction<Create
.aliases(request.aliases())
.waitForActiveShards(request.waitForActiveShards());
createIndexService.createIndex(updateRequest, ActionListener.wrap(response ->
listener.onResponse(new CreateIndexResponse(response.isAcknowledged(), response.isShardsAcknowledged(), indexName)),
listener::onFailure));
createIndexService.createIndex(updateRequest, ActionListener.map(listener, response ->
new CreateIndexResponse(response.isAcknowledged(), response.isShardsAcknowledged(), indexName)));
}
}

View File

@ -107,11 +107,8 @@ public class TransportResizeAction extends TransportMasterNodeAction<ResizeReque
return shard == null ? null : shard.getPrimary().getDocs();
}, sourceIndex, targetIndex);
createIndexService.createIndex(
updateRequest,
ActionListener.wrap(response ->
listener.onResponse(new ResizeResponse(response.isAcknowledged(), response.isShardsAcknowledged(),
updateRequest.index())), listener::onFailure
)
updateRequest, ActionListener.map(listener,
response -> new ResizeResponse(response.isAcknowledged(), response.isShardsAcknowledged(), updateRequest.index()))
);
}

View File

@ -635,10 +635,8 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
ActionListener<BulkResponse> wrapActionListenerIfNeeded(long ingestTookInMillis, ActionListener<BulkResponse> actionListener) {
if (itemResponses.isEmpty()) {
return ActionListener.wrap(
response -> actionListener.onResponse(new BulkResponse(response.getItems(),
response.getTook().getMillis(), ingestTookInMillis)),
actionListener::onFailure);
return ActionListener.map(actionListener,
response -> new BulkResponse(response.getItems(), response.getTook().getMillis(), ingestTookInMillis));
} else {
return new IngestBulkResponseListener(ingestTookInMillis, originalSlots, itemResponses, actionListener);
}

View File

@ -129,16 +129,8 @@ abstract class SearchScrollAsyncAction<T extends SearchPhaseResult> implements R
listener.onResponse((cluster, node) -> nodes.get(node));
} else {
RemoteClusterService remoteClusterService = searchTransportService.getRemoteClusterService();
remoteClusterService.collectNodes(clusters, ActionListener.wrap(nodeFunction -> {
final BiFunction<String, String, DiscoveryNode> clusterNodeLookup = (clusterAlias, node) -> {
if (clusterAlias == null) {
return nodes.get(node);
} else {
return nodeFunction.apply(clusterAlias, node);
}
};
listener.onResponse(clusterNodeLookup);
}, listener::onFailure));
remoteClusterService.collectNodes(clusters, ActionListener.map(listener,
nodeFunction -> (clusterAlias, node) -> clusterAlias == null ? nodes.get(node) : nodeFunction.apply(clusterAlias, node)));
}
}

View File

@ -501,13 +501,11 @@ public class TermsQueryBuilder extends AbstractQueryBuilder<TermsQueryBuilder> {
return supplier.get() == null ? this : new TermsQueryBuilder(this.fieldName, supplier.get());
} else if (this.termsLookup != null) {
SetOnce<List<?>> supplier = new SetOnce<>();
queryRewriteContext.registerAsyncAction((client, listener) -> {
fetch(termsLookup, client, ActionListener.wrap(list -> {
queryRewriteContext.registerAsyncAction((client, listener) ->
fetch(termsLookup, client, ActionListener.map(listener, list -> {
supplier.set(list);
listener.onResponse(null);
}, listener::onFailure));
});
return null;
})));
return new TermsQueryBuilder(this.fieldName, supplier::get);
}
return this;

View File

@ -169,9 +169,7 @@ public class RetentionLeaseActions {
request.getId(),
request.getRetainingSequenceNumber(),
request.getSource(),
ActionListener.wrap(
r -> listener.onResponse(new Response()),
listener::onFailure));
ActionListener.map(listener, r -> new Response()));
}
}
@ -264,9 +262,7 @@ public class RetentionLeaseActions {
void doRetentionLeaseAction(final IndexShard indexShard, final RemoveRequest request, final ActionListener<Response> listener) {
indexShard.removeRetentionLease(
request.getId(),
ActionListener.wrap(
r -> listener.onResponse(new Response()),
listener::onFailure));
ActionListener.map(listener, r -> new Response()));
}
}

View File

@ -435,7 +435,7 @@ public class PeerRecoveryTargetService implements IndexEventListener {
try (RecoveryRef recoveryRef = onGoingRecoveries.getRecoverySafe(request.recoveryId(), request.shardId())) {
final ActionListener<TransportResponse> listener = new ChannelActionListener<>(channel, Actions.PREPARE_TRANSLOG, request);
recoveryRef.target().prepareForTranslogOperations(request.isFileBasedRecovery(), request.totalTranslogOps(),
ActionListener.wrap(nullVal -> listener.onResponse(TransportResponse.Empty.INSTANCE), listener::onFailure));
ActionListener.map(listener, nullVal -> TransportResponse.Empty.INSTANCE));
}
}
}
@ -447,7 +447,7 @@ public class PeerRecoveryTargetService implements IndexEventListener {
try (RecoveryRef recoveryRef = onGoingRecoveries.getRecoverySafe(request.recoveryId(), request.shardId())) {
final ActionListener<TransportResponse> listener = new ChannelActionListener<>(channel, Actions.FINALIZE, request);
recoveryRef.target().finalizeRecovery(request.globalCheckpoint(),
ActionListener.wrap(nullVal -> listener.onResponse(TransportResponse.Empty.INSTANCE), listener::onFailure));
ActionListener.map(listener, nullVal -> TransportResponse.Empty.INSTANCE));
}
}
}
@ -630,8 +630,7 @@ public class PeerRecoveryTargetService implements IndexEventListener {
}
final ActionListener<TransportResponse> listener = new ChannelActionListener<>(channel, Actions.FILE_CHUNK, request);
recoveryTarget.writeFileChunk(request.metadata(), request.position(), request.content(), request.lastChunk(),
request.totalTranslogOps(),
ActionListener.wrap(nullVal -> listener.onResponse(TransportResponse.Empty.INSTANCE), listener::onFailure));
request.totalTranslogOps(), ActionListener.map(listener, nullVal -> TransportResponse.Empty.INSTANCE));
}
}
}

View File

@ -552,7 +552,7 @@ public class RecoverySourceHandler {
};
final StopWatch stopWatch = new StopWatch().start();
final ActionListener<Long> batchedListener = ActionListener.wrap(
final ActionListener<Long> batchedListener = ActionListener.map(listener,
targetLocalCheckpoint -> {
assert snapshot.totalOperations() == snapshot.skippedOperations() + skippedOps.get() + totalSentOps.get()
: String.format(Locale.ROOT, "expected total [%d], overridden [%d], skipped [%d], total sent [%d]",
@ -560,9 +560,8 @@ public class RecoverySourceHandler {
stopWatch.stop();
final TimeValue tookTime = stopWatch.totalTime();
logger.trace("recovery [phase2]: took [{}]", tookTime);
listener.onResponse(new SendSnapshotResult(targetLocalCheckpoint, totalSentOps.get(), tookTime));
},
listener::onFailure
return new SendSnapshotResult(targetLocalCheckpoint, totalSentOps.get(), tookTime);
}
);
sendBatch(

View File

@ -82,7 +82,7 @@ public class RemoteRecoveryTargetHandler implements RecoveryTargetHandler {
transportService.submitRequest(targetNode, PeerRecoveryTargetService.Actions.PREPARE_TRANSLOG,
new RecoveryPrepareForTranslogOperationsRequest(recoveryId, shardId, totalTranslogOps, fileBasedRecovery),
TransportRequestOptions.builder().withTimeout(recoverySettings.internalActionTimeout()).build(),
new ActionListenerResponseHandler<>(ActionListener.wrap(r -> listener.onResponse(null), listener::onFailure),
new ActionListenerResponseHandler<>(ActionListener.map(listener, r -> null),
in -> TransportResponse.Empty.INSTANCE, ThreadPool.Names.GENERIC));
}
@ -91,7 +91,7 @@ public class RemoteRecoveryTargetHandler implements RecoveryTargetHandler {
transportService.submitRequest(targetNode, PeerRecoveryTargetService.Actions.FINALIZE,
new RecoveryFinalizeRecoveryRequest(recoveryId, shardId, globalCheckpoint),
TransportRequestOptions.builder().withTimeout(recoverySettings.internalActionLongTimeout()).build(),
new ActionListenerResponseHandler<>(ActionListener.wrap(r -> listener.onResponse(null), listener::onFailure),
new ActionListenerResponseHandler<>(ActionListener.map(listener, r -> null),
in -> TransportResponse.Empty.INSTANCE, ThreadPool.Names.GENERIC));
}
@ -130,7 +130,7 @@ public class RemoteRecoveryTargetHandler implements RecoveryTargetHandler {
maxSeqNoOfDeletesOrUpdatesOnPrimary,
retentionLeases);
transportService.submitRequest(targetNode, PeerRecoveryTargetService.Actions.TRANSLOG_OPS, request, translogOpsRequestOptions,
new ActionListenerResponseHandler<>(ActionListener.wrap(r -> listener.onResponse(r.localCheckpoint), listener::onFailure),
new ActionListenerResponseHandler<>(ActionListener.map(listener, r -> r.localCheckpoint),
RecoveryTranslogOperationsResponse::new, ThreadPool.Names.GENERIC));
}
@ -187,7 +187,7 @@ public class RemoteRecoveryTargetHandler implements RecoveryTargetHandler {
* would be in to restart file copy again (new deltas) if we have too many translog ops are piling up.
*/
throttleTimeInNanos), fileChunkRequestOptions, new ActionListenerResponseHandler<>(
ActionListener.wrap(r -> listener.onResponse(null), listener::onFailure), in -> TransportResponse.Empty.INSTANCE));
ActionListener.map(listener, r -> null), in -> TransportResponse.Empty.INSTANCE));
}
}

View File

@ -68,8 +68,7 @@ public class PersistentTasksService {
final Params taskParams,
final ActionListener<PersistentTask<Params>> listener) {
@SuppressWarnings("unchecked")
final ActionListener<PersistentTask<?>> wrappedListener =
ActionListener.wrap(t -> listener.onResponse((PersistentTask<Params>) t), listener::onFailure);
final ActionListener<PersistentTask<?>> wrappedListener = ActionListener.map(listener, t -> (PersistentTask<Params>) t);
StartPersistentTaskAction.Request request = new StartPersistentTaskAction.Request(taskId, taskName, taskParams);
execute(request, StartPersistentTaskAction.INSTANCE, wrappedListener);
}
@ -132,8 +131,7 @@ public class PersistentTasksService {
private <Req extends ActionRequest, Resp extends PersistentTaskResponse>
void execute(final Req request, final Action<Resp> action, final ActionListener<PersistentTask<?>> listener) {
try {
client.execute(action, request,
ActionListener.wrap(r -> listener.onResponse(r.getTask()), listener::onFailure));
client.execute(action, request, ActionListener.map(listener, PersistentTaskResponse::getTask));
} catch (Exception e) {
listener.onFailure(e);
}

View File

@ -2622,12 +2622,10 @@ public class IndexShardTests extends IndexShardTestCase {
maxAutoIdTimestamp,
maxSeqNoOfUpdatesOrDeletes,
retentionLeases,
ActionListener.wrap(
checkpoint -> {
ActionListener.map(listener, checkpoint -> {
assertListenerCalled.accept(replica);
listener.onResponse(checkpoint);
},
listener::onFailure));
return checkpoint;
}));
}
@Override