Pass forceExecution flag to transport interceptor (#22739)

To effectively allow a plugin to intercept a transport handler it needs
to know if the handler must be executed even if there is a rejection on the
thread pool in the case the wrapper forks a thread to execute the actual handler.
This commit is contained in:
Simon Willnauer 2017-01-23 11:04:27 +01:00 committed by GitHub
parent 6159ca28ae
commit 27b5c2ad54
7 changed files with 30 additions and 7 deletions

View File

@ -38,7 +38,6 @@ import org.elasticsearch.common.xcontent.NamedXContentRegistry.FromXContent;
import org.elasticsearch.http.HttpServerTransport;
import org.elasticsearch.indices.breaker.CircuitBreakerService;
import org.elasticsearch.plugins.NetworkPlugin;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.tasks.RawTaskStatus;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
@ -236,9 +235,10 @@ public final class NetworkModule {
@Override
public <T extends TransportRequest> TransportRequestHandler<T> interceptHandler(String action, String executor,
boolean forceExecution,
TransportRequestHandler<T> actualHandler) {
for (TransportInterceptor interceptor : this.transportInterceptors) {
actualHandler = interceptor.interceptHandler(action, executor, actualHandler);
actualHandler = interceptor.interceptHandler(action, executor, forceExecution, actualHandler);
}
return actualHandler;
}

View File

@ -34,6 +34,7 @@ public interface TransportInterceptor {
* used instead of the passed in handler. By default the provided handler is returned.
*/
default <T extends TransportRequest> TransportRequestHandler<T> interceptHandler(String action, String executor,
boolean forceExecution,
TransportRequestHandler<T> actualHandler) {
return actualHandler;
}

View File

@ -671,7 +671,7 @@ public class TransportService extends AbstractLifecycleComponent {
*/
public <Request extends TransportRequest> void registerRequestHandler(String action, Supplier<Request> requestFactory,
String executor, TransportRequestHandler<Request> handler) {
handler = interceptor.interceptHandler(action, executor, handler);
handler = interceptor.interceptHandler(action, executor, false, handler);
RequestHandlerRegistry<Request> reg = new RequestHandlerRegistry<>(
action, requestFactory, taskManager, handler, executor, false, true);
registerRequestHandler(reg);
@ -691,7 +691,7 @@ public class TransportService extends AbstractLifecycleComponent {
String executor, boolean forceExecution,
boolean canTripCircuitBreaker,
TransportRequestHandler<Request> handler) {
handler = interceptor.interceptHandler(action, executor, handler);
handler = interceptor.interceptHandler(action, executor, forceExecution, handler);
RequestHandlerRegistry<Request> reg = new RequestHandlerRegistry<>(
action, request, taskManager, handler, executor, forceExecution, canTripCircuitBreaker);
registerRequestHandler(reg);

View File

@ -56,7 +56,6 @@ import org.elasticsearch.action.admin.indices.validate.query.ValidateQueryAction
import org.elasticsearch.action.admin.indices.validate.query.ValidateQueryRequest;
import org.elasticsearch.action.bulk.BulkAction;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.delete.DeleteAction;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.explain.ExplainAction;
import org.elasticsearch.action.explain.ExplainRequest;
@ -64,7 +63,6 @@ import org.elasticsearch.action.get.GetAction;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.MultiGetAction;
import org.elasticsearch.action.get.MultiGetRequest;
import org.elasticsearch.action.index.IndexAction;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
@ -756,6 +754,7 @@ public class IndicesRequestIT extends ESIntegTestCase {
@Override
public <T extends TransportRequest> TransportRequestHandler<T> interceptHandler(String action, String executor,
boolean forceExecution,
TransportRequestHandler<T> actualHandler) {
return new InterceptingRequestHandler<>(action, actualHandler);
}

View File

@ -134,8 +134,9 @@ public class TransportClientHeadersTests extends AbstractClientHeadersTestCase {
return Collections.singletonList(new TransportInterceptor() {
@Override
public <T extends TransportRequest> TransportRequestHandler<T> interceptHandler(String action, String executor,
boolean forceExecution,
TransportRequestHandler<T> actualHandler) {
return instance.interceptHandler(action, executor, actualHandler);
return instance.interceptHandler(action, executor, forceExecution, actualHandler);
}
@Override

View File

@ -41,6 +41,8 @@ import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.Transport;
import org.elasticsearch.transport.TransportInterceptor;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.TransportRequestHandler;
import java.io.IOException;
import java.util.Arrays;
@ -49,6 +51,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
public class NetworkModuleTests extends ModuleTestCase {
@ -244,8 +247,21 @@ public class NetworkModuleTests extends ModuleTestCase {
Settings settings = Settings.builder()
.put(NetworkModule.HTTP_ENABLED.getKey(), false)
.put(NetworkModule.TRANSPORT_TYPE_KEY, "local").build();
AtomicInteger called = new AtomicInteger(0);
TransportInterceptor interceptor = new TransportInterceptor() {
@Override
public <T extends TransportRequest> TransportRequestHandler<T> interceptHandler(String action, String executor,
boolean forceExecution,
TransportRequestHandler<T> actualHandler) {
called.incrementAndGet();
if ("foo/bar/boom".equals(action)) {
assertTrue(forceExecution);
} else {
assertFalse(forceExecution);
}
return actualHandler;
}
};
NetworkModule module = newNetworkModule(settings, false, new NetworkPlugin() {
@Override
@ -257,6 +273,11 @@ public class NetworkModuleTests extends ModuleTestCase {
});
TransportInterceptor transportInterceptor = module.getTransportInterceptor();
assertEquals(0, called.get());
transportInterceptor.interceptHandler("foo/bar/boom", null, true, null);
assertEquals(1, called.get());
transportInterceptor.interceptHandler("foo/baz/boom", null, false, null);
assertEquals(2, called.get());
assertTrue(transportInterceptor instanceof NetworkModule.CompositeTransportInterceptor);
assertEquals(((NetworkModule.CompositeTransportInterceptor)transportInterceptor).transportInterceptors.size(), 1);
assertSame(((NetworkModule.CompositeTransportInterceptor)transportInterceptor).transportInterceptors.get(0), interceptor);

View File

@ -66,6 +66,7 @@ public final class AssertingTransportInterceptor implements TransportInterceptor
@Override
public <T extends TransportRequest> TransportRequestHandler<T> interceptHandler(String action, String executor,
boolean forceExecution,
TransportRequestHandler<T> actualHandler) {
return new TransportRequestHandler<T>() {