diff --git a/elasticsearch/x-pack/shield/src/main/java/org/elasticsearch/shield/transport/ShieldServerTransportService.java b/elasticsearch/x-pack/shield/src/main/java/org/elasticsearch/shield/transport/ShieldServerTransportService.java index eb2ee3b8c3c..bba780e5293 100644 --- a/elasticsearch/x-pack/shield/src/main/java/org/elasticsearch/shield/transport/ShieldServerTransportService.java +++ b/elasticsearch/x-pack/shield/src/main/java/org/elasticsearch/shield/transport/ShieldServerTransportService.java @@ -78,27 +78,26 @@ public class ShieldServerTransportService extends TransportService { @Override public void sendRequest(DiscoveryNode node, String action, TransportRequest request, TransportRequestOptions options, TransportResponseHandler handler) { - try (ThreadContext.StoredContext original = threadPool.getThreadContext().newStoredContext()) { - // FIXME this is really just a hack. What happens is that we send a request and we always copy headers over - // Sometimes a system action gets executed like a internal create index request or update mappings request - // which means that the user is copied over to system actions and these really fail for internal things... - if ((clientFilter instanceof ClientTransportFilter.Node) && INTERNAL_PREDICATE.test(action)) { - try (ThreadContext.StoredContext ctx = threadPool.getThreadContext().stashContext()) { - try { - clientFilter.outbound(action, request); - super.sendRequest(node, action, request, options, handler); - } catch (Throwable t) { - handler.handleException(new TransportException("failed sending request", t)); - } - } - } else { + final ThreadContext.StoredContext original = threadPool.getThreadContext().newStoredContext(); + // FIXME this is really just a hack. What happens is that we send a request and we always copy headers over + // Sometimes a system action gets executed like a internal create index request or update mappings request + // which means that the user is copied over to system actions and these really fail for internal things... + if ((clientFilter instanceof ClientTransportFilter.Node) && INTERNAL_PREDICATE.test(action)) { + try (ThreadContext.StoredContext ctx = threadPool.getThreadContext().stashContext()) { try { clientFilter.outbound(action, request); - super.sendRequest(node, action, request, options, handler); + super.sendRequest(node, action, request, options, new ContextRestoreResponseHandler<>(original, handler)); } catch (Throwable t) { handler.handleException(new TransportException("failed sending request", t)); } } + } else { + try { + clientFilter.outbound(action, request); + super.sendRequest(node, action, request, options, new ContextRestoreResponseHandler<>(original, handler)); + } catch (Throwable t) { + handler.handleException(new TransportException("failed sending request", t)); + } } } @@ -204,4 +203,38 @@ public class ShieldServerTransportService extends TransportService { } } + /** + * This handler wrapper ensures that the response thread executes with the correct thread context. Before any of the4 handle methods + * are invoked we restore the context. + */ + private final static class ContextRestoreResponseHandler implements TransportResponseHandler { + private final TransportResponseHandler delegate; + private final ThreadContext.StoredContext threadContext; + private ContextRestoreResponseHandler(ThreadContext.StoredContext threadContext, TransportResponseHandler delegate) { + this.delegate = delegate; + this.threadContext = threadContext; + } + + @Override + public T newInstance() { + return delegate.newInstance(); + } + + @Override + public void handleResponse(T response) { + threadContext.restore(); + delegate.handleResponse(response); + } + + @Override + public void handleException(TransportException exp) { + threadContext.restore(); + delegate.handleException(exp); + } + + @Override + public String executor() { + return delegate.executor(); + } + } }