Introduce Delegating ActionListener Wrappers (#40129) (#41527)

* Introduce Delegating ActionListener Wrappers
* Dry up use cases of ActionListener that simply pass through the response or exception to another listener
This commit is contained in:
Armin Braun 2019-04-25 16:05:04 +02:00 committed by GitHub
parent d119abdf96
commit 40aef2b8aa
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
24 changed files with 206 additions and 422 deletions

View File

@ -72,6 +72,53 @@ public interface ActionListener<Response> {
};
}
/**
* Creates a listener that delegates all responses it receives to another listener.
*
* @param delegate ActionListener to wrap and delegate any exception to
* @param bc BiConsumer invoked with delegate listener and exception
* @param <T> Type of the listener
* @return Delegating listener
*/
static <T> ActionListener<T> delegateResponse(ActionListener<T> delegate, BiConsumer<ActionListener<T>, Exception> bc) {
return new ActionListener<T>() {
@Override
public void onResponse(T r) {
delegate.onResponse(r);
}
@Override
public void onFailure(Exception e) {
bc.accept(delegate, e);
}
};
}
/**
* Creates a listener that delegates all exceptions it receives to another listener.
*
* @param delegate ActionListener to wrap and delegate any exception to
* @param bc BiConsumer invoked with delegate listener and response
* @param <T> Type of the delegating listener's response
* @param <R> Type of the wrapped listeners
* @return Delegating listener
*/
static <T, R> ActionListener<T> delegateFailure(ActionListener<R> delegate, BiConsumer<ActionListener<R>, T> bc) {
return new ActionListener<T>() {
@Override
public void onResponse(T r) {
bc.accept(delegate, r);
}
@Override
public void onFailure(Exception e) {
delegate.onFailure(e);
}
};
}
/**
* Creates a listener that listens for a response (or failure) and executes the
* corresponding runnable when the response (or failure) is received.

View File

@ -157,7 +157,7 @@ public class TransportGetTaskAction extends HandledTransportAction<GetTaskReques
// Shift to the generic thread pool and let it wait for the task to complete so we don't block any important threads.
threadPool.generic().execute(new AbstractRunnable() {
@Override
protected void doRun() throws Exception {
protected void doRun() {
taskManager.waitForTaskCompletion(runningTask, waitForCompletionTimeout(request.getTimeout()));
waitedForCompletion(thisTask, request, runningTask.taskInfo(clusterService.localNode().getId(), true), listener);
}
@ -180,26 +180,17 @@ public class TransportGetTaskAction extends HandledTransportAction<GetTaskReques
*/
void waitedForCompletion(Task thisTask, GetTaskRequest request, TaskInfo snapshotOfRunningTask,
ActionListener<GetTaskResponse> listener) {
getFinishedTaskFromIndex(thisTask, request, new ActionListener<GetTaskResponse>() {
@Override
public void onResponse(GetTaskResponse response) {
// We were able to load the task from the task index. Let's send that back.
listener.onResponse(response);
}
@Override
public void onFailure(Exception e) {
getFinishedTaskFromIndex(thisTask, request, ActionListener.delegateResponse(listener, (delegatedListener, e) -> {
/*
* We couldn't load the task from the task index. Instead of 404 we should use the snapshot we took after it finished. If
* the error isn't a 404 then we'll just throw it back to the user.
*/
if (ExceptionsHelper.unwrap(e, ResourceNotFoundException.class) != null) {
listener.onResponse(new GetTaskResponse(new TaskResult(true, snapshotOfRunningTask)));
delegatedListener.onResponse(new GetTaskResponse(new TaskResult(true, snapshotOfRunningTask)));
} else {
listener.onFailure(e);
delegatedListener.onFailure(e);
}
}
});
}));
}
/**

View File

@ -24,7 +24,6 @@ import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ack.ClusterStateUpdateResponse;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
@ -69,17 +68,8 @@ public class TransportDeleteRepositoryAction extends TransportMasterNodeAction<D
protected void masterOperation(final DeleteRepositoryRequest request, ClusterState state,
final ActionListener<AcknowledgedResponse> listener) {
repositoriesService.unregisterRepository(
request,
new ActionListener<ClusterStateUpdateResponse>() {
@Override
public void onResponse(ClusterStateUpdateResponse unregisterRepositoryResponse) {
listener.onResponse(new AcknowledgedResponse(unregisterRepositoryResponse.isAcknowledged()));
}
@Override
public void onFailure(Exception e) {
listener.onFailure(e);
}
});
request, ActionListener.delegateFailure(listener,
(delegatedListener, unregisterRepositoryResponse) ->
delegatedListener.onResponse(new AcknowledgedResponse(unregisterRepositoryResponse.isAcknowledged()))));
}
}

View File

@ -24,7 +24,6 @@ import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ack.ClusterStateUpdateResponse;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
@ -68,17 +67,7 @@ public class TransportPutRepositoryAction extends TransportMasterNodeAction<PutR
@Override
protected void masterOperation(final PutRepositoryRequest request, ClusterState state,
final ActionListener<AcknowledgedResponse> listener) {
repositoriesService.registerRepository(request, new ActionListener<ClusterStateUpdateResponse>() {
@Override
public void onResponse(ClusterStateUpdateResponse response) {
listener.onResponse(new AcknowledgedResponse(response.isAcknowledged()));
}
@Override
public void onFailure(Exception e) {
listener.onFailure(e);
}
});
repositoriesService.registerRepository(request, ActionListener.delegateFailure(listener,
(delegatedListener, response) -> delegatedListener.onResponse(new AcknowledgedResponse(response.isAcknowledged()))));
}
}

View File

@ -33,8 +33,6 @@ import org.elasticsearch.repositories.RepositoriesService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import java.util.List;
/**
* Transport action for verifying repository operation
*/
@ -70,16 +68,8 @@ public class TransportVerifyRepositoryAction extends TransportMasterNodeAction<V
@Override
protected void masterOperation(final VerifyRepositoryRequest request, ClusterState state,
final ActionListener<VerifyRepositoryResponse> listener) {
repositoriesService.verifyRepository(request.name(), new ActionListener<List<DiscoveryNode>>() {
@Override
public void onResponse(List<DiscoveryNode> verifyResponse) {
listener.onResponse(new VerifyRepositoryResponse(verifyResponse.toArray(new DiscoveryNode[0])));
}
@Override
public void onFailure(Exception e) {
listener.onFailure(e);
}
});
repositoriesService.verifyRepository(request.name(), ActionListener.delegateFailure(listener,
(delegatedListener, verifyResponse) ->
delegatedListener.onResponse(new VerifyRepositoryResponse(verifyResponse.toArray(new DiscoveryNode[0])))));
}
}

