revamp TransportRequest handlers to support Writeable (#26315)

This PR begins the long journey to deprecating Streamable.

The idea here is to add additional method signatures that
support Writeable.Reader, so that the work to migrate objects TransportMessage to
implement Writeable and not Streamable.

One example conversion is done in this PR: SimulatePipelineRequest.
This commit is contained in:
Tal Levy 2017-08-22 15:47:05 -07:00 committed by GitHub
parent 4756c9a884
commit 6ab4b6b0ac
14 changed files with 109 additions and 35 deletions

View File

@ -34,6 +34,10 @@ public abstract class ActionRequest extends TransportRequest {
// this.listenerThreaded = request.listenerThreaded();
}
public ActionRequest(StreamInput in) throws IOException {
super(in);
}
public abstract ActionRequestValidationException validate();
/**

View File

@ -25,6 +25,7 @@ import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.ingest.ConfigurationUtils;
@ -68,6 +69,18 @@ public class SimulatePipelineRequest extends ActionRequest {
SimulatePipelineRequest() {
}
SimulatePipelineRequest(StreamInput in) throws IOException {
super(in);
id = in.readOptionalString();
verbose = in.readBoolean();
source = in.readBytesReference();
if (in.getVersion().onOrAfter(Version.V_5_3_0)) {
xContentType = XContentType.readFrom(in);
} else {
xContentType = XContentFactory.xContentType(source);
}
}
@Override
public ActionRequestValidationException validate() {
return null;
@ -99,15 +112,7 @@ public class SimulatePipelineRequest extends ActionRequest {
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
id = in.readOptionalString();
verbose = in.readBoolean();
source = in.readBytesReference();
if (in.getVersion().onOrAfter(Version.V_5_3_0)) {
xContentType = XContentType.readFrom(in);
} else {
xContentType = XContentFactory.xContentType(source);
}
throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
}
@Override

View File

@ -40,7 +40,7 @@ public class SimulatePipelineTransportAction extends HandledTransportAction<Simu
@Inject
public SimulatePipelineTransportAction(Settings settings, ThreadPool threadPool, TransportService transportService, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, NodeService nodeService) {
super(settings, SimulatePipelineAction.NAME, threadPool, transportService, actionFilters, indexNameExpressionResolver, SimulatePipelineRequest::new);
super(settings, SimulatePipelineAction.NAME, threadPool, transportService, actionFilters, SimulatePipelineRequest::new, indexNameExpressionResolver);
this.pipelineStore = nodeService.getIngestService().getPipelineStore();
this.executionService = new SimulateExecutionService(threadPool);
}

View File

@ -23,6 +23,7 @@ import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
@ -43,6 +44,12 @@ public abstract class HandledTransportAction<Request extends ActionRequest, Resp
this(settings, actionName, true, threadPool, transportService, actionFilters, indexNameExpressionResolver, request);
}
protected HandledTransportAction(Settings settings, String actionName, ThreadPool threadPool, TransportService transportService,
ActionFilters actionFilters, Writeable.Reader<Request> requestReader,
IndexNameExpressionResolver indexNameExpressionResolver) {
this(settings, actionName, true, threadPool, transportService, actionFilters, indexNameExpressionResolver, requestReader);
}
protected HandledTransportAction(Settings settings, String actionName, boolean canTripCircuitBreaker, ThreadPool threadPool,
TransportService transportService, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver, Supplier<Request> request) {
@ -51,6 +58,14 @@ public abstract class HandledTransportAction<Request extends ActionRequest, Resp
new TransportHandler());
}
protected HandledTransportAction(Settings settings, String actionName, boolean canTripCircuitBreaker, ThreadPool threadPool,
TransportService transportService, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver, Writeable.Reader<Request> requestReader) {
super(settings, actionName, threadPool, actionFilters, indexNameExpressionResolver, transportService.getTaskManager());
transportService.registerRequestHandler(actionName, ThreadPool.Names.SAME, false, canTripCircuitBreaker, requestReader,
new TransportHandler());
}
class TransportHandler implements TransportRequestHandler<Request> {
@Override

View File

@ -20,6 +20,7 @@
package org.elasticsearch.common.io.stream;
import java.io.IOException;
import java.util.function.Supplier;
/**
* Implementers can be written to a {@linkplain StreamOutput} and read from a {@linkplain StreamInput}. This allows them to be "thrown
@ -43,4 +44,12 @@ public interface Streamable {
* Write this object's fields to a {@linkplain StreamOutput}.
*/
void writeTo(StreamOutput out) throws IOException;
static <T extends Streamable> Writeable.Reader<T> newWriteableReader(Supplier<T> supplier) {
return (StreamInput in) -> {
T request = supplier.get();
request.readFrom(in);
return request;
};
}
}

