From 0a410d3916b33440db1c4d9189cbf5b855c233d5 Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Mon, 24 Oct 2016 13:57:07 +0200 Subject: [PATCH] Pass executor name to request interceptor to support async intercept calls (#21089) Today the request interceptor can't support async calls since the response of the async call would execute on a different thread ie. a client or listener thread. This means in-turn that the intercepted handler is not executed with the thread it was supposed to run and therefor can, if it's executing blocking operations, potentially deadlock an entire server. --- .../src/main/resources/checkstyle_suppressions.xml | 1 - .../elasticsearch/common/network/NetworkModule.java | 9 +++++---- .../elasticsearch/transport/TransportInterceptor.java | 2 +- .../org/elasticsearch/transport/TransportService.java | 4 ++-- .../org/elasticsearch/action/IndicesRequestIT.java | 2 +- .../action/ingest/IngestProxyActionFilterTests.java | 11 ----------- .../client/transport/TransportClientHeadersTests.java | 6 +++--- .../transport/AssertingTransportInterceptor.java | 2 +- 8 files changed, 13 insertions(+), 24 deletions(-) diff --git a/buildSrc/src/main/resources/checkstyle_suppressions.xml b/buildSrc/src/main/resources/checkstyle_suppressions.xml index 6acc73fc91a..040047b6b33 100644 --- a/buildSrc/src/main/resources/checkstyle_suppressions.xml +++ b/buildSrc/src/main/resources/checkstyle_suppressions.xml @@ -297,7 +297,6 @@ - diff --git a/core/src/main/java/org/elasticsearch/common/network/NetworkModule.java b/core/src/main/java/org/elasticsearch/common/network/NetworkModule.java index 5c8f3ec65e8..530ecefd4cf 100644 --- a/core/src/main/java/org/elasticsearch/common/network/NetworkModule.java +++ b/core/src/main/java/org/elasticsearch/common/network/NetworkModule.java @@ -162,8 +162,8 @@ public final class NetworkModule { * @param commandName the names under which the command should be parsed. The {@link ParseField#getPreferredName()} is special because * it is the name under which the command's reader is registered. */ - private static void registerAllocationCommand(Writeable.Reader reader, AllocationCommand.Parser parser, - ParseField commandName) { + private static void registerAllocationCommand(Writeable.Reader reader, + AllocationCommand.Parser parser, ParseField commandName) { allocationCommandRegistry.register(parser, commandName); namedWriteables.add(new Entry(AllocationCommand.class, commandName.getPreferredName(), reader)); } @@ -234,9 +234,10 @@ public final class NetworkModule { } @Override - public TransportRequestHandler interceptHandler(String action, TransportRequestHandler actualHandler) { + public TransportRequestHandler interceptHandler(String action, String executor, + TransportRequestHandler actualHandler) { for (TransportInterceptor interceptor : this.transportInterceptors) { - actualHandler = interceptor.interceptHandler(action, actualHandler); + actualHandler = interceptor.interceptHandler(action, executor, actualHandler); } return actualHandler; } diff --git a/core/src/main/java/org/elasticsearch/transport/TransportInterceptor.java b/core/src/main/java/org/elasticsearch/transport/TransportInterceptor.java index d8072a81ba6..7b478ce48e6 100644 --- a/core/src/main/java/org/elasticsearch/transport/TransportInterceptor.java +++ b/core/src/main/java/org/elasticsearch/transport/TransportInterceptor.java @@ -33,7 +33,7 @@ public interface TransportInterceptor { * {@link TransportService#registerRequestHandler(String, Supplier, String, TransportRequestHandler)}. The returned handler is * used instead of the passed in handler. By default the provided handler is returned. */ - default TransportRequestHandler interceptHandler(String action, + default TransportRequestHandler interceptHandler(String action, String executor, TransportRequestHandler actualHandler) { return actualHandler; } diff --git a/core/src/main/java/org/elasticsearch/transport/TransportService.java b/core/src/main/java/org/elasticsearch/transport/TransportService.java index c4a96bc6d1d..f0aa9912084 100644 --- a/core/src/main/java/org/elasticsearch/transport/TransportService.java +++ b/core/src/main/java/org/elasticsearch/transport/TransportService.java @@ -613,7 +613,7 @@ public class TransportService extends AbstractLifecycleComponent { */ public final void registerRequestHandler(String action, Supplier requestFactory, String executor, TransportRequestHandler handler) { - handler = interceptor.interceptHandler(action, handler); + handler = interceptor.interceptHandler(action, executor, handler); RequestHandlerRegistry reg = new RequestHandlerRegistry<>( action, requestFactory, taskManager, handler, executor, false, true); registerRequestHandler(reg); @@ -633,7 +633,7 @@ public class TransportService extends AbstractLifecycleComponent { String executor, boolean forceExecution, boolean canTripCircuitBreaker, TransportRequestHandler handler) { - handler = interceptor.interceptHandler(action, handler); + handler = interceptor.interceptHandler(action, executor, handler); RequestHandlerRegistry reg = new RequestHandlerRegistry<>( action, request, taskManager, handler, executor, forceExecution, canTripCircuitBreaker); registerRequestHandler(reg); diff --git a/core/src/test/java/org/elasticsearch/action/IndicesRequestIT.java b/core/src/test/java/org/elasticsearch/action/IndicesRequestIT.java index 92d1768db13..eee873b86a3 100644 --- a/core/src/test/java/org/elasticsearch/action/IndicesRequestIT.java +++ b/core/src/test/java/org/elasticsearch/action/IndicesRequestIT.java @@ -753,7 +753,7 @@ public class IndicesRequestIT extends ESIntegTestCase { private final Map> requests = new HashMap<>(); @Override - public TransportRequestHandler interceptHandler(String action, + public TransportRequestHandler interceptHandler(String action, String executor, TransportRequestHandler actualHandler) { return new InterceptingRequestHandler<>(action, actualHandler); } diff --git a/core/src/test/java/org/elasticsearch/action/ingest/IngestProxyActionFilterTests.java b/core/src/test/java/org/elasticsearch/action/ingest/IngestProxyActionFilterTests.java index 5190f7e12ce..85240c4a8e6 100644 --- a/core/src/test/java/org/elasticsearch/action/ingest/IngestProxyActionFilterTests.java +++ b/core/src/test/java/org/elasticsearch/action/ingest/IngestProxyActionFilterTests.java @@ -268,15 +268,4 @@ public class IngestProxyActionFilterTests extends ESTestCase { assertTrue(run.get()); } - - private static class IngestNodeMatcher extends CustomTypeSafeMatcher { - private IngestNodeMatcher() { - super("discovery node should be an ingest node"); - } - - @Override - protected boolean matchesSafely(DiscoveryNode node) { - return node.isIngestNode(); - } - } } diff --git a/core/src/test/java/org/elasticsearch/client/transport/TransportClientHeadersTests.java b/core/src/test/java/org/elasticsearch/client/transport/TransportClientHeadersTests.java index 56f78f0c8ea..d25ae28cf28 100644 --- a/core/src/test/java/org/elasticsearch/client/transport/TransportClientHeadersTests.java +++ b/core/src/test/java/org/elasticsearch/client/transport/TransportClientHeadersTests.java @@ -130,9 +130,9 @@ public class TransportClientHeadersTests extends AbstractClientHeadersTestCase { public List getTransportInterceptors(NamedWriteableRegistry namedWriteableRegistry) { return Collections.singletonList(new TransportInterceptor() { @Override - public TransportRequestHandler interceptHandler(String action, - TransportRequestHandler actualHandler) { - return instance.interceptHandler(action, actualHandler); + public TransportRequestHandler interceptHandler(String action, String executor, + TransportRequestHandler actualHandler) { + return instance.interceptHandler(action, executor, actualHandler); } @Override diff --git a/test/framework/src/main/java/org/elasticsearch/transport/AssertingTransportInterceptor.java b/test/framework/src/main/java/org/elasticsearch/transport/AssertingTransportInterceptor.java index ee29864ba03..37ebebc64a6 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/AssertingTransportInterceptor.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/AssertingTransportInterceptor.java @@ -64,7 +64,7 @@ public final class AssertingTransportInterceptor implements TransportInterceptor } @Override - public TransportRequestHandler interceptHandler(String action, + public TransportRequestHandler interceptHandler(String action, String executor, TransportRequestHandler actualHandler) { return new TransportRequestHandler() {