View File

@ -29,7 +29,6 @@ import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.snapshots.RestoreService;
import org.elasticsearch.snapshots.RestoreService.RestoreCompletionResponse;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
@ -73,20 +72,13 @@ public class TransportRestoreSnapshotAction extends TransportMasterNodeAction<Re
@Override
protected void masterOperation(final RestoreSnapshotRequest request, final ClusterState state,
final ActionListener<RestoreSnapshotResponse> listener) {
restoreService.restoreSnapshot(request, new ActionListener<RestoreCompletionResponse>() {
@Override
public void onResponse(RestoreCompletionResponse restoreCompletionResponse) {
restoreService.restoreSnapshot(request, ActionListener.delegateFailure(listener,
(delegatedListener, restoreCompletionResponse) -> {
if (restoreCompletionResponse.getRestoreInfo() == null && request.waitForCompletion()) {
RestoreClusterStateListener.createAndRegisterListener(clusterService, restoreCompletionResponse, listener);
RestoreClusterStateListener.createAndRegisterListener(clusterService, restoreCompletionResponse, delegatedListener);
} else {
listener.onResponse(new RestoreSnapshotResponse(restoreCompletionResponse.getRestoreInfo()));
delegatedListener.onResponse(new RestoreSnapshotResponse(restoreCompletionResponse.getRestoreInfo()));
}
}
@Override
public void onFailure(Exception t) {
listener.onFailure(t);
}
});
}));
}
}

View File

@ -118,19 +118,9 @@ public class TransportCloseIndexAction extends TransportMasterNodeAction<CloseIn
.masterNodeTimeout(request.masterNodeTimeout())
.waitForActiveShards(request.waitForActiveShards())
.indices(concreteIndices);
indexStateService.closeIndices(closeRequest, new ActionListener<CloseIndexResponse>() {
@Override
public void onResponse(final CloseIndexResponse response) {
listener.onResponse(response);
}
@Override
public void onFailure(final Exception t) {
logger.debug(() -> new ParameterizedMessage("failed to close indices [{}]", (Object) concreteIndices), t);
listener.onFailure(t);
}
});
indexStateService.closeIndices(closeRequest, ActionListener.delegateResponse(listener, (delegatedListener, t) -> {
logger.debug(() -> new ParameterizedMessage("failed to close indices [{}]", (Object) concreteIndices), t);
delegatedListener.onFailure(t);
}));
}
}

