diff --git a/src/main/java/org/elasticsearch/http/netty/NettyHttpServerTransport.java b/src/main/java/org/elasticsearch/http/netty/NettyHttpServerTransport.java index fe444407afb..9a5f079b3c1 100644 --- a/src/main/java/org/elasticsearch/http/netty/NettyHttpServerTransport.java +++ b/src/main/java/org/elasticsearch/http/netty/NettyHttpServerTransport.java @@ -195,7 +195,7 @@ public class NettyHttpServerTransport extends AbstractLifecycleComponent implem new NioWorkerPool(Executors.newCachedThreadPool(daemonThreadFactory(settings, "transport_client_worker")), workerCount), new HashedWheelTimer(daemonThreadFactory(settings, "transport_client_timer")))); } - ChannelPipelineFactory clientPipelineFactory = new ChannelPipelineFactory() { - @Override - public ChannelPipeline getPipeline() throws Exception { - ChannelPipeline pipeline = Channels.pipeline(); - SizeHeaderFrameDecoder sizeHeader = new SizeHeaderFrameDecoder(); - if (maxCumulationBufferCapacity != null) { - if (maxCumulationBufferCapacity.bytes() > Integer.MAX_VALUE) { - sizeHeader.setMaxCumulationBufferCapacity(Integer.MAX_VALUE); - } else { - sizeHeader.setMaxCumulationBufferCapacity((int) maxCumulationBufferCapacity.bytes()); - } - } - if (maxCompositeBufferComponents != -1) { - sizeHeader.setMaxCumulationBufferComponents(maxCompositeBufferComponents); - } - pipeline.addLast("size", sizeHeader); - pipeline.addLast("dispatcher", new MessageChannelHandler(NettyTransport.this, logger)); - return pipeline; - } - }; - clientBootstrap.setPipelineFactory(clientPipelineFactory); + clientBootstrap.setPipelineFactory(configureClientChannelPipelineFactory()); clientBootstrap.setOption("connectTimeoutMillis", connectTimeout.millis()); if (tcpNoDelay != null) { clientBootstrap.setOption("tcpNoDelay", tcpNoDelay); @@ -328,28 +308,7 @@ public class NettyTransport extends AbstractLifecycleComponent implem Executors.newCachedThreadPool(daemonThreadFactory(settings, "transport_server_worker")), workerCount)); } - ChannelPipelineFactory serverPipelineFactory = new ChannelPipelineFactory() { - @Override - public ChannelPipeline getPipeline() throws Exception { - ChannelPipeline pipeline = Channels.pipeline(); - pipeline.addLast("openChannels", openChannels); - SizeHeaderFrameDecoder sizeHeader = new SizeHeaderFrameDecoder(); - if (maxCumulationBufferCapacity != null) { - if (maxCumulationBufferCapacity.bytes() > Integer.MAX_VALUE) { - sizeHeader.setMaxCumulationBufferCapacity(Integer.MAX_VALUE); - } else { - sizeHeader.setMaxCumulationBufferCapacity((int) maxCumulationBufferCapacity.bytes()); - } - } - if (maxCompositeBufferComponents != -1) { - sizeHeader.setMaxCumulationBufferComponents(maxCompositeBufferComponents); - } - pipeline.addLast("size", sizeHeader); - pipeline.addLast("dispatcher", new MessageChannelHandler(NettyTransport.this, logger)); - return pipeline; - } - }; - serverBootstrap.setPipelineFactory(serverPipelineFactory); + serverBootstrap.setPipelineFactory(configureServerChannelPipelineFactory()); if (tcpNoDelay != null) { serverBootstrap.setOption("child.tcpNoDelay", tcpNoDelay); } @@ -876,6 +835,70 @@ public class NettyTransport extends AbstractLifecycleComponent implem return nodeChannels.channel(options.type()); } + public ChannelPipelineFactory configureClientChannelPipelineFactory() { + return new ClientChannelPipelineFactory(this); + } + + protected static class ClientChannelPipelineFactory implements ChannelPipelineFactory { + protected NettyTransport nettyTransport; + + public ClientChannelPipelineFactory(NettyTransport nettyTransport) { + this.nettyTransport = nettyTransport; + } + + @Override + public ChannelPipeline getPipeline() throws Exception { + ChannelPipeline channelPipeline = Channels.pipeline(); + SizeHeaderFrameDecoder sizeHeader = new SizeHeaderFrameDecoder(); + if (nettyTransport.maxCumulationBufferCapacity != null) { + if (nettyTransport.maxCumulationBufferCapacity.bytes() > Integer.MAX_VALUE) { + sizeHeader.setMaxCumulationBufferCapacity(Integer.MAX_VALUE); + } else { + sizeHeader.setMaxCumulationBufferCapacity((int) nettyTransport.maxCumulationBufferCapacity.bytes()); + } + } + if (nettyTransport.maxCompositeBufferComponents != -1) { + sizeHeader.setMaxCumulationBufferComponents(nettyTransport.maxCompositeBufferComponents); + } + channelPipeline.addLast("size", sizeHeader); + channelPipeline.addLast("dispatcher", new MessageChannelHandler(nettyTransport, nettyTransport.logger)); + return channelPipeline; + } + } + + public ChannelPipelineFactory configureServerChannelPipelineFactory() { + return new ServerChannelPipeFactory(this); + } + + protected static class ServerChannelPipeFactory implements ChannelPipelineFactory { + + protected NettyTransport nettyTransport; + + public ServerChannelPipeFactory(NettyTransport nettyTransport) { + this.nettyTransport = nettyTransport; + } + + @Override + public ChannelPipeline getPipeline() throws Exception { + ChannelPipeline channelPipeline = Channels.pipeline(); + channelPipeline.addLast("openChannels", nettyTransport.serverOpenChannels); + SizeHeaderFrameDecoder sizeHeader = new SizeHeaderFrameDecoder(); + if (nettyTransport.maxCumulationBufferCapacity != null) { + if (nettyTransport.maxCumulationBufferCapacity.bytes() > Integer.MAX_VALUE) { + sizeHeader.setMaxCumulationBufferCapacity(Integer.MAX_VALUE); + } else { + sizeHeader.setMaxCumulationBufferCapacity((int) nettyTransport.maxCumulationBufferCapacity.bytes()); + } + } + if (nettyTransport.maxCompositeBufferComponents != -1) { + sizeHeader.setMaxCumulationBufferComponents(nettyTransport.maxCompositeBufferComponents); + } + channelPipeline.addLast("size", sizeHeader); + channelPipeline.addLast("dispatcher", new MessageChannelHandler(nettyTransport, nettyTransport.logger)); + return channelPipeline; + } + } + private class ChannelCloseListener implements ChannelFutureListener { private final DiscoveryNode node; diff --git a/src/test/java/org/elasticsearch/test/transport/ConfigurableErrorNettyTransportModule.java b/src/test/java/org/elasticsearch/test/transport/ConfigurableErrorNettyTransportModule.java new file mode 100644 index 00000000000..440564c60f4 --- /dev/null +++ b/src/test/java/org/elasticsearch/test/transport/ConfigurableErrorNettyTransportModule.java @@ -0,0 +1,159 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.test.transport; + +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.Version; +import org.elasticsearch.common.component.Lifecycle; +import org.elasticsearch.common.inject.AbstractModule; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.logging.ESLogger; +import org.elasticsearch.common.network.NetworkService; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.transport.InetSocketTransportAddress; +import org.elasticsearch.common.util.BigArrays; +import org.elasticsearch.common.util.concurrent.AbstractRunnable; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.ActionNotFoundTransportException; +import org.elasticsearch.transport.Transport; +import org.elasticsearch.transport.TransportRequest; +import org.elasticsearch.transport.TransportRequestHandler; +import org.elasticsearch.transport.netty.MessageChannelHandler; +import org.elasticsearch.transport.netty.NettyTransport; +import org.elasticsearch.transport.netty.NettyTransportChannel; +import org.jboss.netty.channel.Channel; +import org.jboss.netty.channel.ChannelPipeline; +import org.jboss.netty.channel.ChannelPipelineFactory; + +import java.io.IOException; +import java.net.InetSocketAddress; + +/** + * + */ +public class ConfigurableErrorNettyTransportModule extends AbstractModule { + + @Override + protected void configure() { + bind(ExceptionThrowingNettyTransport.class).asEagerSingleton(); + bind(Transport.class).to(ExceptionThrowingNettyTransport.class).asEagerSingleton(); + + } + + public static final class ExceptionThrowingNettyTransport extends NettyTransport { + + @Inject + public ExceptionThrowingNettyTransport(Settings settings, ThreadPool threadPool, NetworkService networkService, BigArrays bigArrays, Version version) { + super(settings, threadPool, networkService, bigArrays, version); + } + + @Override + public ChannelPipelineFactory configureServerChannelPipelineFactory() { + return new ErrorPipelineFactory(this); + } + + private static class ErrorPipelineFactory extends ServerChannelPipeFactory { + + private final ESLogger logger; + + public ErrorPipelineFactory(ExceptionThrowingNettyTransport exceptionThrowingNettyTransport) { + super(exceptionThrowingNettyTransport); + this.logger = exceptionThrowingNettyTransport.logger; + } + + public ChannelPipeline getPipeline() throws Exception { + ChannelPipeline pipeline = super.getPipeline(); + pipeline.replace("dispatcher", "dispatcher", new MessageChannelHandler(nettyTransport, logger) { + + @Override + protected String handleRequest(Channel channel, StreamInput buffer, long requestId, Version version) throws IOException { + final String action = buffer.readString(); + + final NettyTransportChannel transportChannel = new NettyTransportChannel(transport, action, channel, requestId, version); + try { + final TransportRequestHandler handler = transportServiceAdapter.handler(action); + if (handler == null) { + throw new ActionNotFoundTransportException(action); + } + final TransportRequest request = handler.newInstance(); + request.remoteAddress(new InetSocketTransportAddress((InetSocketAddress) channel.getRemoteAddress())); + request.readFrom(buffer); + if (request.getHeaders() != null && request.getHeaders().containsKey("ERROR")) { + throw new ElasticsearchException((String) request.getHeaders().get("ERROR")); + } + if (handler.executor() == ThreadPool.Names.SAME) { + //noinspection unchecked + handler.messageReceived(request, transportChannel); + } else { + threadPool.executor(handler.executor()).execute(new RequestHandler(handler, request, transportChannel, action)); + } + } catch (Throwable e) { + try { + transportChannel.sendResponse(e); + } catch (IOException e1) { + logger.warn("Failed to send error message back to client for action [" + action + "]", e); + logger.warn("Actual Exception", e1); + } + } + return action; + } + + class RequestHandler extends AbstractRunnable { + private final TransportRequestHandler handler; + private final TransportRequest request; + private final NettyTransportChannel transportChannel; + private final String action; + + public RequestHandler(TransportRequestHandler handler, TransportRequest request, NettyTransportChannel transportChannel, String action) { + this.handler = handler; + this.request = request; + this.transportChannel = transportChannel; + this.action = action; + } + + @SuppressWarnings({"unchecked"}) + @Override + public void run() { + try { + handler.messageReceived(request, transportChannel); + } catch (Throwable e) { + if (transport.lifecycleState() == Lifecycle.State.STARTED) { + // we can only send a response transport is started.... + try { + transportChannel.sendResponse(e); + } catch (Throwable e1) { + logger.warn("Failed to send error message back to client for action [" + action + "]", e1); + logger.warn("Actual Exception", e); + } + } + } + } + + @Override + public boolean isForceExecution() { + return handler.isForceExecution(); + } + } + }); + return pipeline; + } + } + } +} diff --git a/src/test/java/org/elasticsearch/test/transport/NettyTransportTests.java b/src/test/java/org/elasticsearch/test/transport/NettyTransportTests.java new file mode 100644 index 00000000000..ed5dab3fde2 --- /dev/null +++ b/src/test/java/org/elasticsearch/test/transport/NettyTransportTests.java @@ -0,0 +1,64 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.test.transport; + +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; +import org.elasticsearch.action.admin.cluster.health.ClusterHealthStatus; +import org.elasticsearch.client.Client; +import org.elasticsearch.common.settings.ImmutableSettings; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.test.ElasticsearchIntegrationTest; +import org.elasticsearch.transport.TransportModule; +import org.junit.Test; + +import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder; +import static org.elasticsearch.test.ElasticsearchIntegrationTest.ClusterScope; +import static org.elasticsearch.test.ElasticsearchIntegrationTest.Scope; +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.is; + +/** + * + */ +@ClusterScope(scope = Scope.TEST, numDataNodes = 1) +public class NettyTransportTests extends ElasticsearchIntegrationTest { + + @Override + protected Settings nodeSettings(int nodeOrdinal) { + ImmutableSettings.Builder builder = settingsBuilder() + .put("node.mode", "network") + .put(TransportModule.TRANSPORT_TYPE_KEY, ConfigurableErrorNettyTransportModule.class); + return builder.put(super.nodeSettings(nodeOrdinal)).build(); + } + + @Test + public void testThatConnectionFailsAsIntended() throws Exception { + Client transportClient = internalCluster().transportClient(); + ClusterHealthResponse clusterIndexHealths = transportClient.admin().cluster().prepareHealth().get(); + assertThat(clusterIndexHealths.getStatus(), is(ClusterHealthStatus.GREEN)); + + try { + transportClient.admin().cluster().prepareHealth().putHeader("ERROR", "MY MESSAGE").get(); + fail("Expected exception, but didnt happen"); + } catch (ElasticsearchException e) { + assertThat(e.getMessage(), containsString("MY MESSAGE")); + } + } +}