Cleanup Various Action- Listener and Runnable Usages (#42273) (#45052)

* Dry up code for creating simple `ActionRunnable` a little
* Shorten some other code around `ActionListener` usage, in particular
when wrapping it in a `TransportResponseListener`
This commit is contained in:
Armin Braun 2019-07-31 18:55:31 +02:00 committed by GitHub
parent ee663dc9ac
commit 8d63bd1d1e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
17 changed files with 112 additions and 248 deletions

View File

@ -19,6 +19,7 @@
package org.elasticsearch.action; package org.elasticsearch.action;
import org.elasticsearch.common.CheckedConsumer;
import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.AbstractRunnable;
/** /**
@ -29,6 +30,23 @@ public abstract class ActionRunnable<Response> extends AbstractRunnable {
protected final ActionListener<Response> listener; protected final ActionListener<Response> listener;
/**
* Creates a {@link Runnable} that wraps the given listener and a consumer of it that is executed when the {@link Runnable} is run.
* Invokes {@link ActionListener#onFailure(Exception)} on it if an exception is thrown on executing the consumer.
* @param listener ActionListener to wrap
* @param consumer Consumer of wrapped {@code ActionListener}
* @param <T> Type of the given {@code ActionListener}
* @return Wrapped {@code Runnable}
*/
public static <T> ActionRunnable<T> wrap(ActionListener<T> listener, CheckedConsumer<ActionListener<T>, Exception> consumer) {
return new ActionRunnable<T>(listener) {
@Override
protected void doRun() throws Exception {
consumer.accept(listener);
}
};
}
public ActionRunnable(ActionListener<Response> listener) { public ActionRunnable(ActionListener<Response> listener) {
this.listener = listener; this.listener = listener;
} }

View File

@ -23,6 +23,7 @@ import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionListenerResponseHandler;
import org.elasticsearch.action.get.GetRequest; import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse; import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.ActionFilters;
@ -32,7 +33,6 @@ import org.elasticsearch.client.OriginSettingClient;
import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.xcontent.LoggingDeprecationHandler; import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.common.xcontent.NamedXContentRegistry;
@ -45,9 +45,7 @@ import org.elasticsearch.tasks.TaskInfo;
import org.elasticsearch.tasks.TaskResult; import org.elasticsearch.tasks.TaskResult;
import org.elasticsearch.tasks.TaskResultsService; import org.elasticsearch.tasks.TaskResultsService;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportRequestOptions; import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportResponseHandler;
import org.elasticsearch.transport.TransportService; import org.elasticsearch.transport.TransportService;
import java.io.IOException; import java.io.IOException;
@ -118,27 +116,7 @@ public class TransportGetTaskAction extends HandledTransportAction<GetTaskReques
} }
GetTaskRequest nodeRequest = request.nodeRequest(clusterService.localNode().getId(), thisTask.getId()); GetTaskRequest nodeRequest = request.nodeRequest(clusterService.localNode().getId(), thisTask.getId());
transportService.sendRequest(node, GetTaskAction.NAME, nodeRequest, builder.build(), transportService.sendRequest(node, GetTaskAction.NAME, nodeRequest, builder.build(),
new TransportResponseHandler<GetTaskResponse>() { new ActionListenerResponseHandler<>(listener, GetTaskResponse::new, ThreadPool.Names.SAME));
@Override
public GetTaskResponse read(StreamInput in) throws IOException {
return new GetTaskResponse(in);
}
@Override
public void handleResponse(GetTaskResponse response) {
listener.onResponse(response);
}
@Override
public void handleException(TransportException exp) {
listener.onFailure(exp);
}
@Override
public String executor() {
return ThreadPool.Names.SAME;
}
});
} }
/** /**

View File

@ -26,6 +26,7 @@ import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.ResourceAlreadyExistsException; import org.elasticsearch.ResourceAlreadyExistsException;
import org.elasticsearch.Version; import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRunnable;
import org.elasticsearch.action.DocWriteRequest; import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.DocWriteResponse; import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.RoutingMissingException; import org.elasticsearch.action.RoutingMissingException;
@ -56,7 +57,6 @@ import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.AtomicArray; import org.elasticsearch.common.util.concurrent.AtomicArray;
import org.elasticsearch.index.Index; import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.index.IndexNotFoundException;
@ -331,10 +331,9 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
* retries on retryable cluster blocks, resolves item requests, * retries on retryable cluster blocks, resolves item requests,
* constructs shard bulk requests and delegates execution to shard bulk action * constructs shard bulk requests and delegates execution to shard bulk action
* */ * */
private final class BulkOperation extends AbstractRunnable { private final class BulkOperation extends ActionRunnable<BulkResponse> {
private final Task task; private final Task task;
private final BulkRequest bulkRequest; private final BulkRequest bulkRequest;
private final ActionListener<BulkResponse> listener;
private final AtomicArray<BulkItemResponse> responses; private final AtomicArray<BulkItemResponse> responses;
private final long startTimeNanos; private final long startTimeNanos;
private final ClusterStateObserver observer; private final ClusterStateObserver observer;
@ -342,9 +341,9 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
BulkOperation(Task task, BulkRequest bulkRequest, ActionListener<BulkResponse> listener, AtomicArray<BulkItemResponse> responses, BulkOperation(Task task, BulkRequest bulkRequest, ActionListener<BulkResponse> listener, AtomicArray<BulkItemResponse> responses,
long startTimeNanos, Map<String, IndexNotFoundException> indicesThatCannotBeCreated) { long startTimeNanos, Map<String, IndexNotFoundException> indicesThatCannotBeCreated) {
super(listener);
this.task = task; this.task = task;
this.bulkRequest = bulkRequest; this.bulkRequest = bulkRequest;
this.listener = listener;
this.responses = responses; this.responses = responses;
this.startTimeNanos = startTimeNanos; this.startTimeNanos = startTimeNanos;
this.indicesThatCannotBeCreated = indicesThatCannotBeCreated; this.indicesThatCannotBeCreated = indicesThatCannotBeCreated;
@ -352,12 +351,7 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
} }
@Override @Override
public void onFailure(Exception e) { protected void doRun() {
listener.onFailure(e);
}
@Override
protected void doRun() throws Exception {
final ClusterState clusterState = observer.setAndGetObservedState(); final ClusterState clusterState = observer.setAndGetObservedState();
if (handleBlockExceptions(clusterState)) { if (handleBlockExceptions(clusterState)) {
return; return;

View File

@ -64,9 +64,7 @@ class SimulateExecutionService {
} }
public void execute(SimulatePipelineRequest.Parsed request, ActionListener<SimulatePipelineResponse> listener) { public void execute(SimulatePipelineRequest.Parsed request, ActionListener<SimulatePipelineResponse> listener) {
threadPool.executor(THREAD_POOL_NAME).execute(new ActionRunnable<SimulatePipelineResponse>(listener) { threadPool.executor(THREAD_POOL_NAME).execute(ActionRunnable.wrap(listener, l -> {
@Override
protected void doRun() throws Exception {
List<SimulateDocumentResult> responses = new ArrayList<>(); List<SimulateDocumentResult> responses = new ArrayList<>();
for (IngestDocument ingestDocument : request.getDocuments()) { for (IngestDocument ingestDocument : request.getDocuments()) {
SimulateDocumentResult response = executeDocument(request.getPipeline(), ingestDocument, request.isVerbose()); SimulateDocumentResult response = executeDocument(request.getPipeline(), ingestDocument, request.isVerbose());
@ -74,8 +72,7 @@ class SimulateExecutionService {
responses.add(response); responses.add(response);
} }
} }
listener.onResponse(new SimulatePipelineResponse(request.getPipeline().getId(), request.isVerbose(), responses)); l.onResponse(new SimulatePipelineResponse(request.getPipeline().getId(), request.isVerbose(), responses));
} }));
});
} }
} }

View File

@ -298,12 +298,8 @@ public abstract class TransportBroadcastAction<
} }
} }
protected void asyncShardOperation(ShardRequest request, Task task, ActionListener<ShardResponse> listener) { private void asyncShardOperation(ShardRequest request, Task task, ActionListener<ShardResponse> listener) {
transportService.getThreadPool().executor(shardExecutor).execute(new ActionRunnable<ShardResponse>(listener) { transportService.getThreadPool().executor(shardExecutor)
@Override .execute(ActionRunnable.wrap(listener, l -> l.onResponse(shardOperation(request, task))));
protected void doRun() throws Exception {
listener.onResponse(shardOperation(request, task));
}
});
} }
} }