View File

@ -24,7 +24,6 @@ import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.create.CreateIndexClusterStateUpdateRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.stats.IndexShardStats;
import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
import org.elasticsearch.client.Client;
@ -98,25 +97,18 @@ public class TransportResizeAction extends TransportMasterNodeAction<ResizeReque
// there is no need to fetch docs stats for split but we keep it simple and do it anyway for simplicity of the code
final String sourceIndex = indexNameExpressionResolver.resolveDateMathExpression(resizeRequest.getSourceIndex());
final String targetIndex = indexNameExpressionResolver.resolveDateMathExpression(resizeRequest.getTargetIndexRequest().index());
client.admin().indices().prepareStats(sourceIndex).clear().setDocs(true).execute(new ActionListener<IndicesStatsResponse>() {
@Override
public void onResponse(IndicesStatsResponse indicesStatsResponse) {
client.admin().indices().prepareStats(sourceIndex).clear().setDocs(true).execute(
ActionListener.delegateFailure(listener, (delegatedListener, indicesStatsResponse) -> {
CreateIndexClusterStateUpdateRequest updateRequest = prepareCreateIndexRequest(resizeRequest, state,
(i) -> {
i -> {
IndexShardStats shard = indicesStatsResponse.getIndex(sourceIndex).getIndexShards().get(i);
return shard == null ? null : shard.getPrimary().getDocs();
}, sourceIndex, targetIndex);
createIndexService.createIndex(
updateRequest, ActionListener.map(listener,
updateRequest, ActionListener.map(delegatedListener,
response -> new ResizeResponse(response.isAcknowledged(), response.isShardsAcknowledged(), updateRequest.index()))
);
}
@Override
public void onFailure(Exception e) {
listener.onFailure(e);
}
});
}));
}

View File

@ -25,7 +25,6 @@ import org.elasticsearch.action.PrimaryMissingActionException;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.DefaultShardOperationFailedException;
import org.elasticsearch.action.support.broadcast.node.TransportBroadcastByNodeAction;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
@ -104,7 +103,7 @@ public class TransportUpgradeAction extends TransportBroadcastByNodeAction<Upgra
versions.put(index, new Tuple<>(version, luceneVersion));
}
}
Map<String, Tuple<org.elasticsearch.Version, String>> updatedVersions = new HashMap<>();
Map<String, Tuple<Version, String>> updatedVersions = new HashMap<>();
MetaData metaData = clusterState.metaData();
for (Map.Entry<String, Tuple<Version, org.apache.lucene.util.Version>> versionEntry : versions.entrySet()) {
String index = versionEntry.getKey();
@ -209,16 +208,7 @@ public class TransportUpgradeAction extends TransportBroadcastByNodeAction<Upgra
private void updateSettings(final UpgradeResponse upgradeResponse, final ActionListener<UpgradeResponse> listener) {
UpgradeSettingsRequest upgradeSettingsRequest = new UpgradeSettingsRequest(upgradeResponse.versions());
client.executeLocally(UpgradeSettingsAction.INSTANCE, upgradeSettingsRequest, new ActionListener<AcknowledgedResponse>() {
@Override
public void onResponse(AcknowledgedResponse updateSettingsResponse) {
listener.onResponse(upgradeResponse);
}
@Override
public void onFailure(Exception e) {
listener.onFailure(e);
}
});
client.executeLocally(UpgradeSettingsAction.INSTANCE, upgradeSettingsRequest, ActionListener.delegateFailure(
listener, (delegatedListener, updateSettingsResponse) -> delegatedListener.onResponse(upgradeResponse)));
}
}

View File

