From 1816951b6b0320e7a011436c7c7519ec2bfabc6e Mon Sep 17 00:00:00 2001 From: Alexander Reelsen Date: Fri, 18 Jul 2014 15:38:57 +0200 Subject: [PATCH] Netty: Refactoring to make MessageChannelHandler extensible Small refactorings to make the MessageChannelHandler more extensible. Also allowed access to the different netty pipelines This is the fix after the first version had problems with the HTTP transport due to wrong reusing channel handlers, which is the reason why tests failed. Relates #6889 Closes #6915 --- .../http/netty/NettyHttpServerTransport.java | 13 +- .../netty/MessageChannelHandler.java | 12 +- .../transport/netty/NettyTransport.java | 109 +++++++----- ...ConfigurableErrorNettyTransportModule.java | 159 ++++++++++++++++++ .../test/transport/NettyTransportTests.java | 64 +++++++ 5 files changed, 303 insertions(+), 54 deletions(-) create mode 100644 src/test/java/org/elasticsearch/test/transport/ConfigurableErrorNettyTransportModule.java create mode 100644 src/test/java/org/elasticsearch/test/transport/NettyTransportTests.java diff --git a/src/main/java/org/elasticsearch/http/netty/NettyHttpServerTransport.java b/src/main/java/org/elasticsearch/http/netty/NettyHttpServerTransport.java index fe444407afb..9a5f079b3c1 100644 --- a/src/main/java/org/elasticsearch/http/netty/NettyHttpServerTransport.java +++ b/src/main/java/org/elasticsearch/http/netty/NettyHttpServerTransport.java @@ -195,7 +195,7 @@ public class NettyHttpServerTransport extends AbstractLifecycleComponent implem new NioWorkerPool(Executors.newCachedThreadPool(daemonThreadFactory(settings, "transport_client_worker")), workerCount), new HashedWheelTimer(daemonThreadFactory(settings, "transport_client_timer")))); } - ChannelPipelineFactory clientPipelineFactory = new ChannelPipelineFactory() { - @Override - public ChannelPipeline getPipeline() throws Exception { - ChannelPipeline pipeline = Channels.pipeline(); - SizeHeaderFrameDecoder sizeHeader = new SizeHeaderFrameDecoder(); - if (maxCumulationBufferCapacity != null) { - if (maxCumulationBufferCapacity.bytes() > Integer.MAX_VALUE) { - sizeHeader.setMaxCumulationBufferCapacity(Integer.MAX_VALUE); - } else { - sizeHeader.setMaxCumulationBufferCapacity((int) maxCumulationBufferCapacity.bytes()); - } - } - if (maxCompositeBufferComponents != -1) { - sizeHeader.setMaxCumulationBufferComponents(maxCompositeBufferComponents); - } - pipeline.addLast("size", sizeHeader); - pipeline.addLast("dispatcher", new MessageChannelHandler(NettyTransport.this, logger)); - return pipeline; - } - }; - clientBootstrap.setPipelineFactory(clientPipelineFactory); + clientBootstrap.setPipelineFactory(configureClientChannelPipelineFactory()); clientBootstrap.setOption("connectTimeoutMillis", connectTimeout.millis()); if (tcpNoDelay != null) { clientBootstrap.setOption("tcpNoDelay", tcpNoDelay); @@ -328,28 +308,7 @@ public class NettyTransport extends AbstractLifecycleComponent implem Executors.newCachedThreadPool(daemonThreadFactory(settings, "transport_server_worker")), workerCount)); } - ChannelPipelineFactory serverPipelineFactory = new ChannelPipelineFactory() { - @Override - public ChannelPipeline getPipeline() throws Exception { - ChannelPipeline pipeline = Channels.pipeline(); - pipeline.addLast("openChannels", openChannels); - SizeHeaderFrameDecoder sizeHeader = new SizeHeaderFrameDecoder(); - if (maxCumulationBufferCapacity != null) { - if (maxCumulationBufferCapacity.bytes() > Integer.MAX_VALUE) { - sizeHeader.setMaxCumulationBufferCapacity(Integer.MAX_VALUE); - } else { - sizeHeader.setMaxCumulationBufferCapacity((int) maxCumulationBufferCapacity.bytes()); - } - } - if (maxCompositeBufferComponents != -1) { - sizeHeader.setMaxCumulationBufferComponents(maxCompositeBufferComponents); - } - pipeline.addLast("size", sizeHeader); - pipeline.addLast("dispatcher", new MessageChannelHandler(NettyTransport.this, logger)); - return pipeline; - } - }; - serverBootstrap.setPipelineFactory(serverPipelineFactory); + serverBootstrap.setPipelineFactory(configureServerChannelPipelineFactory()); if (tcpNoDelay != null) { serverBootstrap.setOption("child.tcpNoDelay", tcpNoDelay); } @@ -876,6 +835,70 @@ public class NettyTransport extends AbstractLifecycleComponent implem return nodeChannels.channel(options.type()); } + public ChannelPipelineFactory configureClientChannelPipelineFactory() { + return new ClientChannelPipelineFactory(this); + } + + protected static class ClientChannelPipelineFactory implements ChannelPipelineFactory { + protected NettyTransport nettyTransport; + + public ClientChannelPipelineFactory(NettyTransport nettyTransport) { + this.nettyTransport = nettyTransport; + } + + @Override + public ChannelPipeline getPipeline() throws Exception { + ChannelPipeline channelPipeline = Channels.pipeline(); + SizeHeaderFrameDecoder sizeHeader = new SizeHeaderFrameDecoder(); + if (nettyTransport.maxCumulationBufferCapacity != null) { + if (nettyTransport.maxCumulationBufferCapacity.bytes() > Integer.MAX_VALUE) { + sizeHeader.setMaxCumulationBufferCapacity(Integer.MAX_VALUE); + } else { + sizeHeader.setMaxCumulationBufferCapacity((int) nettyTransport.maxCumulationBufferCapacity.bytes()); + } + } + if (nettyTransport.maxCompositeBufferComponents != -1) { + sizeHeader.setMaxCumulationBufferComponents(nettyTransport.maxCompositeBufferComponents); + } + channelPipeline.addLast("size", sizeHeader); + channelPipeline.addLast("dispatcher", new MessageChannelHandler(nettyTransport, nettyTransport.logger)); + return channelPipeline; + } + } + + public ChannelPipelineFactory configureServerChannelPipelineFactory() { + return new ServerChannelPipeFactory(this); + } + + protected static class ServerChannelPipeFactory implements ChannelPipelineFactory { + + protected NettyTransport nettyTransport; + + public ServerChannelPipeFactory(NettyTransport nettyTransport) { + this.nettyTransport = nettyTransport; + } + + @Override + public ChannelPipeline getPipeline() throws Exception { + ChannelPipeline channelPipeline = Channels.pipeline(); + channelPipeline.addLast("openChannels", nettyTransport.serverOpenChannels); + SizeHeaderFrameDecoder sizeHeader = new SizeHeaderFrameDecoder(); + if (nettyTransport.maxCumulationBufferCapacity != null) { + if (nettyTransport.maxCumulationBufferCapacity.bytes() > Integer.MAX_VALUE) { + sizeHeader.setMaxCumulationBufferCapacity(Integer.MAX_VALUE); + } else { + sizeHeader.setMaxCumulationBufferCapacity((int) nettyTransport.maxCumulationBufferCapacity.bytes()); + } + } + if (nettyTransport.maxCompositeBufferComponents != -1) { + sizeHeader.setMaxCumulationBufferComponents(nettyTransport.maxCompositeBufferComponents); + } + channelPipeline.addLast("size", sizeHeader); + channelPipeline.addLast("dispatcher", new MessageChannelHandler(nettyTransport, nettyTransport.logger)); + return channelPipeline; + } + } + private class ChannelCloseListener implements ChannelFutureListener { private final DiscoveryNode node; diff --git a/src/test/java/org/elasticsearch/test/transport/ConfigurableErrorNettyTransportModule.java b/src/test/java/org/elasticsearch/test/transport/ConfigurableErrorNettyTransportModule.java new file mode 100644 index 00000000000..440564c60f4 --- /dev/null +++ b/src/test/java/org/elasticsearch/test/transport/ConfigurableErrorNettyTransportModule.java @@ -0,0 +1,159 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.test.transport; + +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.Version; +import org.elasticsearch.common.component.Lifecycle; +import org.elasticsearch.common.inject.AbstractModule; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.logging.ESLogger; +import org.elasticsearch.common.network.NetworkService; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.transport.InetSocketTransportAddress; +import org.elasticsearch.common.util.BigArrays; +import org.elasticsearch.common.util.concurrent.AbstractRunnable; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.ActionNotFoundTransportException; +import org.elasticsearch.transport.Transport; +import org.elasticsearch.transport.TransportRequest; +import org.elasticsearch.transport.TransportRequestHandler; +import org.elasticsearch.transport.netty.MessageChannelHandler; +import org.elasticsearch.transport.netty.NettyTransport; +import org.elasticsearch.transport.netty.NettyTransportChannel; +import org.jboss.netty.channel.Channel; +import org.jboss.netty.channel.ChannelPipeline; +import org.jboss.netty.channel.ChannelPipelineFactory; + +import java.io.IOException; +import java.net.InetSocketAddress; + +/** + * + */ +public class ConfigurableErrorNettyTransportModule extends AbstractModule { + + @Override + protected void configure() { + bind(ExceptionThrowingNettyTransport.class).asEagerSingleton(); + bind(Transport.class).to(ExceptionThrowingNettyTransport.class).asEagerSingleton(); + + } + + public static final class ExceptionThrowingNettyTransport extends NettyTransport { + + @Inject + public ExceptionThrowingNettyTransport(Settings settings, ThreadPool threadPool, NetworkService networkService, BigArrays bigArrays, Version version) { + super(settings, threadPool, networkService, bigArrays, version); + } + + @Override + public ChannelPipelineFactory configureServerChannelPipelineFactory() { + return new ErrorPipelineFactory(this); + } + + private static class ErrorPipelineFactory extends ServerChannelPipeFactory { + + private final ESLogger logger; + + public ErrorPipelineFactory(ExceptionThrowingNettyTransport exceptionThrowingNettyTransport) { + super(exceptionThrowingNettyTransport); + this.logger = exceptionThrowingNettyTransport.logger; + } + + public ChannelPipeline getPipeline() throws Exception { + ChannelPipeline pipeline = super.getPipeline(); + pipeline.replace("dispatcher", "dispatcher", new MessageChannelHandler(nettyTransport, logger) { + + @Override + protected String handleRequest(Channel channel, StreamInput buffer, long requestId, Version version) throws IOException { + final String action = buffer.readString(); + + final NettyTransportChannel transportChannel = new NettyTransportChannel(transport, action, channel, requestId, version); + try { + final TransportRequestHandler handler = transportServiceAdapter.handler(action); + if (handler == null) { + throw new ActionNotFoundTransportException(action); + } + final TransportRequest request = handler.newInstance(); + request.remoteAddress(new InetSocketTransportAddress((InetSocketAddress) channel.getRemoteAddress())); + request.readFrom(buffer); + if (request.getHeaders() != null && request.getHeaders().containsKey("ERROR")) { + throw new ElasticsearchException((String) request.getHeaders().get("ERROR")); + } + if (handler.executor() == ThreadPool.Names.SAME) { + //noinspection unchecked + handler.messageReceived(request, transportChannel); + } else { + threadPool.executor(handler.executor()).execute(new RequestHandler(handler, request, transportChannel, action)); + } + } catch (Throwable e) { + try { + transportChannel.sendResponse(e); + } catch (IOException e1) { + logger.warn("Failed to send error message back to client for action [" + action + "]", e); + logger.warn("Actual Exception", e1); + } + } + return action; + } + + class RequestHandler extends AbstractRunnable { + private final TransportRequestHandler handler; + private final TransportRequest request; + private final NettyTransportChannel transportChannel; + private final String action; + + public RequestHandler(TransportRequestHandler handler, TransportRequest request, NettyTransportChannel transportChannel, String action) { + this.handler = handler; + this.request = request; + this.transportChannel = transportChannel; + this.action = action; + } + + @SuppressWarnings({"unchecked"}) + @Override + public void run() { + try { + handler.messageReceived(request, transportChannel); + } catch (Throwable e) { + if (transport.lifecycleState() == Lifecycle.State.STARTED) { + // we can only send a response transport is started.... + try { + transportChannel.sendResponse(e); + } catch (Throwable e1) { + logger.warn("Failed to send error message back to client for action [" + action + "]", e1); + logger.warn("Actual Exception", e); + } + } + } + } + + @Override + public boolean isForceExecution() { + return handler.isForceExecution(); + } + } + }); + return pipeline; + } + } + } +} diff --git a/src/test/java/org/elasticsearch/test/transport/NettyTransportTests.java b/src/test/java/org/elasticsearch/test/transport/NettyTransportTests.java new file mode 100644 index 00000000000..ed5dab3fde2 --- /dev/null +++ b/src/test/java/org/elasticsearch/test/transport/NettyTransportTests.java @@ -0,0 +1,64 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.test.transport; + +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; +import org.elasticsearch.action.admin.cluster.health.ClusterHealthStatus; +import org.elasticsearch.client.Client; +import org.elasticsearch.common.settings.ImmutableSettings; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.test.ElasticsearchIntegrationTest; +import org.elasticsearch.transport.TransportModule; +import org.junit.Test; + +import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder; +import static org.elasticsearch.test.ElasticsearchIntegrationTest.ClusterScope; +import static org.elasticsearch.test.ElasticsearchIntegrationTest.Scope; +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.is; + +/** + * + */ +@ClusterScope(scope = Scope.TEST, numDataNodes = 1) +public class NettyTransportTests extends ElasticsearchIntegrationTest { + + @Override + protected Settings nodeSettings(int nodeOrdinal) { + ImmutableSettings.Builder builder = settingsBuilder() + .put("node.mode", "network") + .put(TransportModule.TRANSPORT_TYPE_KEY, ConfigurableErrorNettyTransportModule.class); + return builder.put(super.nodeSettings(nodeOrdinal)).build(); + } + + @Test + public void testThatConnectionFailsAsIntended() throws Exception { + Client transportClient = internalCluster().transportClient(); + ClusterHealthResponse clusterIndexHealths = transportClient.admin().cluster().prepareHealth().get(); + assertThat(clusterIndexHealths.getStatus(), is(ClusterHealthStatus.GREEN)); + + try { + transportClient.admin().cluster().prepareHealth().putHeader("ERROR", "MY MESSAGE").get(); + fail("Expected exception, but didnt happen"); + } catch (ElasticsearchException e) { + assertThat(e.getMessage(), containsString("MY MESSAGE")); + } + } +}