shield: also restore original context to transport handlers

See elastic/elasticsearch#1380

Original commit: elastic/x-pack-elasticsearch@afbd964f18
This commit is contained in:
jaymode 2016-01-28 10:54:45 -05:00
parent 9c080681d8
commit 75894e6b38
1 changed files with 48 additions and 15 deletions

View File

@ -78,27 +78,26 @@ public class ShieldServerTransportService extends TransportService {
@Override
public <T extends TransportResponse> void sendRequest(DiscoveryNode node, String action, TransportRequest request, TransportRequestOptions options, TransportResponseHandler<T> handler) {
try (ThreadContext.StoredContext original = threadPool.getThreadContext().newStoredContext()) {
// FIXME this is really just a hack. What happens is that we send a request and we always copy headers over
// Sometimes a system action gets executed like a internal create index request or update mappings request
// which means that the user is copied over to system actions and these really fail for internal things...
if ((clientFilter instanceof ClientTransportFilter.Node) && INTERNAL_PREDICATE.test(action)) {
try (ThreadContext.StoredContext ctx = threadPool.getThreadContext().stashContext()) {
try {
clientFilter.outbound(action, request);
super.sendRequest(node, action, request, options, handler);
} catch (Throwable t) {
handler.handleException(new TransportException("failed sending request", t));
}
}
} else {
final ThreadContext.StoredContext original = threadPool.getThreadContext().newStoredContext();
// FIXME this is really just a hack. What happens is that we send a request and we always copy headers over
// Sometimes a system action gets executed like a internal create index request or update mappings request
// which means that the user is copied over to system actions and these really fail for internal things...
if ((clientFilter instanceof ClientTransportFilter.Node) && INTERNAL_PREDICATE.test(action)) {
try (ThreadContext.StoredContext ctx = threadPool.getThreadContext().stashContext()) {
try {
clientFilter.outbound(action, request);
super.sendRequest(node, action, request, options, handler);
super.sendRequest(node, action, request, options, new ContextRestoreResponseHandler<>(original, handler));
} catch (Throwable t) {
handler.handleException(new TransportException("failed sending request", t));
}
}
} else {
try {
clientFilter.outbound(action, request);
super.sendRequest(node, action, request, options, new ContextRestoreResponseHandler<>(original, handler));
} catch (Throwable t) {
handler.handleException(new TransportException("failed sending request", t));
}
}
}
@ -204,4 +203,38 @@ public class ShieldServerTransportService extends TransportService {
}
}
/**
* This handler wrapper ensures that the response thread executes with the correct thread context. Before any of the4 handle methods
* are invoked we restore the context.
*/
private final static class ContextRestoreResponseHandler<T extends TransportResponse> implements TransportResponseHandler<T> {
private final TransportResponseHandler<T> delegate;
private final ThreadContext.StoredContext threadContext;
private ContextRestoreResponseHandler(ThreadContext.StoredContext threadContext, TransportResponseHandler<T> delegate) {
this.delegate = delegate;
this.threadContext = threadContext;
}
@Override
public T newInstance() {
return delegate.newInstance();
}
@Override
public void handleResponse(T response) {
threadContext.restore();
delegate.handleResponse(response);
}
@Override
public void handleException(TransportException exp) {
threadContext.restore();
delegate.handleException(exp);
}
@Override
public String executor() {
return delegate.executor();
}
}
}