@ -659,7 +659,15 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
return ActionListener.map(actionListener,
response -> new BulkResponse(response.getItems(), response.getTook().getMillis(), ingestTookInMillis));
} else {
return new IngestBulkResponseListener(ingestTookInMillis, originalSlots, itemResponses, actionListener);
return ActionListener.delegateFailure(actionListener, (delegatedListener, response) -> {
BulkItemResponse[] items = response.getItems();
for (int i = 0; i < items.length; i++) {
itemResponses.add(originalSlots[i], response.getItems()[i]);
}
delegatedListener.onResponse(
new BulkResponse(
itemResponses.toArray(new BulkItemResponse[0]), response.getTook().getMillis(), ingestTookInMillis));
});
}
}
@ -689,36 +697,4 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
}
}
static final class IngestBulkResponseListener implements ActionListener<BulkResponse> {
private final long ingestTookInMillis;
private final int[] originalSlots;
private final List<BulkItemResponse> itemResponses;
private final ActionListener<BulkResponse> actionListener;
IngestBulkResponseListener(long ingestTookInMillis, int[] originalSlots, List<BulkItemResponse> itemResponses,
ActionListener<BulkResponse> actionListener) {
this.ingestTookInMillis = ingestTookInMillis;
this.itemResponses = itemResponses;
this.actionListener = actionListener;
this.originalSlots = originalSlots;
}
@Override
public void onResponse(BulkResponse response) {
BulkItemResponse[] items = response.getItems();
for (int i = 0; i < items.length; i++) {
itemResponses.add(originalSlots[i], response.getItems()[i]);
}
actionListener.onResponse(new BulkResponse(
itemResponses.toArray(new BulkItemResponse[itemResponses.size()]),
response.getTook().getMillis(), ingestTookInMillis));
}
@Override
public void onFailure(Exception e) {
actionListener.onFailure(e);
}
}
}

View File