View File

@ -19,6 +19,8 @@
package org.elasticsearch.transport;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskManager;
@ -32,15 +34,14 @@ public class RequestHandlerRegistry<Request extends TransportRequest> {
private final boolean forceExecution;
private final boolean canTripCircuitBreaker;
private final String executor;
private final Supplier<Request> requestFactory;
private final TaskManager taskManager;
private final Writeable.Reader<Request> requestReader;
public RequestHandlerRegistry(String action, Supplier<Request> requestFactory, TaskManager taskManager,
public RequestHandlerRegistry(String action, Writeable.Reader<Request> requestReader, TaskManager taskManager,
TransportRequestHandler<Request> handler, String executor, boolean forceExecution,
boolean canTripCircuitBreaker) {
this.action = action;
this.requestFactory = requestFactory;
assert newRequest() != null;
this.requestReader = requestReader;
this.handler = handler;
this.forceExecution = forceExecution;
this.canTripCircuitBreaker = canTripCircuitBreaker;
@ -52,8 +53,8 @@ public class RequestHandlerRegistry<Request extends TransportRequest> {
return action;
}
public Request newRequest() {
return requestFactory.get();
public Request newRequest(StreamInput in) throws IOException {
return requestReader.read(in);
}
public void processMessageReceived(Request request, TransportChannel channel) throws Exception {

View File

@ -1475,9 +1475,8 @@ public abstract class TcpTransport<Channel> extends AbstractLifecycleComponent i
}
transportChannel = new TcpTransportChannel<>(this, channel, transportName, action, requestId, version, profileName,
messageLengthBytes);
final TransportRequest request = reg.newRequest();
final TransportRequest request = reg.newRequest(stream);
request.remoteAddress(new TransportAddress(remoteAddress));
request.readFrom(stream);
// in case we throw an exception, i.e. when the limit is hit, we don't want to verify
validateRequest(stream, requestId, action);
threadPool.executor(reg.getExecutor()).execute(new RequestHandler(reg, request, transportChannel));

View File

@ -22,6 +22,7 @@ import org.apache.logging.log4j.util.Supplier;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.threadpool.ThreadPool;
import java.io.IOException;
@ -97,11 +98,11 @@ public final class TransportActionProxy {
static class ProxyRequest<T extends TransportRequest> extends TransportRequest {
T wrapped;
Supplier<T> supplier;
Writeable.Reader<T> reader;
DiscoveryNode targetNode;
ProxyRequest(Supplier<T> supplier) {
this.supplier = supplier;
ProxyRequest(Writeable.Reader<T> reader) {
this.reader = reader;
}
ProxyRequest(T wrapped, DiscoveryNode targetNode) {
@ -113,8 +114,7 @@ public final class TransportActionProxy {
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
targetNode = new DiscoveryNode(in);
wrapped = supplier.get();
wrapped.readFrom(in);
wrapped = reader.read(in);
}
@Override

View File

@ -22,11 +22,12 @@ package org.elasticsearch.transport;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.transport.TransportAddress;
import java.io.IOException;
public abstract class TransportMessage implements Streamable {
public abstract class TransportMessage implements Streamable, Writeable {
private TransportAddress remoteAddress;

View File

@ -39,6 +39,10 @@ public abstract class TransportRequest extends TransportMessage implements TaskA
public TransportRequest() {
}
public TransportRequest(StreamInput in) throws IOException {
parentTaskId = TaskId.readFromStream(in);
}
/**
* Set a reference to task that created this request.
*/

View File

@ -32,6 +32,8 @@ import org.elasticsearch.common.collect.MapBuilder;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.metrics.MeanMetric;
import org.elasticsearch.common.regex.Regex;
@ -709,7 +711,24 @@ public class TransportService extends AbstractLifecycleComponent {
String executor, TransportRequestHandler<Request> handler) {
handler = interceptor.interceptHandler(action, executor, false, handler);
RequestHandlerRegistry<Request> reg = new RequestHandlerRegistry<>(
action, requestFactory, taskManager, handler, executor, false, true);
action, Streamable.newWriteableReader(requestFactory), taskManager, handler, executor, false, true);
registerRequestHandler(reg);
}
/**
* Registers a new request handler
*
* @param action The action the request handler is associated with
* @param requestReader a callable to be used construct new instances for streaming
* @param executor The executor the request handling will be executed on
* @param handler The handler itself that implements the request handling
*/
public <Request extends TransportRequest> void registerRequestHandler(String action, String executor,
Writeable.Reader<Request> requestReader,
TransportRequestHandler<Request> handler) {
handler = interceptor.interceptHandler(action, executor, false, handler);
RequestHandlerRegistry<Request> reg = new RequestHandlerRegistry<>(
action, requestReader, taskManager, handler, executor, false, true);
registerRequestHandler(reg);
}
@ -729,7 +748,28 @@ public class TransportService extends AbstractLifecycleComponent {
TransportRequestHandler<Request> handler) {
handler = interceptor.interceptHandler(action, executor, forceExecution, handler);
RequestHandlerRegistry<Request> reg = new RequestHandlerRegistry<>(
action, request, taskManager, handler, executor, forceExecution, canTripCircuitBreaker);
action, Streamable.newWriteableReader(request), taskManager, handler, executor, forceExecution, canTripCircuitBreaker);
registerRequestHandler(reg);
}
/**
* Registers a new request handler
*
* @param action The action the request handler is associated with
* @param requestReader The request class that will be used to construct new instances for streaming
* @param executor The executor the request handling will be executed on
* @param forceExecution Force execution on the executor queue and never reject it
* @param canTripCircuitBreaker Check the request size and raise an exception in case the limit is breached.
* @param handler The handler itself that implements the request handling
*/
public <Request extends TransportRequest> void registerRequestHandler(String action,
String executor, boolean forceExecution,
boolean canTripCircuitBreaker,
Writeable.Reader<Request> requestReader,
TransportRequestHandler<Request> handler) {
handler = interceptor.interceptHandler(action, executor, forceExecution, handler);
RequestHandlerRegistry<Request> reg = new RequestHandlerRegistry<>(
action, requestReader, taskManager, handler, executor, forceExecution, canTripCircuitBreaker);
registerRequestHandler(reg);
}

View File

@ -49,8 +49,7 @@ public class SimulatePipelineRequestTests extends ESTestCase {
BytesStreamOutput out = new BytesStreamOutput();
request.writeTo(out);
StreamInput streamInput = out.bytes().streamInput();
SimulatePipelineRequest otherRequest = new SimulatePipelineRequest();
otherRequest.readFrom(streamInput);
SimulatePipelineRequest otherRequest = new SimulatePipelineRequest(streamInput);
assertThat(otherRequest.getId(), equalTo(request.getId()));
assertThat(otherRequest.isVerbose(), equalTo(request.isVerbose()));
@ -65,8 +64,7 @@ public class SimulatePipelineRequestTests extends ESTestCase {
request.writeTo(output);
StreamInput in = StreamInput.wrap(output.bytes().toBytesRef().bytes);
SimulatePipelineRequest serialized = new SimulatePipelineRequest();
serialized.readFrom(in);
SimulatePipelineRequest serialized = new SimulatePipelineRequest(in);
assertEquals(XContentType.JSON, serialized.getXContentType());
assertEquals("{}", serialized.getSource().utf8ToString());
}
@ -77,8 +75,7 @@ public class SimulatePipelineRequestTests extends ESTestCase {
Version.V_5_1_1, Version.V_5_1_2, Version.V_5_2_0);
try (StreamInput in = StreamInput.wrap(data)) {
in.setVersion(version);
SimulatePipelineRequest request = new SimulatePipelineRequest();
request.readFrom(in);
SimulatePipelineRequest request = new SimulatePipelineRequest(in);
assertEquals(XContentType.JSON, request.getXContentType());
assertEquals("{}", request.getSource().utf8ToString());

View File

@ -267,7 +267,7 @@ public class TransportActionProxyTests extends ESTestCase {
}
public void testIsProxyRequest() {
assertTrue(TransportActionProxy.isProxyRequest(new TransportActionProxy.ProxyRequest<>(() -> null)));
assertTrue(TransportActionProxy.isProxyRequest(new TransportActionProxy.ProxyRequest<>((in) -> null)));
assertFalse(TransportActionProxy.isProxyRequest(TransportRequest.Empty.INSTANCE));
}
}

View File

@ -419,8 +419,7 @@ public final class MockTransportService extends TransportService {
RequestHandlerRegistry reg = MockTransportService.this.getRequestHandler(action);
BytesStreamOutput bStream = new BytesStreamOutput();
request.writeTo(bStream);
final TransportRequest clonedRequest = reg.newRequest();
clonedRequest.readFrom(bStream.bytes().streamInput());
final TransportRequest clonedRequest = reg.newRequest(bStream.bytes().streamInput());
Runnable runnable = new AbstractRunnable() {
AtomicBoolean requestSent = new AtomicBoolean();