View File

@ -161,12 +161,8 @@ public abstract class TransportMasterNodeAction<Request extends MasterNodeReques
delegatedListener.onFailure(t); delegatedListener.onFailure(t);
} }
}); });
threadPool.executor(executor).execute(new ActionRunnable<Response>(delegate) { threadPool.executor(executor)
@Override .execute(ActionRunnable.wrap(delegate, l -> masterOperation(task, request, clusterState, l)));
protected void doRun() throws Exception {
masterOperation(task, request, clusterState, delegate);
}
});
} }
} else { } else {
if (nodes.getMasterNode() == null) { if (nodes.getMasterNode() == null) {

View File

@ -106,12 +106,8 @@ public abstract class TransportSingleShardAction<Request extends SingleShardRequ
protected abstract Response shardOperation(Request request, ShardId shardId) throws IOException; protected abstract Response shardOperation(Request request, ShardId shardId) throws IOException;
protected void asyncShardOperation(Request request, ShardId shardId, ActionListener<Response> listener) throws IOException { protected void asyncShardOperation(Request request, ShardId shardId, ActionListener<Response> listener) throws IOException {
threadPool.executor(getExecutor(request, shardId)).execute(new ActionRunnable<Response>(listener) { threadPool.executor(getExecutor(request, shardId))
@Override .execute(ActionRunnable.wrap(listener, l -> l.onResponse((shardOperation(request, shardId)))));
protected void doRun() throws Exception {
listener.onResponse(shardOperation(request, shardId));
}
});
} }
protected abstract Writeable.Reader<Response> getResponseReader(); protected abstract Writeable.Reader<Response> getResponseReader();

View File

@ -254,12 +254,7 @@ public class TransportUpdateAction extends TransportInstanceSingleOperationActio
if (retryCount < request.retryOnConflict()) { if (retryCount < request.retryOnConflict()) {
logger.trace("Retry attempt [{}] of [{}] on version conflict on [{}][{}][{}]", logger.trace("Retry attempt [{}] of [{}] on version conflict on [{}][{}][{}]",
retryCount + 1, request.retryOnConflict(), request.index(), request.getShardId(), request.id()); retryCount + 1, request.retryOnConflict(), request.index(), request.getShardId(), request.id());
threadPool.executor(executor()).execute(new ActionRunnable<UpdateResponse>(listener) { threadPool.executor(executor()).execute(ActionRunnable.wrap(listener, l -> shardOperation(request, l, retryCount + 1)));
@Override
protected void doRun() {
shardOperation(request, listener, retryCount + 1);
}
});
return; return;
} }
} }

View File

@ -23,6 +23,7 @@ import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionListenerResponseHandler;
import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateTaskConfig; import org.elasticsearch.cluster.ClusterStateTaskConfig;
import org.elasticsearch.cluster.ClusterStateTaskListener; import org.elasticsearch.cluster.ClusterStateTaskListener;
@ -43,7 +44,6 @@ import org.elasticsearch.discovery.zen.MembershipAction;
import org.elasticsearch.discovery.zen.ZenDiscovery; import org.elasticsearch.discovery.zen.ZenDiscovery;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.threadpool.ThreadPool.Names; import org.elasticsearch.threadpool.ThreadPool.Names;
import org.elasticsearch.transport.EmptyTransportResponseHandler;
import org.elasticsearch.transport.TransportChannel; import org.elasticsearch.transport.TransportChannel;
import org.elasticsearch.transport.TransportException; import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportRequest; import org.elasticsearch.transport.TransportRequest;
@ -349,17 +349,7 @@ public class JoinHelper {
transportService.sendRequest(node, actionName, transportService.sendRequest(node, actionName,
new ValidateJoinRequest(state), new ValidateJoinRequest(state),
TransportRequestOptions.builder().withTimeout(joinTimeout).build(), TransportRequestOptions.builder().withTimeout(joinTimeout).build(),
new EmptyTransportResponseHandler(ThreadPool.Names.GENERIC) { new ActionListenerResponseHandler<>(listener, i -> Empty.INSTANCE, ThreadPool.Names.GENERIC));
@Override
public void handleResponse(TransportResponse.Empty response) {
listener.onResponse(response);
}
@Override
public void handleException(TransportException exp) {
listener.onFailure(exp);
}
});
} }
public interface JoinCallback { public interface JoinCallback {

View File

@ -26,6 +26,7 @@ import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version; import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRunnable;
import org.elasticsearch.action.NotifyOnceListener; import org.elasticsearch.action.NotifyOnceListener;
import org.elasticsearch.action.admin.indices.close.CloseIndexClusterStateUpdateRequest; import org.elasticsearch.action.admin.indices.close.CloseIndexClusterStateUpdateRequest;
import org.elasticsearch.action.admin.indices.close.CloseIndexResponse; import org.elasticsearch.action.admin.indices.close.CloseIndexResponse;
@ -58,7 +59,6 @@ import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.AtomicArray; import org.elasticsearch.common.util.concurrent.AtomicArray;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.util.concurrent.CountDown; import org.elasticsearch.common.util.concurrent.CountDown;
@ -296,26 +296,20 @@ public class MetaDataIndexStateService {
* this action succeed then the shard is considered to be ready for closing. When all shards of a given index are ready for closing, * this action succeed then the shard is considered to be ready for closing. When all shards of a given index are ready for closing,
* the index is considered ready to be closed. * the index is considered ready to be closed.
*/ */
class WaitForClosedBlocksApplied extends AbstractRunnable { class WaitForClosedBlocksApplied extends ActionRunnable<Map<Index, IndexResult>> {
private final Map<Index, ClusterBlock> blockedIndices; private final Map<Index, ClusterBlock> blockedIndices;
private final CloseIndexClusterStateUpdateRequest request; private final CloseIndexClusterStateUpdateRequest request;
private final ActionListener<Map<Index, IndexResult>> listener;
private WaitForClosedBlocksApplied(final Map<Index, ClusterBlock> blockedIndices, private WaitForClosedBlocksApplied(final Map<Index, ClusterBlock> blockedIndices,
final CloseIndexClusterStateUpdateRequest request, final CloseIndexClusterStateUpdateRequest request,
final ActionListener<Map<Index, IndexResult>> listener) { final ActionListener<Map<Index, IndexResult>> listener) {
super(listener);
if (blockedIndices == null || blockedIndices.isEmpty()) { if (blockedIndices == null || blockedIndices.isEmpty()) {
throw new IllegalArgumentException("Cannot wait for closed blocks to be applied, list of blocked indices is empty or null"); throw new IllegalArgumentException("Cannot wait for closed blocks to be applied, list of blocked indices is empty or null");
} }
this.blockedIndices = blockedIndices; this.blockedIndices = blockedIndices;
this.request = request; this.request = request;
this.listener = listener;
}
@Override
public void onFailure(final Exception e) {
listener.onFailure(e);
} }
@Override @Override

View File

@ -20,6 +20,7 @@
package org.elasticsearch.common.util.concurrent; package org.elasticsearch.common.util.concurrent;
import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRunnable;
import org.elasticsearch.action.support.ContextPreservingActionListener; import org.elasticsearch.action.support.ContextPreservingActionListener;
import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.collect.Tuple;
@ -89,17 +90,13 @@ public final class ListenableFuture<V> extends BaseFuture<V> implements ActionLi
private void notifyListener(ActionListener<V> listener, ExecutorService executorService) { private void notifyListener(ActionListener<V> listener, ExecutorService executorService) {
try { try {
executorService.execute(new Runnable() { executorService.execute(new ActionRunnable<V>(listener) {
@Override @Override
public void run() { protected void doRun() {
try { // call get in a non-blocking fashion as we could be on a network thread
// call get in a non-blocking fashion as we could be on a network thread // or another thread like the scheduler, which we should never block!
// or another thread like the scheduler, which we should never block! V value = FutureUtils.get(ListenableFuture.this, 0L, TimeUnit.NANOSECONDS);
V value = FutureUtils.get(ListenableFuture.this, 0L, TimeUnit.NANOSECONDS); listener.onResponse(value);
listener.onResponse(value);
} catch (Exception e) {
listener.onFailure(e);
}
} }
@Override @Override

View File

@ -24,6 +24,7 @@ import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.Version; import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionListenerResponseHandler;
import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateUpdateTask; import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.block.ClusterBlocks; import org.elasticsearch.cluster.block.ClusterBlocks;
@ -41,11 +42,9 @@ import org.elasticsearch.discovery.MasterNotDiscoveredException;
import org.elasticsearch.tasks.Task; import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportChannel; import org.elasticsearch.transport.TransportChannel;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportRequest; import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.TransportRequestHandler; import org.elasticsearch.transport.TransportRequestHandler;
import org.elasticsearch.transport.TransportResponse; import org.elasticsearch.transport.TransportResponse;
import org.elasticsearch.transport.TransportResponseHandler;
import org.elasticsearch.transport.TransportService; import org.elasticsearch.transport.TransportService;
import java.io.IOException; import java.io.IOException;
@ -86,27 +85,8 @@ public class LocalAllocateDangledIndices {
} }
AllocateDangledRequest request = new AllocateDangledRequest(clusterService.localNode(), AllocateDangledRequest request = new AllocateDangledRequest(clusterService.localNode(),
indices.toArray(new IndexMetaData[indices.size()])); indices.toArray(new IndexMetaData[indices.size()]));
transportService.sendRequest(masterNode, ACTION_NAME, request, new TransportResponseHandler<AllocateDangledResponse>() { transportService.sendRequest(masterNode, ACTION_NAME, request,
@Override new ActionListenerResponseHandler<>(listener, AllocateDangledResponse::new, ThreadPool.Names.SAME));
public AllocateDangledResponse read(StreamInput in) throws IOException {
return new AllocateDangledResponse(in);
}
@Override
public void handleResponse(AllocateDangledResponse response) {
listener.onResponse(response);
}
@Override
public void handleException(TransportException exp) {
listener.onFailure(exp);
}
@Override
public String executor() {
return ThreadPool.Names.SAME;
}
});
} }
class AllocateDangledRequestHandler implements TransportRequestHandler<AllocateDangledRequest> { class AllocateDangledRequestHandler implements TransportRequestHandler<AllocateDangledRequest> {

View File

@ -22,6 +22,7 @@ package org.elasticsearch.index.shard;
import org.elasticsearch.Assertions; import org.elasticsearch.Assertions;
import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRunnable;
import org.elasticsearch.action.support.ContextPreservingActionListener; import org.elasticsearch.action.support.ContextPreservingActionListener;
import org.elasticsearch.common.CheckedRunnable; import org.elasticsearch.common.CheckedRunnable;
import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.collect.Tuple;
@ -249,9 +250,24 @@ final class IndexShardOperationPermits implements Closeable {
final Supplier<StoredContext> contextSupplier = threadPool.getThreadContext().newRestorableContext(false); final Supplier<StoredContext> contextSupplier = threadPool.getThreadContext().newRestorableContext(false);
final ActionListener<Releasable> wrappedListener; final ActionListener<Releasable> wrappedListener;
if (executorOnDelay != null) { if (executorOnDelay != null) {
wrappedListener = wrappedListener = ActionListener.delegateFailure(new ContextPreservingActionListener<>(contextSupplier, onAcquired),
new PermitAwareThreadedActionListener(threadPool, executorOnDelay, (l, r) -> threadPool.executor(executorOnDelay).execute(new ActionRunnable<Releasable>(l) {
new ContextPreservingActionListener<>(contextSupplier, onAcquired), forceExecution); @Override
public boolean isForceExecution() {
return forceExecution;
}
@Override
protected void doRun() {
listener.onResponse(r);
}
@Override
public void onRejection(Exception e) {
IOUtils.closeWhileHandlingException(r);
super.onRejection(e);
}
}));
} else { } else {
wrappedListener = new ContextPreservingActionListener<>(contextSupplier, onAcquired); wrappedListener = new ContextPreservingActionListener<>(contextSupplier, onAcquired);
} }
@ -337,57 +353,4 @@ final class IndexShardOperationPermits implements Closeable {
} }
} }
} }
/**
* A permit-aware action listener wrapper that spawns onResponse listener invocations off on a configurable thread-pool.
* Being permit-aware, it also releases the permit when hitting thread-pool rejections and falls back to the
* invoker's thread to communicate failures.
*/
private static class PermitAwareThreadedActionListener implements ActionListener<Releasable> {
private final ThreadPool threadPool;
private final String executor;
private final ActionListener<Releasable> listener;
private final boolean forceExecution;
private PermitAwareThreadedActionListener(ThreadPool threadPool, String executor, ActionListener<Releasable> listener,
boolean forceExecution) {
this.threadPool = threadPool;
this.executor = executor;
this.listener = listener;
this.forceExecution = forceExecution;
}
@Override
public void onResponse(final Releasable releasable) {
threadPool.executor(executor).execute(new AbstractRunnable() {
@Override
public boolean isForceExecution() {
return forceExecution;
}
@Override
protected void doRun() throws Exception {
listener.onResponse(releasable);
}
@Override
public void onRejection(Exception e) {
IOUtils.closeWhileHandlingException(releasable);
super.onRejection(e);
}
@Override
public void onFailure(Exception e) {
listener.onFailure(e); // will possibly execute on the caller thread
}
});
}
@Override
public void onFailure(final Exception e) {
listener.onFailure(e);
}
}
} }

View File

@ -23,6 +23,7 @@ import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRunnable;
import org.elasticsearch.action.admin.cluster.repositories.delete.DeleteRepositoryRequest; import org.elasticsearch.action.admin.cluster.repositories.delete.DeleteRepositoryRequest;
import org.elasticsearch.action.admin.cluster.repositories.put.PutRepositoryRequest; import org.elasticsearch.action.admin.cluster.repositories.put.PutRepositoryRequest;
import org.elasticsearch.cluster.AckedClusterStateUpdateTask; import org.elasticsearch.cluster.AckedClusterStateUpdateTask;
@ -233,46 +234,41 @@ public class RepositoriesService implements ClusterStateApplier {
public void verifyRepository(final String repositoryName, final ActionListener<List<DiscoveryNode>> listener) { public void verifyRepository(final String repositoryName, final ActionListener<List<DiscoveryNode>> listener) {
final Repository repository = repository(repositoryName); final Repository repository = repository(repositoryName);
try { threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(new ActionRunnable<List<DiscoveryNode>>(listener) {
threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(() -> { @Override
try { protected void doRun() {
final String verificationToken = repository.startVerification(); final String verificationToken = repository.startVerification();
if (verificationToken != null) { if (verificationToken != null) {
try { try {
verifyAction.verify(repositoryName, verificationToken, ActionListener.delegateFailure(listener, verifyAction.verify(repositoryName, verificationToken, ActionListener.delegateFailure(listener,
(delegatedListener, verifyResponse) -> threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(() -> { (delegatedListener, verifyResponse) -> threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(() -> {
try {
repository.endVerification(verificationToken);
} catch (Exception e) {
logger.warn(() -> new ParameterizedMessage(
"[{}] failed to finish repository verification", repositoryName), e);
delegatedListener.onFailure(e);
return;
}
delegatedListener.onResponse(verifyResponse);
})));
} catch (Exception e) {
threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(() -> {
try { try {
repository.endVerification(verificationToken); repository.endVerification(verificationToken);
} catch (Exception inner) { } catch (Exception e) {
inner.addSuppressed(e);
logger.warn(() -> new ParameterizedMessage( logger.warn(() -> new ParameterizedMessage(
"[{}] failed to finish repository verification", repositoryName), inner); "[{}] failed to finish repository verification", repositoryName), e);
delegatedListener.onFailure(e);
return;
} }
listener.onFailure(e); delegatedListener.onResponse(verifyResponse);
}); })));
} } catch (Exception e) {
} else { threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(() -> {
listener.onResponse(Collections.emptyList()); try {
repository.endVerification(verificationToken);
} catch (Exception inner) {
inner.addSuppressed(e);
logger.warn(() -> new ParameterizedMessage(
"[{}] failed to finish repository verification", repositoryName), inner);
}
listener.onFailure(e);
});
} }
} catch (Exception e) { } else {
listener.onFailure(e); listener.onResponse(Collections.emptyList());
} }
}); }
} catch (Exception e) { });
listener.onFailure(e);
}
} }

View File

@ -341,12 +341,7 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
} }
private <T> void runAsync(long id, Supplier<T> executable, ActionListener<T> listener) { private <T> void runAsync(long id, Supplier<T> executable, ActionListener<T> listener) {
getExecutor(id).execute(new ActionRunnable<T>(listener) { getExecutor(id).execute(ActionRunnable.wrap(listener, l -> l.onResponse(executable.get())));
@Override
protected void doRun() {
listener.onResponse(executable.get());
}
});
} }
private SearchPhaseResult executeQueryPhase(ShardSearchRequest request, SearchTask task) throws Exception { private SearchPhaseResult executeQueryPhase(ShardSearchRequest request, SearchTask task) throws Exception {
@ -1044,15 +1039,9 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
IndexShard shard = indicesService.indexServiceSafe(request.shardId().getIndex()).getShard(request.shardId().id()); IndexShard shard = indicesService.indexServiceSafe(request.shardId().getIndex()).getShard(request.shardId().id());
Executor executor = getExecutor(shard); Executor executor = getExecutor(shard);
ActionListener<Rewriteable> actionListener = ActionListener.wrap(r -> ActionListener<Rewriteable> actionListener = ActionListener.wrap(r ->
// now we need to check if there is a pending refresh and register // now we need to check if there is a pending refresh and register
shard.awaitShardSearchActive(b -> shard.awaitShardSearchActive(b -> executor.execute(ActionRunnable.wrap(listener, l -> l.onResponse(request)))),
executor.execute(new ActionRunnable<ShardSearchRequest>(listener) { listener::onFailure);
@Override
protected void doRun() {
listener.onResponse(request);
}
})
), listener::onFailure);
// we also do rewrite on the coordinating node (TransportSearchService) but we also need to do it here for BWC as well as // we also do rewrite on the coordinating node (TransportSearchService) but we also need to do it here for BWC as well as
// AliasFilters that might need to be rewritten. These are edge-cases but we are every efficient doing the rewrite here so it's not // AliasFilters that might need to be rewritten. These are edge-cases but we are every efficient doing the rewrite here so it's not
// adding a lot of overhead // adding a lot of overhead

View File

@ -1297,17 +1297,14 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
* @param repositoryStateId the unique id representing the state of the repository at the time the deletion began * @param repositoryStateId the unique id representing the state of the repository at the time the deletion began
*/ */
private void deleteSnapshotFromRepository(Snapshot snapshot, @Nullable ActionListener<Void> listener, long repositoryStateId) { private void deleteSnapshotFromRepository(Snapshot snapshot, @Nullable ActionListener<Void> listener, long repositoryStateId) {
threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(new ActionRunnable<Void>(listener) { threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(ActionRunnable.wrap(listener, l -> {
@Override Repository repository = repositoriesService.repository(snapshot.getRepository());
protected void doRun() { repository.deleteSnapshot(snapshot.getSnapshotId(), repositoryStateId, ActionListener.wrap(v -> {
Repository repository = repositoriesService.repository(snapshot.getRepository()); logger.info("snapshot [{}] deleted", snapshot);
repository.deleteSnapshot(snapshot.getSnapshotId(), repositoryStateId, ActionListener.wrap(v -> { removeSnapshotDeletionFromClusterState(snapshot, null, l);
logger.info("snapshot [{}] deleted", snapshot); }, ex -> removeSnapshotDeletionFromClusterState(snapshot, ex, l)
removeSnapshotDeletionFromClusterState(snapshot, null, listener); ));
}, ex -> removeSnapshotDeletionFromClusterState(snapshot, ex, listener) }));
));
}
});
} }
/** /**

View File

@ -30,7 +30,6 @@ import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.action.bulk.BackoffPolicy; import org.elasticsearch.action.bulk.BackoffPolicy;
import org.elasticsearch.action.index.IndexRequestBuilder; import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.client.Client; import org.elasticsearch.client.Client;
import org.elasticsearch.client.OriginSettingClient; import org.elasticsearch.client.OriginSettingClient;
import org.elasticsearch.client.Requests; import org.elasticsearch.client.Requests;
@ -134,18 +133,7 @@ public class TaskResultsService {
// The index already exists but doesn't have our mapping // The index already exists but doesn't have our mapping
client.admin().indices().preparePutMapping(TASK_INDEX).setType(TASK_TYPE) client.admin().indices().preparePutMapping(TASK_INDEX).setType(TASK_TYPE)
.setSource(taskResultIndexMapping(), XContentType.JSON) .setSource(taskResultIndexMapping(), XContentType.JSON)
.execute(new ActionListener<AcknowledgedResponse>() { .execute(ActionListener.delegateFailure(listener, (l, r) -> doStoreResult(taskResult, listener)));
@Override
public void onResponse(AcknowledgedResponse putMappingResponse) {
doStoreResult(taskResult, listener);
}
@Override
public void onFailure(Exception e) {
listener.onFailure(e);
}
}
);
} else { } else {
doStoreResult(taskResult, listener); doStoreResult(taskResult, listener);
} }