@ -31,6 +31,7 @@ import org.elasticsearch.cluster.ClusterStateObserver;
import org.elasticsearch.cluster.MasterNodeChangePredicate;
import org.elasticsearch.cluster.NotMasterException;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.coordination.FailedToCommitClusterStateException;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
@ -39,7 +40,6 @@ import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.cluster.coordination.FailedToCommitClusterStateException;
import org.elasticsearch.discovery.MasterNotDiscoveredException;
import org.elasticsearch.node.NodeClosedException;
import org.elasticsearch.tasks.Task;
@ -185,23 +185,15 @@ public abstract class TransportMasterNodeAction<Request extends MasterNodeReques
});
}
} else {
ActionListener<Response> delegate = new ActionListener<Response>() {
@Override
public void onResponse(Response response) {
listener.onResponse(response);
ActionListener<Response> delegate = ActionListener.delegateResponse(listener, (delegatedListener, t) -> {
if (t instanceof FailedToCommitClusterStateException || t instanceof NotMasterException) {
logger.debug(() -> new ParameterizedMessage("master could not publish cluster state or " +
"stepped down before publishing action [{}], scheduling a retry", actionName), t);
retry(t, masterChangePredicate);
} else {
delegatedListener.onFailure(t);
}
@Override
public void onFailure(Exception t) {
if (t instanceof FailedToCommitClusterStateException || t instanceof NotMasterException) {
logger.debug(() -> new ParameterizedMessage("master could not publish cluster state or " +
"stepped down before publishing action [{}], scheduling a retry", actionName), t);
retry(t, masterChangePredicate);
} else {
listener.onFailure(t);
}
}
};
});
threadPool.executor(executor).execute(new ActionRunnable<Response>(delegate) {
@Override
protected void doRun() throws Exception {

View File

@ -27,7 +27,6 @@ import org.apache.lucene.util.BytesRefBuilder;
import org.apache.lucene.util.SetOnce;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.ParsingException;
import org.elasticsearch.common.Strings;
@ -464,22 +463,14 @@ public class TermsQueryBuilder extends AbstractQueryBuilder<TermsQueryBuilder> {
? new GetRequest(termsLookup.index(), termsLookup.id())
: new GetRequest(termsLookup.index(), termsLookup.type(), termsLookup.id());
getRequest.preference("_local").routing(termsLookup.routing());
client.get(getRequest, new ActionListener<GetResponse>() {
@Override
public void onResponse(GetResponse getResponse) {
List<Object> terms = new ArrayList<>();
if (getResponse.isSourceEmpty() == false) { // extract terms only if the doc source exists
List<Object> extractedValues = XContentMapValues.extractRawValues(termsLookup.path(), getResponse.getSourceAsMap());
terms.addAll(extractedValues);
}
actionListener.onResponse(terms);
client.get(getRequest, ActionListener.delegateFailure(actionListener, (delegatedListener, getResponse) -> {
List<Object> terms = new ArrayList<>();
if (getResponse.isSourceEmpty() == false) { // extract terms only if the doc source exists
List<Object> extractedValues = XContentMapValues.extractRawValues(termsLookup.path(), getResponse.getSourceAsMap());
terms.addAll(extractedValues);
}
@Override
public void onFailure(Exception e) {
actionListener.onFailure(e);
}
});
delegatedListener.onResponse(terms);
}));
}
@Override

View File

@ -102,23 +102,13 @@ public class RetentionLeaseActions {
final IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex());
final IndexShard indexShard = indexService.getShard(shardId.id());
indexShard.acquirePrimaryOperationPermit(
new ActionListener<Releasable>() {
@Override
public void onResponse(final Releasable releasable) {
try (Releasable ignore = releasable) {
doRetentionLeaseAction(indexShard, request, listener);
}
}
@Override
public void onFailure(final Exception e) {
listener.onFailure(e);
}
},
ThreadPool.Names.SAME,
request);
ActionListener.delegateFailure(listener, (delegatedListener, releasable) -> {
try (Releasable ignore = releasable) {
doRetentionLeaseAction(indexShard, request, delegatedListener);
}
}),
ThreadPool.Names.SAME,
request);
}
@Override

View File

@ -2675,9 +2675,8 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
// primary term update. Since indexShardOperationPermits doesn't guarantee that async submissions are executed
// in the order submitted, combining both operations ensure that the term is updated before the operation is
// executed. It also has the side effect of acquiring all the permits one time instead of two.
final ActionListener<Releasable> operationListener = new ActionListener<Releasable>() {
@Override
public void onResponse(final Releasable releasable) {
final ActionListener<Releasable> operationListener = ActionListener.delegateFailure(onPermitAcquired,
(delegatedListener, releasable) -> {
if (opPrimaryTerm < getOperationPrimaryTerm()) {
releasable.close();
final String message = String.format(
@ -2686,7 +2685,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
shardId,
opPrimaryTerm,
getOperationPrimaryTerm());
onPermitAcquired.onFailure(new IllegalStateException(message));
delegatedListener.onFailure(new IllegalStateException(message));
} else {
assert assertReplicationTarget();
try {
@ -2694,18 +2693,12 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
advanceMaxSeqNoOfUpdatesOrDeletes(maxSeqNoOfUpdatesOrDeletes);
} catch (Exception e) {
releasable.close();
onPermitAcquired.onFailure(e);
delegatedListener.onFailure(e);
return;
}
onPermitAcquired.onResponse(releasable);
delegatedListener.onResponse(releasable);
}
}
@Override
public void onFailure(final Exception e) {
onPermitAcquired.onFailure(e);
}
};
});
if (requirePrimaryTermUpdate(opPrimaryTerm, allowCombineOperationWithPrimaryTermUpdate)) {
synchronized (mutex) {

View File

@ -34,7 +34,6 @@ import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
@ -165,17 +164,8 @@ public class CompletionPersistentTaskAction extends Action<PersistentTaskRespons
protected final void masterOperation(final Request request, ClusterState state,
final ActionListener<PersistentTaskResponse> listener) {
persistentTasksClusterService.completePersistentTask(request.taskId, request.allocationId, request.exception,
new ActionListener<PersistentTask<?>>() {
@Override
public void onResponse(PersistentTask<?> task) {
listener.onResponse(new PersistentTaskResponse(task));
}
@Override
public void onFailure(Exception e) {
listener.onFailure(e);
}
});
ActionListener.delegateFailure(listener,
(delegatedListener, task) -> delegatedListener.onResponse(new PersistentTaskResponse(task))));
}
}
}

View File

@ -34,7 +34,6 @@ import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
@ -149,17 +148,9 @@ public class RemovePersistentTaskAction extends Action<PersistentTaskResponse> {
@Override
protected final void masterOperation(final Request request, ClusterState state,
final ActionListener<PersistentTaskResponse> listener) {
persistentTasksClusterService.removePersistentTask(request.taskId, new ActionListener<PersistentTask<?>>() {
@Override
public void onResponse(PersistentTask<?> task) {
listener.onResponse(new PersistentTaskResponse(task));
}
@Override
public void onFailure(Exception e) {
listener.onFailure(e);
}
});
persistentTasksClusterService.removePersistentTask(
request.taskId, ActionListener.delegateFailure(listener,
(delegatedListener, task) -> delegatedListener.onResponse(new PersistentTaskResponse(task))));
}
}
}

View File

@ -36,7 +36,6 @@ import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
@ -226,18 +225,8 @@ public class StartPersistentTaskAction extends Action<PersistentTaskResponse> {
protected final void masterOperation(final Request request, ClusterState state,
final ActionListener<PersistentTaskResponse> listener) {
persistentTasksClusterService.createPersistentTask(request.taskId, request.taskName, request.params,
new ActionListener<PersistentTask<?>>() {
@Override
public void onResponse(PersistentTask<?> task) {
listener.onResponse(new PersistentTaskResponse(task));
}
@Override
public void onFailure(Exception e) {
listener.onFailure(e);
}
});
ActionListener.delegateFailure(listener,
(delegatedListener, task) -> delegatedListener.onResponse(new PersistentTaskResponse(task))));
}
}
}

