improve how decoding is done on the transport layer, embedding FrameDecoder into the message handler, and reducing allocation of buffers and better guess into allocating cumalation buffers

This commit is contained in:
Shay Banon 2011-11-24 20:03:25 +02:00
parent f0efb8cdea
commit 03c2e5ea52
4 changed files with 120 additions and 64 deletions

View File

@ -101,7 +101,7 @@ public class CachedStreamOutput {
private static final SoftWrapper<Queue<Entry>> cache = new SoftWrapper<Queue<Entry>>();
private static final AtomicInteger counter = new AtomicInteger();
public static int BYTES_LIMIT = 10 * 1024 * 1024; // don't cache entries that are bigger than that...
public static int BYTES_LIMIT = 1 * 1024 * 1024; // don't cache entries that are bigger than that...
public static int COUNT_LIMIT = 100;
public static void clear() {

View File

@ -26,7 +26,10 @@ import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.netty.buffer.ChannelBuffer;
import org.elasticsearch.common.netty.buffer.ChannelBuffers;
import org.elasticsearch.common.netty.channel.Channel;
import org.elasticsearch.common.netty.channel.ChannelHandlerContext;
import org.elasticsearch.common.netty.channel.ChannelStateEvent;
import org.elasticsearch.common.netty.channel.ExceptionEvent;
import org.elasticsearch.common.netty.channel.MessageEvent;
import org.elasticsearch.common.netty.channel.SimpleChannelUpstreamHandler;
@ -42,9 +45,12 @@ import org.elasticsearch.transport.TransportServiceAdapter;
import org.elasticsearch.transport.support.TransportStreams;
import java.io.IOException;
import java.io.StreamCorruptedException;
import java.net.SocketAddress;
/**
* @author kimchy (shay.banon)
* A handler (must be the last one!) that does size based frame decoding and forwards the actual message
* to the relevant action.
*/
public class MessageChannelHandler extends SimpleChannelUpstreamHandler {
@ -56,6 +62,9 @@ public class MessageChannelHandler extends SimpleChannelUpstreamHandler {
private final NettyTransport transport;
// from FrameDecoder
private ChannelBuffer cumulation;
public MessageChannelHandler(NettyTransport transport, ESLogger logger) {
this.threadPool = transport.threadPool();
this.transportServiceAdapter = transport.transportServiceAdapter();
@ -68,11 +77,114 @@ public class MessageChannelHandler extends SimpleChannelUpstreamHandler {
super.writeComplete(ctx, e);
}
@Override public void messageReceived(ChannelHandlerContext ctx, MessageEvent event) throws Exception {
ChannelBuffer buffer = (ChannelBuffer) event.getMessage();
// similar logic to FrameDecoder, we don't use FrameDecoder because we can use the data len header value
// to guess the size of the cumulation buffer to allocate
// Also strange, is that the FrameDecoder always allocated a cumulation, even if the input bufer is enough
// so we don't allocate a cumulation buffer unless we really need to here (need to post this to the mailing list)
@Override public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
int size = buffer.getInt(buffer.readerIndex() - 4);
Object m = e.getMessage();
if (!(m instanceof ChannelBuffer)) {
ctx.sendUpstream(e);
return;
}
ChannelBuffer input = (ChannelBuffer) m;
if (!input.readable()) {
return;
}
ChannelBuffer cumulation = this.cumulation;
if (cumulation != null && cumulation.readable()) {
cumulation.discardReadBytes();
cumulation.writeBytes(input);
callDecode(ctx, e.getChannel(), cumulation, e.getRemoteAddress());
} else {
int actualSize = callDecode(ctx, e.getChannel(), input, e.getRemoteAddress());
if (input.readable()) {
if (actualSize > 0) {
cumulation = ChannelBuffers.dynamicBuffer(actualSize, ctx.getChannel().getConfig().getBufferFactory());
} else {
cumulation = ChannelBuffers.dynamicBuffer(ctx.getChannel().getConfig().getBufferFactory());
}
cumulation.writeBytes(input);
this.cumulation = cumulation;
}
}
}
@Override
public void channelDisconnected(
ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
cleanup(ctx, e);
}
@Override
public void channelClosed(
ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
cleanup(ctx, e);
}
private int callDecode(ChannelHandlerContext context, Channel channel, ChannelBuffer cumulation, SocketAddress remoteAddress) throws Exception {
while (cumulation.readable()) {
// Changes from Frame Decoder, to combine SizeHeader and this decoder into one...
if (cumulation.readableBytes() < 4) {
break; // we need more data
}
int dataLen = cumulation.getInt(cumulation.readerIndex());
if (dataLen <= 0) {
throw new StreamCorruptedException("invalid data length: " + dataLen);
}
int actualSize = dataLen + 4;
if (cumulation.readableBytes() < actualSize) {
return actualSize;
}
cumulation.skipBytes(4);
process(context, channel, cumulation, dataLen);
}
// TODO: we can potentially create a cumulation buffer cache, pop/push style
if (!cumulation.readable()) {
this.cumulation = null;
}
return 0;
}
private void cleanup(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
try {
ChannelBuffer cumulation = this.cumulation;
if (cumulation == null) {
return;
} else {
this.cumulation = null;
}
if (cumulation.readable()) {
// Make sure all frames are read before notifying a closed channel.
callDecode(ctx, ctx.getChannel(), cumulation, null);
}
// Call decodeLast() finally. Please note that decodeLast() is
// called even if there's nothing more to read from the buffer to
// notify a user that the connection was closed explicitly.
// Change from FrameDecoder: we don't need it...
// Object partialFrame = decodeLast(ctx, ctx.getChannel(), cumulation);
// if (partialFrame != null) {
// unfoldAndFireMessageReceived(ctx, null, partialFrame);
// }
} finally {
ctx.sendUpstream(e);
}
}
private void process(ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer, int size) throws Exception {
transportServiceAdapter.received(size + 4);
int markedReaderIndex = buffer.readerIndex();
@ -92,7 +204,7 @@ public class MessageChannelHandler extends SimpleChannelUpstreamHandler {
}
if (isRequest) {
String action = handleRequest(event, handlesStream, requestId);
String action = handleRequest(channel, handlesStream, requestId);
if (buffer.readerIndex() != expectedIndexReader) {
if (buffer.readerIndex() < expectedIndexReader) {
logger.warn("Message not fully read (request) for [{}] and action [{}], resetting", requestId, action);
@ -177,10 +289,10 @@ public class MessageChannelHandler extends SimpleChannelUpstreamHandler {
}
}
private String handleRequest(MessageEvent event, StreamInput buffer, long requestId) throws IOException {
private String handleRequest(Channel channel, StreamInput buffer, long requestId) throws IOException {
final String action = buffer.readUTF();
final NettyTransportChannel transportChannel = new NettyTransportChannel(transport, action, event.getChannel(), requestId);
final NettyTransportChannel transportChannel = new NettyTransportChannel(transport, action, channel, requestId);
try {
final TransportRequestHandler handler = transportServiceAdapter.handler(action);
if (handler == null) {

View File

@ -218,7 +218,6 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
ChannelPipelineFactory clientPipelineFactory = new ChannelPipelineFactory() {
@Override public ChannelPipeline getPipeline() throws Exception {
ChannelPipeline pipeline = Channels.pipeline();
pipeline.addLast("decoder", new SizeHeaderFrameDecoder());
pipeline.addLast("dispatcher", new MessageChannelHandler(NettyTransport.this, logger));
return pipeline;
}
@ -261,7 +260,6 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
@Override public ChannelPipeline getPipeline() throws Exception {
ChannelPipeline pipeline = Channels.pipeline();
pipeline.addLast("openChannels", serverOpenChannels);
pipeline.addLast("decoder", new SizeHeaderFrameDecoder());
pipeline.addLast("dispatcher", new MessageChannelHandler(NettyTransport.this, logger));
return pipeline;
}

View File

@ -1,54 +0,0 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.transport.netty;
import org.elasticsearch.common.netty.buffer.ChannelBuffer;
import org.elasticsearch.common.netty.channel.Channel;
import org.elasticsearch.common.netty.channel.ChannelHandlerContext;
import org.elasticsearch.common.netty.handler.codec.frame.FrameDecoder;
import java.io.StreamCorruptedException;
/**
* @author kimchy (shay.banon)
*/
public class SizeHeaderFrameDecoder extends FrameDecoder {
protected Object decode(ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer) throws Exception {
if (buffer.readableBytes() < 4) {
return null;
}
int dataLen = buffer.getInt(buffer.readerIndex());
if (dataLen <= 0) {
throw new StreamCorruptedException("invalid data length: " + dataLen);
}
if (buffer.readableBytes() < dataLen + 4) {
return null;
}
buffer.skipBytes(4);
return buffer;
}
}