From db7f0d36af6692b1648cbea6e1e7147e7cfa70dc Mon Sep 17 00:00:00 2001 From: Alexander Reelsen Date: Thu, 17 Jul 2014 08:28:55 +0200 Subject: [PATCH] Netty: Refactoring to make MessageChannelHandler extensible Small refactorings to make the MessageChannelHandler more extensible. Also allowed access to the different netty pipelines Closes #6889 --- .../http/netty/NettyHttpServerTransport.java | 78 ++++------ .../netty/MessageChannelHandler.java | 12 +- .../transport/netty/NettyTransport.java | 92 ++++++------ ...ConfigurableErrorNettyTransportModule.java | 142 ++++++++++++++++++ .../test/transport/NettyTransportTests.java | 64 ++++++++ 5 files changed, 287 insertions(+), 101 deletions(-) create mode 100644 src/test/java/org/elasticsearch/test/transport/ConfigurableErrorNettyTransportModule.java create mode 100644 src/test/java/org/elasticsearch/test/transport/NettyTransportTests.java diff --git a/src/main/java/org/elasticsearch/http/netty/NettyHttpServerTransport.java b/src/main/java/org/elasticsearch/http/netty/NettyHttpServerTransport.java index 94c97061cdd..530fa7aa5cf 100644 --- a/src/main/java/org/elasticsearch/http/netty/NettyHttpServerTransport.java +++ b/src/main/java/org/elasticsearch/http/netty/NettyHttpServerTransport.java @@ -195,8 +195,9 @@ public class NettyHttpServerTransport extends AbstractLifecycleComponent Integer.MAX_VALUE) { + requestDecoder.setMaxCumulationBufferCapacity(Integer.MAX_VALUE); + } else { + requestDecoder.setMaxCumulationBufferCapacity((int) this.maxCumulationBufferCapacity.bytes()); + } } - - @Override - public ChannelPipeline getPipeline() throws Exception { - ChannelPipeline pipeline = Channels.pipeline(); - pipeline.addLast("openChannels", transport.serverOpenChannels); - HttpRequestDecoder requestDecoder = new HttpRequestDecoder( - (int) transport.maxInitialLineLength.bytes(), - (int) transport.maxHeaderSize.bytes(), - (int) transport.maxChunkSize.bytes() - ); - if (transport.maxCumulationBufferCapacity != null) { - if (transport.maxCumulationBufferCapacity.bytes() > Integer.MAX_VALUE) { - requestDecoder.setMaxCumulationBufferCapacity(Integer.MAX_VALUE); - } else { - requestDecoder.setMaxCumulationBufferCapacity((int) transport.maxCumulationBufferCapacity.bytes()); - } - } - if (transport.maxCompositeBufferComponents != -1) { - requestDecoder.setMaxCumulationBufferComponents(transport.maxCompositeBufferComponents); - } - pipeline.addLast("decoder", requestDecoder); - if (transport.compression) { - pipeline.addLast("decoder_compress", new HttpContentDecompressor()); - } - HttpChunkAggregator httpChunkAggregator = new HttpChunkAggregator((int) transport.maxContentLength.bytes()); - if (transport.maxCompositeBufferComponents != -1) { - httpChunkAggregator.setMaxCumulationBufferComponents(transport.maxCompositeBufferComponents); - } - pipeline.addLast("aggregator", httpChunkAggregator); - pipeline.addLast("encoder", new HttpResponseEncoder()); - if (transport.compression) { - pipeline.addLast("encoder_compress", new HttpContentCompressor(transport.compressionLevel)); - } - pipeline.addLast("handler", requestHandler); - return pipeline; + if (this.maxCompositeBufferComponents != -1) { + requestDecoder.setMaxCumulationBufferComponents(this.maxCompositeBufferComponents); } + channelPipeline.addLast("decoder", requestDecoder); + if (this.compression) { + channelPipeline.addLast("decoder_compress", new HttpContentDecompressor()); + } + HttpChunkAggregator httpChunkAggregator = new HttpChunkAggregator((int) this.maxContentLength.bytes()); + if (this.maxCompositeBufferComponents != -1) { + httpChunkAggregator.setMaxCumulationBufferComponents(this.maxCompositeBufferComponents); + } + channelPipeline.addLast("aggregator", httpChunkAggregator); + channelPipeline.addLast("encoder", new HttpResponseEncoder()); + if (this.compression) { + channelPipeline.addLast("encoder_compress", new HttpContentCompressor(this.compressionLevel)); + } + channelPipeline.addLast("handler", new HttpRequestHandler(this)); } } diff --git a/src/main/java/org/elasticsearch/transport/netty/MessageChannelHandler.java b/src/main/java/org/elasticsearch/transport/netty/MessageChannelHandler.java index 74ad0b37ac8..6d896c39b77 100644 --- a/src/main/java/org/elasticsearch/transport/netty/MessageChannelHandler.java +++ b/src/main/java/org/elasticsearch/transport/netty/MessageChannelHandler.java @@ -45,10 +45,10 @@ import java.net.InetSocketAddress; */ public class MessageChannelHandler extends SimpleChannelUpstreamHandler { - private final ESLogger logger; - private final ThreadPool threadPool; - private final TransportServiceAdapter transportServiceAdapter; - private final NettyTransport transport; + protected final ESLogger logger; + protected final ThreadPool threadPool; + protected final TransportServiceAdapter transportServiceAdapter; + protected final NettyTransport transport; public MessageChannelHandler(NettyTransport transport, ESLogger logger) { this.threadPool = transport.threadPool(); @@ -142,7 +142,7 @@ public class MessageChannelHandler extends SimpleChannelUpstreamHandler { wrappedStream.close(); } - private void handleResponse(Channel channel, StreamInput buffer, final TransportResponseHandler handler) { + protected void handleResponse(Channel channel, StreamInput buffer, final TransportResponseHandler handler) { final TransportResponse response = handler.newInstance(); response.remoteAddress(new InetSocketTransportAddress((InetSocketAddress) channel.getRemoteAddress())); response.remoteAddress(); @@ -200,7 +200,7 @@ public class MessageChannelHandler extends SimpleChannelUpstreamHandler { } } - private String handleRequest(Channel channel, StreamInput buffer, long requestId, Version version) throws IOException { + protected String handleRequest(Channel channel, StreamInput buffer, long requestId, Version version) throws IOException { final String action = buffer.readString(); final NettyTransportChannel transportChannel = new NettyTransportChannel(transport, action, channel, requestId, version); diff --git a/src/main/java/org/elasticsearch/transport/netty/NettyTransport.java b/src/main/java/org/elasticsearch/transport/netty/NettyTransport.java index 19757988883..849c82ffb2c 100644 --- a/src/main/java/org/elasticsearch/transport/netty/NettyTransport.java +++ b/src/main/java/org/elasticsearch/transport/netty/NettyTransport.java @@ -72,10 +72,7 @@ import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.SocketAddress; import java.nio.channels.CancelledKeyException; -import java.util.ArrayList; -import java.util.Iterator; -import java.util.List; -import java.util.Set; +import java.util.*; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executors; @@ -272,27 +269,9 @@ public class NettyTransport extends AbstractLifecycleComponent implem new NioWorkerPool(Executors.newCachedThreadPool(daemonThreadFactory(settings, "transport_client_worker")), workerCount), new HashedWheelTimer(daemonThreadFactory(settings, "transport_client_timer")))); } - ChannelPipelineFactory clientPipelineFactory = new ChannelPipelineFactory() { - @Override - public ChannelPipeline getPipeline() throws Exception { - ChannelPipeline pipeline = Channels.pipeline(); - SizeHeaderFrameDecoder sizeHeader = new SizeHeaderFrameDecoder(); - if (maxCumulationBufferCapacity != null) { - if (maxCumulationBufferCapacity.bytes() > Integer.MAX_VALUE) { - sizeHeader.setMaxCumulationBufferCapacity(Integer.MAX_VALUE); - } else { - sizeHeader.setMaxCumulationBufferCapacity((int) maxCumulationBufferCapacity.bytes()); - } - } - if (maxCompositeBufferComponents != -1) { - sizeHeader.setMaxCumulationBufferComponents(maxCompositeBufferComponents); - } - pipeline.addLast("size", sizeHeader); - pipeline.addLast("dispatcher", new MessageChannelHandler(NettyTransport.this, logger)); - return pipeline; - } - }; - clientBootstrap.setPipelineFactory(clientPipelineFactory); + ChannelPipeline clientChannelPipeline = Channels.pipeline(); + configureClientChannelPipeline(clientChannelPipeline); + clientBootstrap.setPipeline(clientChannelPipeline); clientBootstrap.setOption("connectTimeoutMillis", connectTimeout.millis()); if (tcpNoDelay != null) { clientBootstrap.setOption("tcpNoDelay", tcpNoDelay); @@ -315,8 +294,6 @@ public class NettyTransport extends AbstractLifecycleComponent implem return; } - final OpenChannelsHandler openChannels = new OpenChannelsHandler(logger); - this.serverOpenChannels = openChannels; if (blockingServer) { serverBootstrap = new ServerBootstrap(new OioServerSocketChannelFactory( Executors.newCachedThreadPool(daemonThreadFactory(settings, "transport_server_boss")), @@ -328,28 +305,10 @@ public class NettyTransport extends AbstractLifecycleComponent implem Executors.newCachedThreadPool(daemonThreadFactory(settings, "transport_server_worker")), workerCount)); } - ChannelPipelineFactory serverPipelineFactory = new ChannelPipelineFactory() { - @Override - public ChannelPipeline getPipeline() throws Exception { - ChannelPipeline pipeline = Channels.pipeline(); - pipeline.addLast("openChannels", openChannels); - SizeHeaderFrameDecoder sizeHeader = new SizeHeaderFrameDecoder(); - if (maxCumulationBufferCapacity != null) { - if (maxCumulationBufferCapacity.bytes() > Integer.MAX_VALUE) { - sizeHeader.setMaxCumulationBufferCapacity(Integer.MAX_VALUE); - } else { - sizeHeader.setMaxCumulationBufferCapacity((int) maxCumulationBufferCapacity.bytes()); - } - } - if (maxCompositeBufferComponents != -1) { - sizeHeader.setMaxCumulationBufferComponents(maxCompositeBufferComponents); - } - pipeline.addLast("size", sizeHeader); - pipeline.addLast("dispatcher", new MessageChannelHandler(NettyTransport.this, logger)); - return pipeline; - } - }; - serverBootstrap.setPipelineFactory(serverPipelineFactory); + + ChannelPipeline serverChannelPipeline = Channels.pipeline(); + configureServerChannelPipeline(serverChannelPipeline); + serverBootstrap.setPipeline(serverChannelPipeline); if (tcpNoDelay != null) { serverBootstrap.setOption("child.tcpNoDelay", tcpNoDelay); } @@ -412,6 +371,41 @@ public class NettyTransport extends AbstractLifecycleComponent implem this.boundAddress = new BoundTransportAddress(new InetSocketTransportAddress(boundAddress), new InetSocketTransportAddress(publishAddress)); } + public void configureClientChannelPipeline(ChannelPipeline channelPipeline) { + SizeHeaderFrameDecoder sizeHeader = new SizeHeaderFrameDecoder(); + if (maxCumulationBufferCapacity != null) { + if (maxCumulationBufferCapacity.bytes() > Integer.MAX_VALUE) { + sizeHeader.setMaxCumulationBufferCapacity(Integer.MAX_VALUE); + } else { + sizeHeader.setMaxCumulationBufferCapacity((int) maxCumulationBufferCapacity.bytes()); + } + } + if (maxCompositeBufferComponents != -1) { + sizeHeader.setMaxCumulationBufferComponents(maxCompositeBufferComponents); + } + channelPipeline.addLast("size", sizeHeader); + channelPipeline.addLast("dispatcher", new MessageChannelHandler(this, logger)); + } + + public void configureServerChannelPipeline(ChannelPipeline channelPipeline) { + final OpenChannelsHandler openChannels = new OpenChannelsHandler(logger); + this.serverOpenChannels = openChannels; + channelPipeline.addLast("openChannels", openChannels); + SizeHeaderFrameDecoder sizeHeader = new SizeHeaderFrameDecoder(); + if (maxCumulationBufferCapacity != null) { + if (maxCumulationBufferCapacity.bytes() > Integer.MAX_VALUE) { + sizeHeader.setMaxCumulationBufferCapacity(Integer.MAX_VALUE); + } else { + sizeHeader.setMaxCumulationBufferCapacity((int) maxCumulationBufferCapacity.bytes()); + } + } + if (maxCompositeBufferComponents != -1) { + sizeHeader.setMaxCumulationBufferComponents(maxCompositeBufferComponents); + } + channelPipeline.addLast("size", sizeHeader); + channelPipeline.addLast("dispatcher", new MessageChannelHandler(this, logger)); + } + @Override protected void doStop() throws ElasticsearchException { final CountDownLatch latch = new CountDownLatch(1); diff --git a/src/test/java/org/elasticsearch/test/transport/ConfigurableErrorNettyTransportModule.java b/src/test/java/org/elasticsearch/test/transport/ConfigurableErrorNettyTransportModule.java new file mode 100644 index 00000000000..2e6d1c04054 --- /dev/null +++ b/src/test/java/org/elasticsearch/test/transport/ConfigurableErrorNettyTransportModule.java @@ -0,0 +1,142 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.test.transport; + +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.Version; +import org.elasticsearch.common.component.Lifecycle; +import org.elasticsearch.common.inject.AbstractModule; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.network.NetworkService; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.transport.InetSocketTransportAddress; +import org.elasticsearch.common.util.BigArrays; +import org.elasticsearch.common.util.concurrent.AbstractRunnable; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.ActionNotFoundTransportException; +import org.elasticsearch.transport.Transport; +import org.elasticsearch.transport.TransportRequest; +import org.elasticsearch.transport.TransportRequestHandler; +import org.elasticsearch.transport.netty.MessageChannelHandler; +import org.elasticsearch.transport.netty.NettyTransport; +import org.elasticsearch.transport.netty.NettyTransportChannel; +import org.jboss.netty.channel.Channel; +import org.jboss.netty.channel.ChannelPipeline; + +import java.io.IOException; +import java.net.InetSocketAddress; + +/** + * + */ +public class ConfigurableErrorNettyTransportModule extends AbstractModule { + + @Override + protected void configure() { + bind(ExceptionThrowingNettyTransport.class).asEagerSingleton(); + bind(Transport.class).to(ExceptionThrowingNettyTransport.class).asEagerSingleton(); + + } + + public static final class ExceptionThrowingNettyTransport extends NettyTransport { + + @Inject + public ExceptionThrowingNettyTransport(Settings settings, ThreadPool threadPool, NetworkService networkService, BigArrays bigArrays, Version version) { + super(settings, threadPool, networkService, bigArrays, version); + } + + @Override + public void configureServerChannelPipeline(ChannelPipeline channelPipeline) { + super.configureServerChannelPipeline(channelPipeline); + channelPipeline.replace("dispatcher", "dispatcher", new MessageChannelHandler(this, logger) { + + @Override + protected String handleRequest(Channel channel, StreamInput buffer, long requestId, Version version) throws IOException { + final String action = buffer.readString(); + + final NettyTransportChannel transportChannel = new NettyTransportChannel(ExceptionThrowingNettyTransport.this, action, channel, requestId, version); + try { + final TransportRequestHandler handler = transportServiceAdapter.handler(action); + if (handler == null) { + throw new ActionNotFoundTransportException(action); + } + final TransportRequest request = handler.newInstance(); + request.remoteAddress(new InetSocketTransportAddress((InetSocketAddress) channel.getRemoteAddress())); + request.readFrom(buffer); + if (request.getHeaders() != null && request.getHeaders().containsKey("ERROR")) { + throw new ElasticsearchException((String)request.getHeaders().get("ERROR")); + } + if (handler.executor() == ThreadPool.Names.SAME) { + //noinspection unchecked + handler.messageReceived(request, transportChannel); + } else { + threadPool.executor(handler.executor()).execute(new RequestHandler(handler, request, transportChannel, action)); + } + } catch (Throwable e) { + try { + transportChannel.sendResponse(e); + } catch (IOException e1) { + logger.warn("Failed to send error message back to client for action [" + action + "]", e); + logger.warn("Actual Exception", e1); + } + } + return action; + } + + class RequestHandler extends AbstractRunnable { + private final TransportRequestHandler handler; + private final TransportRequest request; + private final NettyTransportChannel transportChannel; + private final String action; + + public RequestHandler(TransportRequestHandler handler, TransportRequest request, NettyTransportChannel transportChannel, String action) { + this.handler = handler; + this.request = request; + this.transportChannel = transportChannel; + this.action = action; + } + + @SuppressWarnings({"unchecked"}) + @Override + public void run() { + try { + handler.messageReceived(request, transportChannel); + } catch (Throwable e) { + if (ExceptionThrowingNettyTransport.this.lifecycleState() == Lifecycle.State.STARTED) { + // we can only send a response transport is started.... + try { + transportChannel.sendResponse(e); + } catch (Throwable e1) { + logger.warn("Failed to send error message back to client for action [" + action + "]", e1); + logger.warn("Actual Exception", e); + } + } + } + } + + @Override + public boolean isForceExecution() { + return handler.isForceExecution(); + } + } + }); + } + } +} diff --git a/src/test/java/org/elasticsearch/test/transport/NettyTransportTests.java b/src/test/java/org/elasticsearch/test/transport/NettyTransportTests.java new file mode 100644 index 00000000000..ed5dab3fde2 --- /dev/null +++ b/src/test/java/org/elasticsearch/test/transport/NettyTransportTests.java @@ -0,0 +1,64 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.test.transport; + +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; +import org.elasticsearch.action.admin.cluster.health.ClusterHealthStatus; +import org.elasticsearch.client.Client; +import org.elasticsearch.common.settings.ImmutableSettings; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.test.ElasticsearchIntegrationTest; +import org.elasticsearch.transport.TransportModule; +import org.junit.Test; + +import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder; +import static org.elasticsearch.test.ElasticsearchIntegrationTest.ClusterScope; +import static org.elasticsearch.test.ElasticsearchIntegrationTest.Scope; +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.is; + +/** + * + */ +@ClusterScope(scope = Scope.TEST, numDataNodes = 1) +public class NettyTransportTests extends ElasticsearchIntegrationTest { + + @Override + protected Settings nodeSettings(int nodeOrdinal) { + ImmutableSettings.Builder builder = settingsBuilder() + .put("node.mode", "network") + .put(TransportModule.TRANSPORT_TYPE_KEY, ConfigurableErrorNettyTransportModule.class); + return builder.put(super.nodeSettings(nodeOrdinal)).build(); + } + + @Test + public void testThatConnectionFailsAsIntended() throws Exception { + Client transportClient = internalCluster().transportClient(); + ClusterHealthResponse clusterIndexHealths = transportClient.admin().cluster().prepareHealth().get(); + assertThat(clusterIndexHealths.getStatus(), is(ClusterHealthStatus.GREEN)); + + try { + transportClient.admin().cluster().prepareHealth().putHeader("ERROR", "MY MESSAGE").get(); + fail("Expected exception, but didnt happen"); + } catch (ElasticsearchException e) { + assertThat(e.getMessage(), containsString("MY MESSAGE")); + } + } +}