From 5f0344a9181d8efbad463082c04cec128ee4e4fd Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Sun, 15 Jan 2017 13:35:39 +0100 Subject: [PATCH] Pass ThreadContext to transport interceptors to allow header modification (#22618) TransportInterceptors are commonly used to enrich requests with headers etc. which requires access the the thread context. This is not always easily possible since threadpools are hard to access for instance if the interceptor is used on a transport client. This commit passes on the thread context to all the interceptors for further consumption. Closes #22585 --- .../common/network/NetworkModule.java | 3 ++- .../elasticsearch/plugins/NetworkPlugin.java | 8 +++++- .../action/IndicesRequestIT.java | 4 ++- .../TransportClientHeadersTests.java | 4 ++- .../common/network/NetworkModuleTests.java | 26 ++++++++++++++++--- .../AssertingTransportInterceptor.java | 4 ++- 6 files changed, 41 insertions(+), 8 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/common/network/NetworkModule.java b/core/src/main/java/org/elasticsearch/common/network/NetworkModule.java index e5d64f71b66..04f6b62dde1 100644 --- a/core/src/main/java/org/elasticsearch/common/network/NetworkModule.java +++ b/core/src/main/java/org/elasticsearch/common/network/NetworkModule.java @@ -125,7 +125,8 @@ public final class NetworkModule { for (Map.Entry> entry : httpTransportFactory.entrySet()) { registerTransport(entry.getKey(), entry.getValue()); } - List transportInterceptors = plugin.getTransportInterceptors(namedWriteableRegistry); + List transportInterceptors = plugin.getTransportInterceptors(namedWriteableRegistry, + threadPool.getThreadContext()); for (TransportInterceptor interceptor : transportInterceptors) { registerTransportInterceptor(interceptor); } diff --git a/core/src/main/java/org/elasticsearch/plugins/NetworkPlugin.java b/core/src/main/java/org/elasticsearch/plugins/NetworkPlugin.java index 991a21f1b32..ceb7e077e11 100644 --- a/core/src/main/java/org/elasticsearch/plugins/NetworkPlugin.java +++ b/core/src/main/java/org/elasticsearch/plugins/NetworkPlugin.java @@ -27,6 +27,7 @@ import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.network.NetworkService; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.BigArrays; +import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.http.HttpServerTransport; import org.elasticsearch.indices.breaker.CircuitBreakerService; @@ -42,8 +43,13 @@ public interface NetworkPlugin { /** * Returns a list of {@link TransportInterceptor} instances that are used to intercept incoming and outgoing * transport (inter-node) requests. This must not return null + * + * @param namedWriteableRegistry registry of all named writeables registered + * @param threadContext a {@link ThreadContext} of the current nodes or clients {@link ThreadPool} that can be used to set additional + * headers in the interceptors */ - default List getTransportInterceptors(NamedWriteableRegistry namedWriteableRegistry) { + default List getTransportInterceptors(NamedWriteableRegistry namedWriteableRegistry, + ThreadContext threadContext) { return Collections.emptyList(); } diff --git a/core/src/test/java/org/elasticsearch/action/IndicesRequestIT.java b/core/src/test/java/org/elasticsearch/action/IndicesRequestIT.java index 6a1ac54c4f6..86a9f632cac 100644 --- a/core/src/test/java/org/elasticsearch/action/IndicesRequestIT.java +++ b/core/src/test/java/org/elasticsearch/action/IndicesRequestIT.java @@ -80,6 +80,7 @@ import org.elasticsearch.action.update.UpdateResponse; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.plugins.NetworkPlugin; import org.elasticsearch.plugins.Plugin; @@ -743,7 +744,8 @@ public class IndicesRequestIT extends ESIntegTestCase { public static class TestPlugin extends Plugin implements NetworkPlugin { public final InterceptingTransportService instance = new InterceptingTransportService(); @Override - public List getTransportInterceptors(NamedWriteableRegistry namedWriteableRegistry) { + public List getTransportInterceptors(NamedWriteableRegistry namedWriteableRegistry, + ThreadContext threadContext) { return Collections.singletonList(instance); } } diff --git a/core/src/test/java/org/elasticsearch/client/transport/TransportClientHeadersTests.java b/core/src/test/java/org/elasticsearch/client/transport/TransportClientHeadersTests.java index b301272a9ab..74b6e08fc6f 100644 --- a/core/src/test/java/org/elasticsearch/client/transport/TransportClientHeadersTests.java +++ b/core/src/test/java/org/elasticsearch/client/transport/TransportClientHeadersTests.java @@ -34,6 +34,7 @@ import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.TransportAddress; +import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.env.Environment; import org.elasticsearch.plugins.NetworkPlugin; import org.elasticsearch.plugins.Plugin; @@ -128,7 +129,8 @@ public class TransportClientHeadersTests extends AbstractClientHeadersTestCase { private InternalTransportServiceInterceptor instance = new InternalTransportServiceInterceptor(); @Override - public List getTransportInterceptors(NamedWriteableRegistry namedWriteableRegistry) { + public List getTransportInterceptors(NamedWriteableRegistry namedWriteableRegistry, + ThreadContext threadContext) { return Collections.singletonList(new TransportInterceptor() { @Override public TransportRequestHandler interceptHandler(String action, String executor, diff --git a/core/src/test/java/org/elasticsearch/common/network/NetworkModuleTests.java b/core/src/test/java/org/elasticsearch/common/network/NetworkModuleTests.java index b1ec686c395..11799c99cb1 100644 --- a/core/src/test/java/org/elasticsearch/common/network/NetworkModuleTests.java +++ b/core/src/test/java/org/elasticsearch/common/network/NetworkModuleTests.java @@ -27,6 +27,7 @@ import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.BoundTransportAddress; import org.elasticsearch.common.util.BigArrays; +import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.http.HttpInfo; import org.elasticsearch.http.HttpServerAdapter; @@ -37,6 +38,7 @@ import org.elasticsearch.plugins.NetworkPlugin; import org.elasticsearch.rest.BaseRestHandler; import org.elasticsearch.rest.RestRequest; import org.elasticsearch.rest.action.cat.AbstractCatAction; +import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.Transport; import org.elasticsearch.transport.TransportInterceptor; @@ -47,9 +49,23 @@ import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.TimeUnit; import java.util.function.Supplier; public class NetworkModuleTests extends ModuleTestCase { + private ThreadPool threadPool; + + @Override + public void setUp() throws Exception { + super.setUp(); + threadPool = new TestThreadPool(NetworkModuleTests.class.getName()); + } + + @Override + public void tearDown() throws Exception { + super.tearDown(); + ThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS); + } static class FakeHttpTransport extends AbstractLifecycleComponent implements HttpServerTransport { public FakeHttpTransport() { @@ -233,7 +249,9 @@ public class NetworkModuleTests extends ModuleTestCase { }; NetworkModule module = newNetworkModule(settings, false, new NetworkPlugin() { @Override - public List getTransportInterceptors(NamedWriteableRegistry namedWriteableRegistry) { + public List getTransportInterceptors(NamedWriteableRegistry namedWriteableRegistry, + ThreadContext threadContext) { + assertNotNull(threadContext); return Collections.singletonList(interceptor); } }); @@ -246,7 +264,9 @@ public class NetworkModuleTests extends ModuleTestCase { NullPointerException nullPointerException = expectThrows(NullPointerException.class, () -> { newNetworkModule(settings, false, new NetworkPlugin() { @Override - public List getTransportInterceptors(NamedWriteableRegistry namedWriteableRegistry) { + public List getTransportInterceptors(NamedWriteableRegistry namedWriteableRegistry, + ThreadContext threadContext) { + assertNotNull(threadContext); return Collections.singletonList(null); } }); @@ -256,6 +276,6 @@ public class NetworkModuleTests extends ModuleTestCase { } private NetworkModule newNetworkModule(Settings settings, boolean transportClient, NetworkPlugin... plugins) { - return new NetworkModule(settings, transportClient, Arrays.asList(plugins), null, null, null, null, xContentRegistry(), null); + return new NetworkModule(settings, transportClient, Arrays.asList(plugins), threadPool, null, null, null, xContentRegistry(), null); } } diff --git a/test/framework/src/main/java/org/elasticsearch/transport/AssertingTransportInterceptor.java b/test/framework/src/main/java/org/elasticsearch/transport/AssertingTransportInterceptor.java index 0c6c18e1a20..55c484d1aa2 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/AssertingTransportInterceptor.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/AssertingTransportInterceptor.java @@ -22,6 +22,7 @@ import org.elasticsearch.Version; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.io.stream.Streamable; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.plugins.NetworkPlugin; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.tasks.Task; @@ -51,7 +52,8 @@ public final class AssertingTransportInterceptor implements TransportInterceptor } @Override - public List getTransportInterceptors(NamedWriteableRegistry namedWriteableRegistry) { + public List getTransportInterceptors(NamedWriteableRegistry namedWriteableRegistry, + ThreadContext threadContext) { return Collections.singletonList(new AssertingTransportInterceptor(settings, namedWriteableRegistry)); } }