From 6ab4b6b0ac54f4adcd1de1909c0db2f9489142aa Mon Sep 17 00:00:00 2001 From: Tal Levy Date: Tue, 22 Aug 2017 15:47:05 -0700 Subject: [PATCH] revamp TransportRequest handlers to support Writeable (#26315) This PR begins the long journey to deprecating Streamable. The idea here is to add additional method signatures that support Writeable.Reader, so that the work to migrate objects TransportMessage to implement Writeable and not Streamable. One example conversion is done in this PR: SimulatePipelineRequest. --- .../elasticsearch/action/ActionRequest.java | 4 ++ .../ingest/SimulatePipelineRequest.java | 23 ++++++---- .../SimulatePipelineTransportAction.java | 2 +- .../support/HandledTransportAction.java | 15 +++++++ .../common/io/stream/Streamable.java | 9 ++++ .../transport/RequestHandlerRegistry.java | 13 +++--- .../elasticsearch/transport/TcpTransport.java | 3 +- .../transport/TransportActionProxy.java | 10 ++--- .../transport/TransportMessage.java | 3 +- .../transport/TransportRequest.java | 4 ++ .../transport/TransportService.java | 44 ++++++++++++++++++- .../ingest/SimulatePipelineRequestTests.java | 9 ++-- .../transport/TransportActionProxyTests.java | 2 +- .../test/transport/MockTransportService.java | 3 +- 14 files changed, 109 insertions(+), 35 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/action/ActionRequest.java b/core/src/main/java/org/elasticsearch/action/ActionRequest.java index 769b2e7b573..f5f10c7bcfa 100644 --- a/core/src/main/java/org/elasticsearch/action/ActionRequest.java +++ b/core/src/main/java/org/elasticsearch/action/ActionRequest.java @@ -34,6 +34,10 @@ public abstract class ActionRequest extends TransportRequest { // this.listenerThreaded = request.listenerThreaded(); } + public ActionRequest(StreamInput in) throws IOException { + super(in); + } + public abstract ActionRequestValidationException validate(); /** diff --git a/core/src/main/java/org/elasticsearch/action/ingest/SimulatePipelineRequest.java b/core/src/main/java/org/elasticsearch/action/ingest/SimulatePipelineRequest.java index 0330bc1fbe8..39d7f96e8e6 100644 --- a/core/src/main/java/org/elasticsearch/action/ingest/SimulatePipelineRequest.java +++ b/core/src/main/java/org/elasticsearch/action/ingest/SimulatePipelineRequest.java @@ -25,6 +25,7 @@ import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.ingest.ConfigurationUtils; @@ -68,6 +69,18 @@ public class SimulatePipelineRequest extends ActionRequest { SimulatePipelineRequest() { } + SimulatePipelineRequest(StreamInput in) throws IOException { + super(in); + id = in.readOptionalString(); + verbose = in.readBoolean(); + source = in.readBytesReference(); + if (in.getVersion().onOrAfter(Version.V_5_3_0)) { + xContentType = XContentType.readFrom(in); + } else { + xContentType = XContentFactory.xContentType(source); + } + } + @Override public ActionRequestValidationException validate() { return null; @@ -99,15 +112,7 @@ public class SimulatePipelineRequest extends ActionRequest { @Override public void readFrom(StreamInput in) throws IOException { - super.readFrom(in); - id = in.readOptionalString(); - verbose = in.readBoolean(); - source = in.readBytesReference(); - if (in.getVersion().onOrAfter(Version.V_5_3_0)) { - xContentType = XContentType.readFrom(in); - } else { - xContentType = XContentFactory.xContentType(source); - } + throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable"); } @Override diff --git a/core/src/main/java/org/elasticsearch/action/ingest/SimulatePipelineTransportAction.java b/core/src/main/java/org/elasticsearch/action/ingest/SimulatePipelineTransportAction.java index 3f67007df69..d660840e9b7 100644 --- a/core/src/main/java/org/elasticsearch/action/ingest/SimulatePipelineTransportAction.java +++ b/core/src/main/java/org/elasticsearch/action/ingest/SimulatePipelineTransportAction.java @@ -40,7 +40,7 @@ public class SimulatePipelineTransportAction extends HandledTransportAction requestReader, + IndexNameExpressionResolver indexNameExpressionResolver) { + this(settings, actionName, true, threadPool, transportService, actionFilters, indexNameExpressionResolver, requestReader); + } + protected HandledTransportAction(Settings settings, String actionName, boolean canTripCircuitBreaker, ThreadPool threadPool, TransportService transportService, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, Supplier request) { @@ -51,6 +58,14 @@ public abstract class HandledTransportAction requestReader) { + super(settings, actionName, threadPool, actionFilters, indexNameExpressionResolver, transportService.getTaskManager()); + transportService.registerRequestHandler(actionName, ThreadPool.Names.SAME, false, canTripCircuitBreaker, requestReader, + new TransportHandler()); + } + class TransportHandler implements TransportRequestHandler { @Override diff --git a/core/src/main/java/org/elasticsearch/common/io/stream/Streamable.java b/core/src/main/java/org/elasticsearch/common/io/stream/Streamable.java index 99c054c4c78..86a4d3ed95c 100644 --- a/core/src/main/java/org/elasticsearch/common/io/stream/Streamable.java +++ b/core/src/main/java/org/elasticsearch/common/io/stream/Streamable.java @@ -20,6 +20,7 @@ package org.elasticsearch.common.io.stream; import java.io.IOException; +import java.util.function.Supplier; /** * Implementers can be written to a {@linkplain StreamOutput} and read from a {@linkplain StreamInput}. This allows them to be "thrown @@ -43,4 +44,12 @@ public interface Streamable { * Write this object's fields to a {@linkplain StreamOutput}. */ void writeTo(StreamOutput out) throws IOException; + + static Writeable.Reader newWriteableReader(Supplier supplier) { + return (StreamInput in) -> { + T request = supplier.get(); + request.readFrom(in); + return request; + }; + } } diff --git a/core/src/main/java/org/elasticsearch/transport/RequestHandlerRegistry.java b/core/src/main/java/org/elasticsearch/transport/RequestHandlerRegistry.java index 2e56ff91021..fc1af1d876a 100644 --- a/core/src/main/java/org/elasticsearch/transport/RequestHandlerRegistry.java +++ b/core/src/main/java/org/elasticsearch/transport/RequestHandlerRegistry.java @@ -19,6 +19,8 @@ package org.elasticsearch.transport; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.tasks.Task; import org.elasticsearch.tasks.TaskManager; @@ -32,15 +34,14 @@ public class RequestHandlerRegistry { private final boolean forceExecution; private final boolean canTripCircuitBreaker; private final String executor; - private final Supplier requestFactory; private final TaskManager taskManager; + private final Writeable.Reader requestReader; - public RequestHandlerRegistry(String action, Supplier requestFactory, TaskManager taskManager, + public RequestHandlerRegistry(String action, Writeable.Reader requestReader, TaskManager taskManager, TransportRequestHandler handler, String executor, boolean forceExecution, boolean canTripCircuitBreaker) { this.action = action; - this.requestFactory = requestFactory; - assert newRequest() != null; + this.requestReader = requestReader; this.handler = handler; this.forceExecution = forceExecution; this.canTripCircuitBreaker = canTripCircuitBreaker; @@ -52,8 +53,8 @@ public class RequestHandlerRegistry { return action; } - public Request newRequest() { - return requestFactory.get(); + public Request newRequest(StreamInput in) throws IOException { + return requestReader.read(in); } public void processMessageReceived(Request request, TransportChannel channel) throws Exception { diff --git a/core/src/main/java/org/elasticsearch/transport/TcpTransport.java b/core/src/main/java/org/elasticsearch/transport/TcpTransport.java index 689a54fc8da..8ff86e593c0 100644 --- a/core/src/main/java/org/elasticsearch/transport/TcpTransport.java +++ b/core/src/main/java/org/elasticsearch/transport/TcpTransport.java @@ -1475,9 +1475,8 @@ public abstract class TcpTransport extends AbstractLifecycleComponent i } transportChannel = new TcpTransportChannel<>(this, channel, transportName, action, requestId, version, profileName, messageLengthBytes); - final TransportRequest request = reg.newRequest(); + final TransportRequest request = reg.newRequest(stream); request.remoteAddress(new TransportAddress(remoteAddress)); - request.readFrom(stream); // in case we throw an exception, i.e. when the limit is hit, we don't want to verify validateRequest(stream, requestId, action); threadPool.executor(reg.getExecutor()).execute(new RequestHandler(reg, request, transportChannel)); diff --git a/core/src/main/java/org/elasticsearch/transport/TransportActionProxy.java b/core/src/main/java/org/elasticsearch/transport/TransportActionProxy.java index 5259fca507e..e08d89d181f 100644 --- a/core/src/main/java/org/elasticsearch/transport/TransportActionProxy.java +++ b/core/src/main/java/org/elasticsearch/transport/TransportActionProxy.java @@ -22,6 +22,7 @@ import org.apache.logging.log4j.util.Supplier; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.threadpool.ThreadPool; import java.io.IOException; @@ -97,11 +98,11 @@ public final class TransportActionProxy { static class ProxyRequest extends TransportRequest { T wrapped; - Supplier supplier; + Writeable.Reader reader; DiscoveryNode targetNode; - ProxyRequest(Supplier supplier) { - this.supplier = supplier; + ProxyRequest(Writeable.Reader reader) { + this.reader = reader; } ProxyRequest(T wrapped, DiscoveryNode targetNode) { @@ -113,8 +114,7 @@ public final class TransportActionProxy { public void readFrom(StreamInput in) throws IOException { super.readFrom(in); targetNode = new DiscoveryNode(in); - wrapped = supplier.get(); - wrapped.readFrom(in); + wrapped = reader.read(in); } @Override diff --git a/core/src/main/java/org/elasticsearch/transport/TransportMessage.java b/core/src/main/java/org/elasticsearch/transport/TransportMessage.java index fa21a51ba2d..ecaca73b2db 100644 --- a/core/src/main/java/org/elasticsearch/transport/TransportMessage.java +++ b/core/src/main/java/org/elasticsearch/transport/TransportMessage.java @@ -22,11 +22,12 @@ package org.elasticsearch.transport; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Streamable; +import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.transport.TransportAddress; import java.io.IOException; -public abstract class TransportMessage implements Streamable { +public abstract class TransportMessage implements Streamable, Writeable { private TransportAddress remoteAddress; diff --git a/core/src/main/java/org/elasticsearch/transport/TransportRequest.java b/core/src/main/java/org/elasticsearch/transport/TransportRequest.java index c42ec24ad15..d6072fc9d0a 100644 --- a/core/src/main/java/org/elasticsearch/transport/TransportRequest.java +++ b/core/src/main/java/org/elasticsearch/transport/TransportRequest.java @@ -39,6 +39,10 @@ public abstract class TransportRequest extends TransportMessage implements TaskA public TransportRequest() { } + public TransportRequest(StreamInput in) throws IOException { + parentTaskId = TaskId.readFromStream(in); + } + /** * Set a reference to task that created this request. */ diff --git a/core/src/main/java/org/elasticsearch/transport/TransportService.java b/core/src/main/java/org/elasticsearch/transport/TransportService.java index 13034355366..a68e319bb2c 100644 --- a/core/src/main/java/org/elasticsearch/transport/TransportService.java +++ b/core/src/main/java/org/elasticsearch/transport/TransportService.java @@ -32,6 +32,8 @@ import org.elasticsearch.common.collect.MapBuilder; import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Streamable; +import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.common.metrics.MeanMetric; import org.elasticsearch.common.regex.Regex; @@ -709,7 +711,24 @@ public class TransportService extends AbstractLifecycleComponent { String executor, TransportRequestHandler handler) { handler = interceptor.interceptHandler(action, executor, false, handler); RequestHandlerRegistry reg = new RequestHandlerRegistry<>( - action, requestFactory, taskManager, handler, executor, false, true); + action, Streamable.newWriteableReader(requestFactory), taskManager, handler, executor, false, true); + registerRequestHandler(reg); + } + + /** + * Registers a new request handler + * + * @param action The action the request handler is associated with + * @param requestReader a callable to be used construct new instances for streaming + * @param executor The executor the request handling will be executed on + * @param handler The handler itself that implements the request handling + */ + public void registerRequestHandler(String action, String executor, + Writeable.Reader requestReader, + TransportRequestHandler handler) { + handler = interceptor.interceptHandler(action, executor, false, handler); + RequestHandlerRegistry reg = new RequestHandlerRegistry<>( + action, requestReader, taskManager, handler, executor, false, true); registerRequestHandler(reg); } @@ -729,7 +748,28 @@ public class TransportService extends AbstractLifecycleComponent { TransportRequestHandler handler) { handler = interceptor.interceptHandler(action, executor, forceExecution, handler); RequestHandlerRegistry reg = new RequestHandlerRegistry<>( - action, request, taskManager, handler, executor, forceExecution, canTripCircuitBreaker); + action, Streamable.newWriteableReader(request), taskManager, handler, executor, forceExecution, canTripCircuitBreaker); + registerRequestHandler(reg); + } + + /** + * Registers a new request handler + * + * @param action The action the request handler is associated with + * @param requestReader The request class that will be used to construct new instances for streaming + * @param executor The executor the request handling will be executed on + * @param forceExecution Force execution on the executor queue and never reject it + * @param canTripCircuitBreaker Check the request size and raise an exception in case the limit is breached. + * @param handler The handler itself that implements the request handling + */ + public void registerRequestHandler(String action, + String executor, boolean forceExecution, + boolean canTripCircuitBreaker, + Writeable.Reader requestReader, + TransportRequestHandler handler) { + handler = interceptor.interceptHandler(action, executor, forceExecution, handler); + RequestHandlerRegistry reg = new RequestHandlerRegistry<>( + action, requestReader, taskManager, handler, executor, forceExecution, canTripCircuitBreaker); registerRequestHandler(reg); } diff --git a/core/src/test/java/org/elasticsearch/action/ingest/SimulatePipelineRequestTests.java b/core/src/test/java/org/elasticsearch/action/ingest/SimulatePipelineRequestTests.java index ecd0256b110..5cd82be8cb0 100644 --- a/core/src/test/java/org/elasticsearch/action/ingest/SimulatePipelineRequestTests.java +++ b/core/src/test/java/org/elasticsearch/action/ingest/SimulatePipelineRequestTests.java @@ -49,8 +49,7 @@ public class SimulatePipelineRequestTests extends ESTestCase { BytesStreamOutput out = new BytesStreamOutput(); request.writeTo(out); StreamInput streamInput = out.bytes().streamInput(); - SimulatePipelineRequest otherRequest = new SimulatePipelineRequest(); - otherRequest.readFrom(streamInput); + SimulatePipelineRequest otherRequest = new SimulatePipelineRequest(streamInput); assertThat(otherRequest.getId(), equalTo(request.getId())); assertThat(otherRequest.isVerbose(), equalTo(request.isVerbose())); @@ -65,8 +64,7 @@ public class SimulatePipelineRequestTests extends ESTestCase { request.writeTo(output); StreamInput in = StreamInput.wrap(output.bytes().toBytesRef().bytes); - SimulatePipelineRequest serialized = new SimulatePipelineRequest(); - serialized.readFrom(in); + SimulatePipelineRequest serialized = new SimulatePipelineRequest(in); assertEquals(XContentType.JSON, serialized.getXContentType()); assertEquals("{}", serialized.getSource().utf8ToString()); } @@ -77,8 +75,7 @@ public class SimulatePipelineRequestTests extends ESTestCase { Version.V_5_1_1, Version.V_5_1_2, Version.V_5_2_0); try (StreamInput in = StreamInput.wrap(data)) { in.setVersion(version); - SimulatePipelineRequest request = new SimulatePipelineRequest(); - request.readFrom(in); + SimulatePipelineRequest request = new SimulatePipelineRequest(in); assertEquals(XContentType.JSON, request.getXContentType()); assertEquals("{}", request.getSource().utf8ToString()); diff --git a/core/src/test/java/org/elasticsearch/transport/TransportActionProxyTests.java b/core/src/test/java/org/elasticsearch/transport/TransportActionProxyTests.java index e73ad8e439c..64f41825509 100644 --- a/core/src/test/java/org/elasticsearch/transport/TransportActionProxyTests.java +++ b/core/src/test/java/org/elasticsearch/transport/TransportActionProxyTests.java @@ -267,7 +267,7 @@ public class TransportActionProxyTests extends ESTestCase { } public void testIsProxyRequest() { - assertTrue(TransportActionProxy.isProxyRequest(new TransportActionProxy.ProxyRequest<>(() -> null))); + assertTrue(TransportActionProxy.isProxyRequest(new TransportActionProxy.ProxyRequest<>((in) -> null))); assertFalse(TransportActionProxy.isProxyRequest(TransportRequest.Empty.INSTANCE)); } } diff --git a/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java b/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java index 98736a7a98e..503a7ae1f79 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java +++ b/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java @@ -419,8 +419,7 @@ public final class MockTransportService extends TransportService { RequestHandlerRegistry reg = MockTransportService.this.getRequestHandler(action); BytesStreamOutput bStream = new BytesStreamOutput(); request.writeTo(bStream); - final TransportRequest clonedRequest = reg.newRequest(); - clonedRequest.readFrom(bStream.bytes().streamInput()); + final TransportRequest clonedRequest = reg.newRequest(bStream.bytes().streamInput()); Runnable runnable = new AbstractRunnable() { AtomicBoolean requestSent = new AtomicBoolean();