From 03c2e5ea528abf0029808c63f5ba6d5d803b1757 Mon Sep 17 00:00:00 2001
From: Shay Banon
Date: Thu, 24 Nov 2011 20:03:25 +0200
Subject: [PATCH] improve how decoding is done on the transport layer: embed
 FrameDecoder into the message handler, reduce buffer allocations, and make a
 better guess when allocating the cumulation buffer

---
 .../common/io/stream/CachedStreamOutput.java  |   2 +-
 .../netty/MessageChannelHandler.java          | 126 +++++++++++++++++-
 .../transport/netty/NettyTransport.java       |   2 -
 .../netty/SizeHeaderFrameDecoder.java         |  54 --------
 4 files changed, 120 insertions(+), 64 deletions(-)
 delete mode 100644 modules/elasticsearch/src/main/java/org/elasticsearch/transport/netty/SizeHeaderFrameDecoder.java

diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/common/io/stream/CachedStreamOutput.java b/modules/elasticsearch/src/main/java/org/elasticsearch/common/io/stream/CachedStreamOutput.java
index 9717c7a8cd9..f94fae3a52d 100644
--- a/modules/elasticsearch/src/main/java/org/elasticsearch/common/io/stream/CachedStreamOutput.java
+++ b/modules/elasticsearch/src/main/java/org/elasticsearch/common/io/stream/CachedStreamOutput.java
@@ -101,7 +101,7 @@ public class CachedStreamOutput {
     private static final SoftWrapper<Queue<Entry>> cache = new SoftWrapper<Queue<Entry>>();
     private static final AtomicInteger counter = new AtomicInteger();
 
-    public static int BYTES_LIMIT = 10 * 1024 * 1024; // don't cache entries that are bigger than that...
+    public static int BYTES_LIMIT = 1 * 1024 * 1024; // don't cache entries that are bigger than that...
     public static int COUNT_LIMIT = 100;
 
     public static void clear() {
diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/transport/netty/MessageChannelHandler.java b/modules/elasticsearch/src/main/java/org/elasticsearch/transport/netty/MessageChannelHandler.java
index 356939a6c5a..92a80322aba 100644
--- a/modules/elasticsearch/src/main/java/org/elasticsearch/transport/netty/MessageChannelHandler.java
+++ b/modules/elasticsearch/src/main/java/org/elasticsearch/transport/netty/MessageChannelHandler.java
@@ -26,7 +26,10 @@ import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.Streamable;
 import org.elasticsearch.common.logging.ESLogger;
 import org.elasticsearch.common.netty.buffer.ChannelBuffer;
+import org.elasticsearch.common.netty.buffer.ChannelBuffers;
+import org.elasticsearch.common.netty.channel.Channel;
 import org.elasticsearch.common.netty.channel.ChannelHandlerContext;
+import org.elasticsearch.common.netty.channel.ChannelStateEvent;
 import org.elasticsearch.common.netty.channel.ExceptionEvent;
 import org.elasticsearch.common.netty.channel.MessageEvent;
 import org.elasticsearch.common.netty.channel.SimpleChannelUpstreamHandler;
@@ -42,9 +45,12 @@ import org.elasticsearch.transport.TransportServiceAdapter;
 import org.elasticsearch.transport.support.TransportStreams;
 
 import java.io.IOException;
+import java.io.StreamCorruptedException;
+import java.net.SocketAddress;
 
 /**
- * @author kimchy (shay.banon)
+ * A handler (must be the last one!) that does size-based frame decoding and forwards the actual message
+ * to the relevant action.
  */
 public class MessageChannelHandler extends SimpleChannelUpstreamHandler {
@@ -56,6 +62,9 @@ public class MessageChannelHandler extends SimpleChannelUpstreamHandler {
 
     private final NettyTransport transport;
 
+    // from FrameDecoder
+    private ChannelBuffer cumulation;
+
     public MessageChannelHandler(NettyTransport transport, ESLogger logger) {
         this.threadPool = transport.threadPool();
         this.transportServiceAdapter = transport.transportServiceAdapter();
@@ -68,11 +77,114 @@ public class MessageChannelHandler extends SimpleChannelUpstreamHandler {
         super.writeComplete(ctx, e);
     }
 
-    @Override public void messageReceived(ChannelHandlerContext ctx, MessageEvent event) throws Exception {
-        ChannelBuffer buffer = (ChannelBuffer) event.getMessage();
+    // similar logic to FrameDecoder; we don't use FrameDecoder itself because we can use the data length header
+    // value to guess the size of the cumulation buffer to allocate.
+    // Also, strangely, FrameDecoder always allocates a cumulation buffer, even if the input buffer is enough,
+    // so we don't allocate a cumulation buffer here unless we really need to (need to post this to the mailing list)
+    @Override public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
 
-        int size = buffer.getInt(buffer.readerIndex() - 4);
+        Object m = e.getMessage();
+        if (!(m instanceof ChannelBuffer)) {
+            ctx.sendUpstream(e);
+            return;
+        }
+        ChannelBuffer input = (ChannelBuffer) m;
+        if (!input.readable()) {
+            return;
+        }
+
+        ChannelBuffer cumulation = this.cumulation;
+        if (cumulation != null && cumulation.readable()) {
+            cumulation.discardReadBytes();
+            cumulation.writeBytes(input);
+            callDecode(ctx, e.getChannel(), cumulation, e.getRemoteAddress());
+        } else {
+            int actualSize = callDecode(ctx, e.getChannel(), input, e.getRemoteAddress());
+            if (input.readable()) {
+                if (actualSize > 0) {
+                    cumulation = ChannelBuffers.dynamicBuffer(actualSize, ctx.getChannel().getConfig().getBufferFactory());
+                } else {
+                    cumulation = ChannelBuffers.dynamicBuffer(ctx.getChannel().getConfig().getBufferFactory());
+                }
+                cumulation.writeBytes(input);
+                this.cumulation = cumulation;
+            }
+        }
+    }
+
+    @Override
+    public void channelDisconnected(
+            ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
+        cleanup(ctx, e);
+    }
+
+    @Override
+    public void channelClosed(
+            ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
+        cleanup(ctx, e);
+    }
+
+    private int callDecode(ChannelHandlerContext context, Channel channel, ChannelBuffer cumulation, SocketAddress remoteAddress) throws Exception {
+        while (cumulation.readable()) {
+            // change from FrameDecoder: combine the SizeHeader decoder and this decoder into one...
+            if (cumulation.readableBytes() < 4) {
+                break; // we need more data
+            }
+
+            int dataLen = cumulation.getInt(cumulation.readerIndex());
+            if (dataLen <= 0) {
+                throw new StreamCorruptedException("invalid data length: " + dataLen);
+            }
+
+            int actualSize = dataLen + 4;
+            if (cumulation.readableBytes() < actualSize) {
+                return actualSize;
+            }
+
+            cumulation.skipBytes(4);
+
+            process(context, channel, cumulation, dataLen);
+        }
+
+        // TODO: we can potentially create a cumulation buffer cache, pop/push style
+        if (!cumulation.readable()) {
+            this.cumulation = null;
+        }
+
+        return 0;
+    }
+
+
+    private void cleanup(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
+        try {
+            ChannelBuffer cumulation = this.cumulation;
+            if (cumulation == null) {
+                return;
+            } else {
+                this.cumulation = null;
+            }
+
+            if (cumulation.readable()) {
+                // Make sure all frames are read before notifying a closed channel.
+                callDecode(ctx, ctx.getChannel(), cumulation, null);
+            }
+
+            // Call decodeLast() finally. Please note that decodeLast() is
+            // called even if there's nothing more to read from the buffer to
+            // notify a user that the connection was closed explicitly.
+
+            // Change from FrameDecoder: we don't need it...
+//            Object partialFrame = decodeLast(ctx, ctx.getChannel(), cumulation);
+//            if (partialFrame != null) {
+//                unfoldAndFireMessageReceived(ctx, null, partialFrame);
+//            }
+        } finally {
+            ctx.sendUpstream(e);
+        }
+    }
+
+    private void process(ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer, int size) throws Exception {
         transportServiceAdapter.received(size + 4);
 
         int markedReaderIndex = buffer.readerIndex();
@@ -92,7 +204,7 @@ public class MessageChannelHandler extends SimpleChannelUpstreamHandler {
         }
 
         if (isRequest) {
-            String action = handleRequest(event, handlesStream, requestId);
+            String action = handleRequest(channel, handlesStream, requestId);
             if (buffer.readerIndex() != expectedIndexReader) {
                 if (buffer.readerIndex() < expectedIndexReader) {
                     logger.warn("Message not fully read (request) for [{}] and action [{}], resetting", requestId, action);
@@ -177,10 +289,10 @@ public class MessageChannelHandler extends SimpleChannelUpstreamHandler {
         }
     }
 
-    private String handleRequest(MessageEvent event, StreamInput buffer, long requestId) throws IOException {
+    private String handleRequest(Channel channel, StreamInput buffer, long requestId) throws IOException {
         final String action = buffer.readUTF();
 
-        final NettyTransportChannel transportChannel = new NettyTransportChannel(transport, action, event.getChannel(), requestId);
+        final NettyTransportChannel transportChannel = new NettyTransportChannel(transport, action, channel, requestId);
         try {
             final TransportRequestHandler handler = transportServiceAdapter.handler(action);
             if (handler == null) {
diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/transport/netty/NettyTransport.java b/modules/elasticsearch/src/main/java/org/elasticsearch/transport/netty/NettyTransport.java
index 4d1ed86b64b..7a8e9bbc9b9 100644
--- a/modules/elasticsearch/src/main/java/org/elasticsearch/transport/netty/NettyTransport.java
+++ b/modules/elasticsearch/src/main/java/org/elasticsearch/transport/netty/NettyTransport.java
@@ -218,7 +218,6 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
         ChannelPipelineFactory clientPipelineFactory = new ChannelPipelineFactory() {
             @Override public ChannelPipeline getPipeline() throws Exception {
                 ChannelPipeline pipeline = Channels.pipeline();
-                pipeline.addLast("decoder", new SizeHeaderFrameDecoder());
                 pipeline.addLast("dispatcher", new MessageChannelHandler(NettyTransport.this, logger));
                 return pipeline;
             }
@@ -261,7 +260,6 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
             @Override public ChannelPipeline getPipeline() throws Exception {
                 ChannelPipeline pipeline = Channels.pipeline();
                 pipeline.addLast("openChannels", serverOpenChannels);
-                pipeline.addLast("decoder", new SizeHeaderFrameDecoder());
                 pipeline.addLast("dispatcher", new MessageChannelHandler(NettyTransport.this, logger));
                 return pipeline;
             }
diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/transport/netty/SizeHeaderFrameDecoder.java b/modules/elasticsearch/src/main/java/org/elasticsearch/transport/netty/SizeHeaderFrameDecoder.java
deleted file mode 100644
index 9073cb00394..00000000000
--- a/modules/elasticsearch/src/main/java/org/elasticsearch/transport/netty/SizeHeaderFrameDecoder.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to Elastic Search and Shay Banon under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. Elastic Search licenses this
- * file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.elasticsearch.transport.netty;
-
-import org.elasticsearch.common.netty.buffer.ChannelBuffer;
-import org.elasticsearch.common.netty.channel.Channel;
-import org.elasticsearch.common.netty.channel.ChannelHandlerContext;
-import org.elasticsearch.common.netty.handler.codec.frame.FrameDecoder;
-
-import java.io.StreamCorruptedException;
-
-/**
- * @author kimchy (shay.banon)
- */
-public class SizeHeaderFrameDecoder extends FrameDecoder {
-
-    protected Object decode(ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer) throws Exception {
-
-        if (buffer.readableBytes() < 4) {
-            return null;
-        }
-
-        int dataLen = buffer.getInt(buffer.readerIndex());
-        if (dataLen <= 0) {
-            throw new StreamCorruptedException("invalid data length: " + dataLen);
-        }
-
-        if (buffer.readableBytes() < dataLen + 4) {
-            return null;
-        }
-
-        buffer.skipBytes(4);
-
-        return buffer;
-    }
-
-}
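For reference, the wire format both the old SizeHeaderFrameDecoder and the new embedded decoding loop assume is a 4-byte big-endian length header followed by the payload. The following standalone sketch uses plain java.nio instead of the shaded Netty buffers, and its class and method names are illustrative only; it shows the same framing idea as callDecode(): peek at the length header without consuming it, decode a frame only once it has fully arrived, and carry leftover bytes over to the next read event.

// Illustrative sketch of the 4-byte length-prefix framing (not the actual handler).
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class SizeHeaderFramingSketch {

    // Encode one payload as [int length][payload bytes].
    static ByteBuffer frame(byte[] payload) {
        ByteBuffer b = ByteBuffer.allocate(4 + payload.length);
        b.putInt(payload.length);
        b.put(payload);
        b.flip();
        return b;
    }

    // Decode as many complete frames as `in` holds; leftover bytes stay in the
    // buffer, much like the handler keeps them in its cumulation buffer.
    static void decode(ByteBuffer in) {
        while (in.remaining() >= 4) {
            int dataLen = in.getInt(in.position()); // peek, don't consume the header yet
            if (dataLen <= 0) {
                throw new IllegalStateException("invalid data length: " + dataLen);
            }
            if (in.remaining() < 4 + dataLen) {
                return; // incomplete frame, wait for more bytes
            }
            in.position(in.position() + 4); // skip the header
            byte[] payload = new byte[dataLen];
            in.get(payload);
            System.out.println("frame: " + new String(payload, StandardCharsets.UTF_8));
        }
    }

    public static void main(String[] args) {
        ByteBuffer first = frame("hello".getBytes(StandardCharsets.UTF_8));
        ByteBuffer second = frame("world".getBytes(StandardCharsets.UTF_8));

        // Simulate the two frames arriving split across two reads.
        ByteBuffer cumulation = ByteBuffer.allocate(64);
        cumulation.put(first);
        cumulation.put(second.array(), 0, 3); // only part of the second frame so far
        cumulation.flip();
        decode(cumulation);                   // prints "hello", keeps the partial frame
        cumulation.compact();
        cumulation.put(second.array(), 3, second.limit() - 3);
        cumulation.flip();
        decode(cumulation);                   // now prints "world"
    }
}

The size hint that callDecode() returns for an incomplete frame is what lets messageReceived() allocate the dynamic cumulation buffer close to its final size instead of growing it repeatedly, and no cumulation buffer is allocated at all when a read event happens to contain only complete frames.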