Pass executor name to request interceptor to support async intercept calls (#21089)

Today the request interceptor can't support async calls since the response
of the async call would execute on a different thread ie. a client or listener
thread. This means in-turn that the intercepted handler is not executed with the
thread it was supposed to run and therefor can, if it's executing blocking
operations, potentially deadlock an entire server.
This commit is contained in:
Simon Willnauer 2016-10-24 13:57:07 +02:00 committed by GitHub
parent 127b4a8efc
commit 0a410d3916
8 changed files with 13 additions and 24 deletions

View File

@ -297,7 +297,6 @@
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]common[/\\]lucene[/\\]store[/\\]ByteArrayIndexInput.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]common[/\\]lucene[/\\]uid[/\\]Versions.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]common[/\\]network[/\\]Cidrs.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]common[/\\]network[/\\]NetworkModule.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]common[/\\]network[/\\]NetworkService.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]common[/\\]recycler[/\\]Recyclers.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]common[/\\]util[/\\]BigArrays.java" checks="LineLength" />

View File

@ -162,8 +162,8 @@ public final class NetworkModule {
* @param commandName the names under which the command should be parsed. The {@link ParseField#getPreferredName()} is special because
* it is the name under which the command's reader is registered.
*/
private static <T extends AllocationCommand> void registerAllocationCommand(Writeable.Reader<T> reader, AllocationCommand.Parser<T> parser,
ParseField commandName) {
private static <T extends AllocationCommand> void registerAllocationCommand(Writeable.Reader<T> reader,
AllocationCommand.Parser<T> parser, ParseField commandName) {
allocationCommandRegistry.register(parser, commandName);
namedWriteables.add(new Entry(AllocationCommand.class, commandName.getPreferredName(), reader));
}
@ -234,9 +234,10 @@ public final class NetworkModule {
}
@Override
public <T extends TransportRequest> TransportRequestHandler<T> interceptHandler(String action, TransportRequestHandler<T> actualHandler) {
public <T extends TransportRequest> TransportRequestHandler<T> interceptHandler(String action, String executor,
TransportRequestHandler<T> actualHandler) {
for (TransportInterceptor interceptor : this.transportInterceptors) {
actualHandler = interceptor.interceptHandler(action, actualHandler);
actualHandler = interceptor.interceptHandler(action, executor, actualHandler);
}
return actualHandler;
}

View File

@ -33,7 +33,7 @@ public interface TransportInterceptor {
* {@link TransportService#registerRequestHandler(String, Supplier, String, TransportRequestHandler)}. The returned handler is
* used instead of the passed in handler. By default the provided handler is returned.
*/
default <T extends TransportRequest> TransportRequestHandler<T> interceptHandler(String action,
default <T extends TransportRequest> TransportRequestHandler<T> interceptHandler(String action, String executor,
TransportRequestHandler<T> actualHandler) {
return actualHandler;
}

View File

@ -613,7 +613,7 @@ public class TransportService extends AbstractLifecycleComponent {
*/
public final <Request extends TransportRequest> void registerRequestHandler(String action, Supplier<Request> requestFactory,
String executor, TransportRequestHandler<Request> handler) {
handler = interceptor.interceptHandler(action, handler);
handler = interceptor.interceptHandler(action, executor, handler);
RequestHandlerRegistry<Request> reg = new RequestHandlerRegistry<>(
action, requestFactory, taskManager, handler, executor, false, true);
registerRequestHandler(reg);
@ -633,7 +633,7 @@ public class TransportService extends AbstractLifecycleComponent {
String executor, boolean forceExecution,
boolean canTripCircuitBreaker,
TransportRequestHandler<Request> handler) {
handler = interceptor.interceptHandler(action, handler);
handler = interceptor.interceptHandler(action, executor, handler);
RequestHandlerRegistry<Request> reg = new RequestHandlerRegistry<>(
action, request, taskManager, handler, executor, forceExecution, canTripCircuitBreaker);
registerRequestHandler(reg);

View File

@ -753,7 +753,7 @@ public class IndicesRequestIT extends ESIntegTestCase {
private final Map<String, List<TransportRequest>> requests = new HashMap<>();
@Override
public <T extends TransportRequest> TransportRequestHandler<T> interceptHandler(String action,
public <T extends TransportRequest> TransportRequestHandler<T> interceptHandler(String action, String executor,
TransportRequestHandler<T> actualHandler) {
return new InterceptingRequestHandler<>(action, actualHandler);
}

View File

@ -268,15 +268,4 @@ public class IngestProxyActionFilterTests extends ESTestCase {
assertTrue(run.get());
}
private static class IngestNodeMatcher extends CustomTypeSafeMatcher<DiscoveryNode> {
private IngestNodeMatcher() {
super("discovery node should be an ingest node");
}
@Override
protected boolean matchesSafely(DiscoveryNode node) {
return node.isIngestNode();
}
}
}

View File

@ -130,9 +130,9 @@ public class TransportClientHeadersTests extends AbstractClientHeadersTestCase {
public List<TransportInterceptor> getTransportInterceptors(NamedWriteableRegistry namedWriteableRegistry) {
return Collections.singletonList(new TransportInterceptor() {
@Override
public <T extends TransportRequest> TransportRequestHandler<T> interceptHandler(String action,
TransportRequestHandler<T> actualHandler) {
return instance.interceptHandler(action, actualHandler);
public <T extends TransportRequest> TransportRequestHandler<T> interceptHandler(String action, String executor,
TransportRequestHandler<T> actualHandler) {
return instance.interceptHandler(action, executor, actualHandler);
}
@Override

View File

@ -64,7 +64,7 @@ public final class AssertingTransportInterceptor implements TransportInterceptor
}
@Override
public <T extends TransportRequest> TransportRequestHandler<T> interceptHandler(String action,
public <T extends TransportRequest> TransportRequestHandler<T> interceptHandler(String action, String executor,
TransportRequestHandler<T> actualHandler) {
return new TransportRequestHandler<T>() {