View File

@ -34,7 +34,6 @@ import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
@ -181,17 +180,8 @@ public class UpdatePersistentTaskStatusAction extends Action<PersistentTaskRespo
final ClusterState state,
final ActionListener<PersistentTaskResponse> listener) {
persistentTasksClusterService.updatePersistentTaskState(request.taskId, request.allocationId, request.state,
new ActionListener<PersistentTask<?>>() {
@Override
public void onResponse(PersistentTask<?> task) {
listener.onResponse(new PersistentTaskResponse(task));
}
@Override
public void onFailure(Exception e) {
listener.onFailure(e);
}
});
ActionListener.delegateFailure(listener,
(delegatedListener, task) -> delegatedListener.onResponse(new PersistentTaskResponse(task))));
}
}
}

View File

@ -97,7 +97,15 @@ public class RepositoriesService implements ClusterStateApplier {
final ActionListener<ClusterStateUpdateResponse> registrationListener;
if (request.verify()) {
registrationListener = new VerifyingRegisterRepositoryListener(request.name(), listener);
registrationListener = ActionListener.delegateFailure(listener, (delegatedListener, clusterStateUpdateResponse) -> {
if (clusterStateUpdateResponse.isAcknowledged()) {
// The response was acknowledged - all nodes should know about the new repository, let's verify them
verifyRepository(request.name(), ActionListener.delegateFailure(delegatedListener,
(innerDelegatedListener, discoveryNodes) -> innerDelegatedListener.onResponse(clusterStateUpdateResponse)));
} else {
delegatedListener.onResponse(clusterStateUpdateResponse);
}
});
} else {
registrationListener = listener;
}
@ -229,27 +237,18 @@ public class RepositoriesService implements ClusterStateApplier {
final String verificationToken = repository.startVerification();
if (verificationToken != null) {
try {
verifyAction.verify(repositoryName, verificationToken, new ActionListener<List<DiscoveryNode>>() {
@Override
public void onResponse(List<DiscoveryNode> verifyResponse) {
threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(() -> {
try {
repository.endVerification(verificationToken);
} catch (Exception e) {
logger.warn(() -> new ParameterizedMessage(
"[{}] failed to finish repository verification", repositoryName), e);
listener.onFailure(e);
return;
}
listener.onResponse(verifyResponse);
});
}
@Override
public void onFailure(Exception e) {
listener.onFailure(e);
}
});
verifyAction.verify(repositoryName, verificationToken, ActionListener.delegateFailure(listener,
(delegatedListener, verifyResponse) -> threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(() -> {
try {
repository.endVerification(verificationToken);
} catch (Exception e) {
logger.warn(() -> new ParameterizedMessage(
"[{}] failed to finish repository verification", repositoryName), e);
delegatedListener.onFailure(e);
return;
}
delegatedListener.onResponse(verifyResponse);
})));
} catch (Exception e) {
threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(() -> {
try {
@ -424,41 +423,4 @@ public class RepositoriesService implements ClusterStateApplier {
throw new IllegalStateException("trying to modify or unregister repository that is currently used ");
}
}
private class VerifyingRegisterRepositoryListener implements ActionListener<ClusterStateUpdateResponse> {
private final String name;
private final ActionListener<ClusterStateUpdateResponse> listener;
VerifyingRegisterRepositoryListener(String name, final ActionListener<ClusterStateUpdateResponse> listener) {
this.name = name;
this.listener = listener;
}
@Override
public void onResponse(final ClusterStateUpdateResponse clusterStateUpdateResponse) {
if (clusterStateUpdateResponse.isAcknowledged()) {
// The response was acknowledged - all nodes should know about the new repository, let's verify them
verifyRepository(name, new ActionListener<List<DiscoveryNode>>() {
@Override
public void onResponse(List<DiscoveryNode> verifyResponse) {
listener.onResponse(clusterStateUpdateResponse);
}
@Override
public void onFailure(Exception e) {
listener.onFailure(e);
}
});
} else {
listener.onResponse(clusterStateUpdateResponse);
}
}
@Override
public void onFailure(Exception e) {
listener.onFailure(e);
}
}
}

View File

@ -363,19 +363,11 @@ public class TransportClientNodesServiceTests extends ESTestCase {
final List<Transport.Connection> establishedConnections = new CopyOnWriteArrayList<>();
clientService.addConnectBehavior(remoteService, (transport, discoveryNode, profile, listener) ->
transport.openConnection(discoveryNode, profile, new ActionListener<Transport.Connection>() {
@Override
public void onResponse(Transport.Connection connection) {
transport.openConnection(discoveryNode, profile,
ActionListener.delegateFailure(listener, (delegatedListener, connection) -> {
establishedConnections.add(connection);
listener.onResponse(connection);
}
@Override
public void onFailure(Exception e) {
listener.onFailure(e);
}
}));
delegatedListener.onResponse(connection);
})));
clientService.start();
clientService.acceptIncomingRequests();

View File

@ -1291,45 +1291,36 @@ public class RemoteClusterConnectionTests extends ESTestCase {
// route by seed hostname
proxyNode = proxyMapping.get(node.getHostName());
}
return t.openConnection(proxyNode, profile, new ActionListener<Transport.Connection>() {
@Override
public void onResponse(Transport.Connection connection) {
Transport.Connection proxyConnection = new Transport.Connection() {
@Override
public DiscoveryNode getNode() {
return node;
}
return t.openConnection(proxyNode, profile, ActionListener.delegateFailure(listener,
(delegatedListener, connection) -> delegatedListener.onResponse(
new Transport.Connection() {
@Override
public DiscoveryNode getNode() {
return node;
}
@Override
public void sendRequest(long requestId, String action, TransportRequest request,
TransportRequestOptions options) throws IOException, TransportException {
connection.sendRequest(requestId, action, request, options);
}
@Override
public void sendRequest(long requestId, String action, TransportRequest request,
TransportRequestOptions options) throws IOException {
connection.sendRequest(requestId, action, request, options);
}
@Override
public void addCloseListener(ActionListener<Void> listener) {
connection.addCloseListener(listener);
}
@Override
public void addCloseListener(ActionListener<Void> listener) {
connection.addCloseListener(listener);
}
@Override
public boolean isClosed() {
return connection.isClosed();
}
@Override
public boolean isClosed() {
return connection.isClosed();
}
@Override
public void close() {
connection.close();
}
};
listener.onResponse(proxyConnection);
}
@Override
public void onFailure(Exception e) {
listener.onFailure(e);
}
});
});
@Override
public void close() {
connection.close();
}
})));
});
return stubbableTransport;
}
}

