Pass ThreadContext to transport interceptors to allow header modification (#22618)

TransportInterceptors are commonly used to enrich requests with headers etc.
which requires access the the thread context. This is not always easily possible
since threadpools are hard to access for instance if the interceptor is used on a transport client.

This commit passes on the thread context to all the interceptors for further consumption.

Closes #22585
This commit is contained in:
Simon Willnauer 2017-01-15 13:35:39 +01:00 committed by GitHub
parent bed719de0a
commit 5f0344a918
6 changed files with 41 additions and 8 deletions

View File

@ -125,7 +125,8 @@ public final class NetworkModule {
for (Map.Entry<String, Supplier<Transport>> entry : httpTransportFactory.entrySet()) {
registerTransport(entry.getKey(), entry.getValue());
}
List<TransportInterceptor> transportInterceptors = plugin.getTransportInterceptors(namedWriteableRegistry);
List<TransportInterceptor> transportInterceptors = plugin.getTransportInterceptors(namedWriteableRegistry,
threadPool.getThreadContext());
for (TransportInterceptor interceptor : transportInterceptors) {
registerTransportInterceptor(interceptor);
}

View File

@ -27,6 +27,7 @@ import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.network.NetworkService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.http.HttpServerTransport;
import org.elasticsearch.indices.breaker.CircuitBreakerService;
@ -42,8 +43,13 @@ public interface NetworkPlugin {
/**
* Returns a list of {@link TransportInterceptor} instances that are used to intercept incoming and outgoing
* transport (inter-node) requests. This must not return <code>null</code>
*
* @param namedWriteableRegistry registry of all named writeables registered
* @param threadContext a {@link ThreadContext} of the current nodes or clients {@link ThreadPool} that can be used to set additional
* headers in the interceptors
*/
default List<TransportInterceptor> getTransportInterceptors(NamedWriteableRegistry namedWriteableRegistry) {
default List<TransportInterceptor> getTransportInterceptors(NamedWriteableRegistry namedWriteableRegistry,
ThreadContext threadContext) {
return Collections.emptyList();
}

View File

@ -80,6 +80,7 @@ import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.plugins.NetworkPlugin;
import org.elasticsearch.plugins.Plugin;
@ -743,7 +744,8 @@ public class IndicesRequestIT extends ESIntegTestCase {
public static class TestPlugin extends Plugin implements NetworkPlugin {
public final InterceptingTransportService instance = new InterceptingTransportService();
@Override
public List<TransportInterceptor> getTransportInterceptors(NamedWriteableRegistry namedWriteableRegistry) {
public List<TransportInterceptor> getTransportInterceptors(NamedWriteableRegistry namedWriteableRegistry,
ThreadContext threadContext) {
return Collections.singletonList(instance);
}
}

View File

@ -34,6 +34,7 @@ import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.env.Environment;
import org.elasticsearch.plugins.NetworkPlugin;
import org.elasticsearch.plugins.Plugin;
@ -128,7 +129,8 @@ public class TransportClientHeadersTests extends AbstractClientHeadersTestCase {
private InternalTransportServiceInterceptor instance = new InternalTransportServiceInterceptor();
@Override
public List<TransportInterceptor> getTransportInterceptors(NamedWriteableRegistry namedWriteableRegistry) {
public List<TransportInterceptor> getTransportInterceptors(NamedWriteableRegistry namedWriteableRegistry,
ThreadContext threadContext) {
return Collections.singletonList(new TransportInterceptor() {
@Override
public <T extends TransportRequest> TransportRequestHandler<T> interceptHandler(String action, String executor,

View File

@ -27,6 +27,7 @@ import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.BoundTransportAddress;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.http.HttpInfo;
import org.elasticsearch.http.HttpServerAdapter;
@ -37,6 +38,7 @@ import org.elasticsearch.plugins.NetworkPlugin;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.action.cat.AbstractCatAction;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.Transport;
import org.elasticsearch.transport.TransportInterceptor;
@ -47,9 +49,23 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
public class NetworkModuleTests extends ModuleTestCase {
private ThreadPool threadPool;
@Override
public void setUp() throws Exception {
super.setUp();
threadPool = new TestThreadPool(NetworkModuleTests.class.getName());
}
@Override
public void tearDown() throws Exception {
super.tearDown();
ThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS);
}
static class FakeHttpTransport extends AbstractLifecycleComponent implements HttpServerTransport {
public FakeHttpTransport() {
@ -233,7 +249,9 @@ public class NetworkModuleTests extends ModuleTestCase {
};
NetworkModule module = newNetworkModule(settings, false, new NetworkPlugin() {
@Override
public List<TransportInterceptor> getTransportInterceptors(NamedWriteableRegistry namedWriteableRegistry) {
public List<TransportInterceptor> getTransportInterceptors(NamedWriteableRegistry namedWriteableRegistry,
ThreadContext threadContext) {
assertNotNull(threadContext);
return Collections.singletonList(interceptor);
}
});
@ -246,7 +264,9 @@ public class NetworkModuleTests extends ModuleTestCase {
NullPointerException nullPointerException = expectThrows(NullPointerException.class, () -> {
newNetworkModule(settings, false, new NetworkPlugin() {
@Override
public List<TransportInterceptor> getTransportInterceptors(NamedWriteableRegistry namedWriteableRegistry) {
public List<TransportInterceptor> getTransportInterceptors(NamedWriteableRegistry namedWriteableRegistry,
ThreadContext threadContext) {
assertNotNull(threadContext);
return Collections.singletonList(null);
}
});
@ -256,6 +276,6 @@ public class NetworkModuleTests extends ModuleTestCase {
}
private NetworkModule newNetworkModule(Settings settings, boolean transportClient, NetworkPlugin... plugins) {
return new NetworkModule(settings, transportClient, Arrays.asList(plugins), null, null, null, null, xContentRegistry(), null);
return new NetworkModule(settings, transportClient, Arrays.asList(plugins), threadPool, null, null, null, xContentRegistry(), null);
}
}

View File

@ -22,6 +22,7 @@ import org.elasticsearch.Version;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.plugins.NetworkPlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.tasks.Task;
@ -51,7 +52,8 @@ public final class AssertingTransportInterceptor implements TransportInterceptor
}
@Override
public List<TransportInterceptor> getTransportInterceptors(NamedWriteableRegistry namedWriteableRegistry) {
public List<TransportInterceptor> getTransportInterceptors(NamedWriteableRegistry namedWriteableRegistry,
ThreadContext threadContext) {
return Collections.singletonList(new AssertingTransportInterceptor(settings, namedWriteableRegistry));
}
}