diff --git a/src/main/java/org/elasticsearch/transport/netty/NettyTransport.java b/src/main/java/org/elasticsearch/transport/netty/NettyTransport.java index ecf2fdf4dbb..a7d3031944d 100644 --- a/src/main/java/org/elasticsearch/transport/netty/NettyTransport.java +++ b/src/main/java/org/elasticsearch/transport/netty/NettyTransport.java @@ -19,6 +19,7 @@ package org.elasticsearch.transport.netty; +import com.google.common.base.Charsets; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; @@ -604,6 +605,18 @@ public class NettyTransport extends AbstractLifecycleComponent implem // close the channel as safe measure, which will cause a node to be disconnected if relevant ctx.getChannel().close(); disconnectFromNodeChannel(ctx.getChannel(), e.getCause()); + } else if (e.getCause() instanceof SizeHeaderFrameDecoder.HttpOnTransportException) { + // in case we are able to return data, serialize the exception content and sent it back to the client + if (ctx.getChannel().isOpen()) { + ChannelBuffer buffer = ChannelBuffers.wrappedBuffer(e.getCause().getMessage().getBytes(Charsets.UTF_8)); + ChannelFuture channelFuture = ctx.getChannel().write(buffer); + channelFuture.addListener(new ChannelFutureListener() { + @Override + public void operationComplete(ChannelFuture future) throws Exception { + future.getChannel().close(); + } + }); + } } else { logger.warn("exception caught on transport layer [{}], closing connection", e.getCause(), ctx.getChannel()); // close the channel, which will cause a node to be disconnected if relevant diff --git a/src/main/java/org/elasticsearch/transport/netty/SizeHeaderFrameDecoder.java b/src/main/java/org/elasticsearch/transport/netty/SizeHeaderFrameDecoder.java index 85c6b898bde..d3fd096ffb8 100644 --- a/src/main/java/org/elasticsearch/transport/netty/SizeHeaderFrameDecoder.java +++ b/src/main/java/org/elasticsearch/transport/netty/SizeHeaderFrameDecoder.java @@ -19,8 +19,11 @@ package org.elasticsearch.transport.netty; +import com.google.common.base.Charsets; +import org.elasticsearch.ElasticsearchException; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.monitor.jvm.JvmInfo; +import org.elasticsearch.rest.RestStatus; import org.jboss.netty.buffer.ChannelBuffer; import org.jboss.netty.channel.Channel; import org.jboss.netty.channel.ChannelHandlerContext; @@ -43,6 +46,19 @@ public class SizeHeaderFrameDecoder extends FrameDecoder { int readerIndex = buffer.readerIndex(); if (buffer.getByte(readerIndex) != 'E' || buffer.getByte(readerIndex + 1) != 'S') { + // special handling for what is probably HTTP + if (bufferStartsWith(buffer, readerIndex, "GET ") || + bufferStartsWith(buffer, readerIndex, "POST ") || + bufferStartsWith(buffer, readerIndex, "PUT ") || + bufferStartsWith(buffer, readerIndex, "HEAD ") || + bufferStartsWith(buffer, readerIndex, "DELETE ") || + bufferStartsWith(buffer, readerIndex, "OPTIONS ") || + bufferStartsWith(buffer, readerIndex, "PATCH ") || + bufferStartsWith(buffer, readerIndex, "TRACE ")) { + + throw new HttpOnTransportException("This is not a HTTP port"); + } + // we have 6 readable bytes, show 4 (should be enough) throw new StreamCorruptedException("invalid internal transport message format, got (" + Integer.toHexString(buffer.getByte(readerIndex) & 0xFF) + "," @@ -67,4 +83,31 @@ public class SizeHeaderFrameDecoder extends FrameDecoder { buffer.skipBytes(6); return buffer; } + + private boolean bufferStartsWith(ChannelBuffer buffer, int readerIndex, String method) { + char[] chars = method.toCharArray(); + for (int i = 0; i < chars.length; i++) { + if (buffer.getByte(readerIndex + i) != chars[i]) { + return false; + } + } + + return true; + } + + /** + * A helper exception to mark an incoming connection as potentially being HTTP + * so an appropriate error code can be returned + */ + public class HttpOnTransportException extends ElasticsearchException { + + public HttpOnTransportException(String msg) { + super(msg); + } + + @Override + public RestStatus status() { + return RestStatus.BAD_REQUEST; + } + } } \ No newline at end of file diff --git a/src/test/java/org/elasticsearch/transport/NettySizeHeaderFrameDecoderTests.java b/src/test/java/org/elasticsearch/transport/NettySizeHeaderFrameDecoderTests.java new file mode 100644 index 00000000000..928cd2e7955 --- /dev/null +++ b/src/test/java/org/elasticsearch/transport/NettySizeHeaderFrameDecoderTests.java @@ -0,0 +1,107 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.transport; + +import com.google.common.base.Charsets; +import org.elasticsearch.Version; +import org.elasticsearch.common.network.NetworkService; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.transport.InetSocketTransportAddress; +import org.elasticsearch.common.util.BigArrays; +import org.elasticsearch.indices.breaker.NoneCircuitBreakerService; +import org.elasticsearch.node.settings.NodeSettingsService; +import org.elasticsearch.test.ElasticsearchTestCase; +import org.elasticsearch.test.cache.recycler.MockBigArrays; +import org.elasticsearch.test.cache.recycler.MockPageCacheRecycler; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.netty.NettyTransport; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.io.BufferedReader; +import java.io.InputStreamReader; +import java.net.Socket; + +import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder; +import static org.hamcrest.Matchers.is; + +/** + * This test checks, if a HTTP look-alike request (starting with a HTTP method and a space) + * actually returns text response instead of just dropping the connection + */ +public class NettySizeHeaderFrameDecoderTests extends ElasticsearchTestCase { + + private final Settings settings = settingsBuilder().put("name", "foo").put("transport.host", "127.0.0.1").build(); + + private ThreadPool threadPool; + private NettyTransport nettyTransport; + private int port; + private String host; + + @Before + public void startThreadPool() { + threadPool = new ThreadPool(settings, new NodeSettingsService(settings)); + + NetworkService networkService = new NetworkService(settings); + BigArrays bigArrays = new MockBigArrays(settings, new MockPageCacheRecycler(settings, threadPool), new NoneCircuitBreakerService()); + nettyTransport = new NettyTransport(settings, threadPool, networkService, bigArrays, Version.CURRENT); + nettyTransport.start(); + TransportService transportService = new TransportService(nettyTransport, threadPool); + nettyTransport.transportServiceAdapter(transportService.createAdapter()); + + InetSocketTransportAddress transportAddress = (InetSocketTransportAddress) nettyTransport.boundAddress().boundAddress(); + port = transportAddress.address().getPort(); + host = transportAddress.address().getHostString(); + + } + + @After + public void terminateThreadPool() throws InterruptedException { + nettyTransport.stop(); + terminate(threadPool); + } + + @Test + public void testThatTextMessageIsReturnedOnHTTPLikeRequest() throws Exception { + String randomMethod = randomFrom("GET", "POST", "PUT", "DELETE", "HEAD", "OPTIONS", "PATCH"); + String data = randomMethod + " / HTTP/1.1"; + + try (Socket socket = new Socket(host, port)) { + socket.getOutputStream().write(data.getBytes(Charsets.UTF_8)); + socket.getOutputStream().flush(); + + try (BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream(), Charsets.UTF_8))) { + assertThat(reader.readLine(), is("This is not a HTTP port")); + } + } + } + + @Test + public void testThatNothingIsReturnedForOtherInvalidPackets() throws Exception { + try (Socket socket = new Socket(host, port)) { + socket.getOutputStream().write("FOOBAR".getBytes(Charsets.UTF_8)); + socket.getOutputStream().flush(); + + // end of stream + assertThat(socket.getInputStream().read(), is(-1)); + } + } +}