View File

@ -608,8 +608,9 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase
public void execute() {
try {
new ReplicationOperation<>(request, new PrimaryRef(),
ActionListener.wrap(result -> result.respond(listener), listener::onFailure), new ReplicasRef(), logger, opType,
primaryTerm).execute();
ActionListener.delegateFailure(listener,
(delegatedListener, result) -> result.respond(delegatedListener)), new ReplicasRef(), logger, opType, primaryTerm)
.execute();
} catch (Exception e) {
listener.onFailure(e);
}
@ -684,28 +685,20 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase
final ActionListener<ReplicationOperation.ReplicaResponse> listener) {
IndexShard replica = replicationTargets.findReplicaShard(replicaRouting);
replica.acquireReplicaOperationPermit(
getPrimaryShard().getPendingPrimaryTerm(),
globalCheckpoint,
maxSeqNoOfUpdatesOrDeletes,
new ActionListener<Releasable>() {
@Override
public void onResponse(Releasable releasable) {
try {
performOnReplica(request, replica);
releasable.close();
listener.onResponse(new ReplicaResponse(replica.getLocalCheckpoint(), replica.getGlobalCheckpoint()));
} catch (final Exception e) {
Releasables.closeWhileHandlingException(releasable);
listener.onFailure(e);
}
}
@Override
public void onFailure(Exception e) {
listener.onFailure(e);
}
},
ThreadPool.Names.WRITE, request);
getPrimaryShard().getPendingPrimaryTerm(),
globalCheckpoint,
maxSeqNoOfUpdatesOrDeletes,
ActionListener.delegateFailure(listener, (delegatedListener, releasable) -> {
try {
performOnReplica(request, replica);
releasable.close();
delegatedListener.onResponse(new ReplicaResponse(replica.getLocalCheckpoint(), replica.getGlobalCheckpoint()));
} catch (final Exception e) {
Releasables.closeWhileHandlingException(releasable);
delegatedListener.onFailure(e);
}
}),
ThreadPool.Names.WRITE, request);
}
@Override
@ -895,7 +888,7 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase
}
private TransportWriteAction.WritePrimaryResult<ResyncReplicationRequest, ResyncReplicationResponse> executeResyncOnPrimary(
IndexShard primary, ResyncReplicationRequest request) throws Exception {
IndexShard primary, ResyncReplicationRequest request) {
final TransportWriteAction.WritePrimaryResult<ResyncReplicationRequest, ResyncReplicationResponse> result =
new TransportWriteAction.WritePrimaryResult<>(TransportResyncReplicationAction.performOnPrimary(request),
new ResyncReplicationResponse(), null, null, primary, logger);

View File

@ -129,18 +129,9 @@ public final class StubbableTransport implements Transport {
TransportAddress address = node.getAddress();
OpenConnectionBehavior behavior = connectBehaviors.getOrDefault(address, defaultConnectBehavior);
ActionListener<Connection> wrappedListener = new ActionListener<Connection>() {
@Override
public void onResponse(Connection connection) {
listener.onResponse(new WrappedConnection(connection));
}
@Override
public void onFailure(Exception e) {
listener.onFailure(e);
}
};
ActionListener<Connection> wrappedListener =
ActionListener.delegateFailure(listener,
(delegatedListener, connection) -> delegatedListener.onResponse(new WrappedConnection(connection)));
if (behavior == null) {
return delegate.openConnection(node, profile, wrappedListener);

View File

@ -12,7 +12,6 @@ import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreClusterStateListener;
import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotRequest;
import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.action.support.ActiveShardsObserver;
@ -147,19 +146,10 @@ public final class TransportPutFollowAction
}
@Override
protected void doRun() throws Exception {
restoreService.restoreSnapshot(restoreRequest, new ActionListener<RestoreService.RestoreCompletionResponse>() {
@Override
public void onResponse(RestoreService.RestoreCompletionResponse response) {
afterRestoreStarted(clientWithHeaders, request, listener, response);
}
@Override
public void onFailure(Exception e) {
listener.onFailure(e);
}
});
protected void doRun() {
restoreService.restoreSnapshot(restoreRequest,
ActionListener.delegateFailure(listener,
(delegatedListener, response) -> afterRestoreStarted(clientWithHeaders, request, delegatedListener, response)));
}
});
}
@ -186,28 +176,20 @@ public final class TransportPutFollowAction
listener = originalListener;
}
RestoreClusterStateListener.createAndRegisterListener(clusterService, response, new ActionListener<RestoreSnapshotResponse>() {
@Override
public void onResponse(RestoreSnapshotResponse restoreSnapshotResponse) {
RestoreClusterStateListener.createAndRegisterListener(clusterService, response,
ActionListener.delegateFailure(listener, (delegatedListener, restoreSnapshotResponse) -> {
RestoreInfo restoreInfo = restoreSnapshotResponse.getRestoreInfo();
if (restoreInfo == null) {
// If restoreInfo is null then it is possible there was a master failure during the
// restore.
listener.onResponse(new PutFollowAction.Response(true, false, false));
delegatedListener.onResponse(new PutFollowAction.Response(true, false, false));
} else if (restoreInfo.failedShards() == 0) {
initiateFollowing(clientWithHeaders, request, listener);
initiateFollowing(clientWithHeaders, request, delegatedListener);
} else {
assert restoreInfo.failedShards() > 0 : "Should have failed shards";
listener.onResponse(new PutFollowAction.Response(true, false, false));
delegatedListener.onResponse(new PutFollowAction.Response(true, false, false));
}
}
@Override
public void onFailure(Exception e) {
listener.onFailure(e);
}
});
}));
}
private void initiateFollowing(