mirror of
https://github.com/honeymoose/OpenSearch.git
synced 2025-02-17 10:25:15 +00:00
use netty frame decoder again
with the new bytes reference abstraction, and the new composite buffer handling in the frame decoder, it makes more sense to move back to using the netty frame decoder again, since we can easily slice references across composite buffers and reduce copies when using compression
This commit is contained in:
parent
f7b538e17f
commit
12beff9176
@ -91,7 +91,6 @@ public class NettyHttpServerTransport extends AbstractLifecycleComponent<HttpSer
|
||||
private final Boolean reuseAddress;
|
||||
|
||||
private final ByteSizeValue tcpSendBufferSize;
|
||||
|
||||
private final ByteSizeValue tcpReceiveBufferSize;
|
||||
|
||||
final ByteSizeValue maxCumulationBufferCapacity;
|
||||
@ -173,10 +172,10 @@ public class NettyHttpServerTransport extends AbstractLifecycleComponent<HttpSer
|
||||
if (tcpKeepAlive != null) {
|
||||
serverBootstrap.setOption("child.keepAlive", tcpKeepAlive);
|
||||
}
|
||||
if (tcpSendBufferSize != null) {
|
||||
if (tcpSendBufferSize != null && tcpSendBufferSize.bytes() > 0) {
|
||||
serverBootstrap.setOption("child.sendBufferSize", tcpSendBufferSize.bytes());
|
||||
}
|
||||
if (tcpReceiveBufferSize != null) {
|
||||
if (tcpReceiveBufferSize != null && tcpReceiveBufferSize.bytes() > 0) {
|
||||
serverBootstrap.setOption("child.receiveBufferSize", tcpReceiveBufferSize.bytes());
|
||||
}
|
||||
if (reuseAddress != null) {
|
||||
|
@ -28,18 +28,13 @@ import org.elasticsearch.common.io.stream.CachedStreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.Streamable;
|
||||
import org.elasticsearch.common.logging.ESLogger;
|
||||
import org.elasticsearch.common.unit.ByteSizeValue;
|
||||
import org.elasticsearch.monitor.jvm.JvmInfo;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.*;
|
||||
import org.elasticsearch.transport.support.TransportStreams;
|
||||
import org.jboss.netty.buffer.ChannelBuffer;
|
||||
import org.jboss.netty.buffer.ChannelBuffers;
|
||||
import org.jboss.netty.channel.*;
|
||||
import org.jboss.netty.handler.codec.frame.TooLongFrameException;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.StreamCorruptedException;
|
||||
|
||||
/**
|
||||
* A handler (must be the last one!) that does size based frame decoding and forwards the actual message
|
||||
@ -55,11 +50,6 @@ public class MessageChannelHandler extends SimpleChannelUpstreamHandler {
|
||||
|
||||
private final NettyTransport transport;
|
||||
|
||||
// from FrameDecoder
|
||||
private ChannelBuffer cumulation;
|
||||
|
||||
private static final long NINETY_PER_HEAP_SIZE = (long) (JvmInfo.jvmInfo().mem().heapMax().bytes() * 0.9);
|
||||
|
||||
public MessageChannelHandler(NettyTransport transport, ESLogger logger) {
|
||||
this.threadPool = transport.threadPool();
|
||||
this.transportServiceAdapter = transport.transportServiceAdapter();
|
||||
@ -73,138 +63,16 @@ public class MessageChannelHandler extends SimpleChannelUpstreamHandler {
|
||||
super.writeComplete(ctx, e);
|
||||
}
|
||||
|
||||
// similar logic to FrameDecoder, we don't use FrameDecoder because we can use the data len header value
|
||||
// to guess the size of the cumulation buffer to allocate, and because we make a fresh copy of the cumulation
|
||||
// buffer so we can readBytesReference from it without other request writing into the same one in case
|
||||
// two one message and a partial next message exists within the same input
|
||||
|
||||
// we can readBytesReference because NioWorker always copies the input buffer into a fresh buffer, and we
|
||||
// don't reuse cumumlation buffer
|
||||
@Override
|
||||
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
|
||||
|
||||
Object m = e.getMessage();
|
||||
if (!(m instanceof ChannelBuffer)) {
|
||||
ctx.sendUpstream(e);
|
||||
return;
|
||||
}
|
||||
ChannelBuffer buffer = (ChannelBuffer) m;
|
||||
int size = buffer.getInt(buffer.readerIndex() - 4);
|
||||
|
||||
ChannelBuffer input = (ChannelBuffer) m;
|
||||
if (!input.readable()) {
|
||||
return;
|
||||
}
|
||||
|
||||
ChannelBuffer cumulation = this.cumulation;
|
||||
if (cumulation != null && cumulation.readable()) {
|
||||
cumulation.discardReadBytes();
|
||||
cumulation.writeBytes(input);
|
||||
callDecode(ctx, e.getChannel(), cumulation, true);
|
||||
} else {
|
||||
int actualSize = callDecode(ctx, e.getChannel(), input, false);
|
||||
if (input.readable()) {
|
||||
if (actualSize > 0) {
|
||||
cumulation = ChannelBuffers.dynamicBuffer(actualSize, ctx.getChannel().getConfig().getBufferFactory());
|
||||
} else {
|
||||
cumulation = ChannelBuffers.dynamicBuffer(ctx.getChannel().getConfig().getBufferFactory());
|
||||
}
|
||||
cumulation.writeBytes(input);
|
||||
this.cumulation = cumulation;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void channelDisconnected(
|
||||
ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
|
||||
cleanup(ctx, e);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void channelClosed(
|
||||
ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
|
||||
cleanup(ctx, e);
|
||||
}
|
||||
|
||||
private int callDecode(ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer, boolean cumulationBuffer) throws Exception {
|
||||
int actualSize = 0;
|
||||
while (buffer.readable()) {
|
||||
actualSize = 0;
|
||||
// Changes from Frame Decoder, to combine SizeHeader and this decoder into one...
|
||||
if (buffer.readableBytes() < 4) {
|
||||
break; // we need more data
|
||||
}
|
||||
|
||||
int dataLen = buffer.getInt(buffer.readerIndex());
|
||||
if (dataLen <= 0) {
|
||||
throw new StreamCorruptedException("invalid data length: " + dataLen);
|
||||
}
|
||||
// safety against too large frames being sent
|
||||
if (dataLen > NINETY_PER_HEAP_SIZE) {
|
||||
throw new TooLongFrameException(
|
||||
"transport content length received [" + new ByteSizeValue(dataLen) + "] exceeded [" + new ByteSizeValue(NINETY_PER_HEAP_SIZE) + "]");
|
||||
}
|
||||
|
||||
actualSize = dataLen + 4;
|
||||
if (buffer.readableBytes() < actualSize) {
|
||||
break;
|
||||
}
|
||||
|
||||
buffer.skipBytes(4);
|
||||
|
||||
process(ctx, channel, buffer, dataLen);
|
||||
}
|
||||
|
||||
if (cumulationBuffer) {
|
||||
if (!buffer.readable()) {
|
||||
this.cumulation = null;
|
||||
} else if (buffer.readerIndex() > 0) {
|
||||
// make a fresh copy of the cumalation buffer, so we
|
||||
// can readBytesReference from it, and also, don't keep it around
|
||||
|
||||
// its not that big of an overhead since discardReadBytes in the next round messageReceived will
|
||||
// copy over the bytes to the start again
|
||||
if (actualSize > 0) {
|
||||
this.cumulation = ChannelBuffers.dynamicBuffer(actualSize, ctx.getChannel().getConfig().getBufferFactory());
|
||||
} else {
|
||||
this.cumulation = ChannelBuffers.dynamicBuffer(ctx.getChannel().getConfig().getBufferFactory());
|
||||
}
|
||||
this.cumulation.writeBytes(buffer);
|
||||
}
|
||||
}
|
||||
|
||||
return actualSize;
|
||||
}
|
||||
|
||||
|
||||
private void cleanup(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
|
||||
try {
|
||||
ChannelBuffer cumulation = this.cumulation;
|
||||
if (cumulation == null) {
|
||||
return;
|
||||
} else {
|
||||
this.cumulation = null;
|
||||
}
|
||||
|
||||
if (cumulation.readable()) {
|
||||
// Make sure all frames are read before notifying a closed channel.
|
||||
callDecode(ctx, ctx.getChannel(), cumulation, true);
|
||||
}
|
||||
|
||||
// Call decodeLast() finally. Please note that decodeLast() is
|
||||
// called even if there's nothing more to read from the buffer to
|
||||
// notify a user that the connection was closed explicitly.
|
||||
|
||||
// Change from FrameDecoder: we don't need it...
|
||||
// Object partialFrame = decodeLast(ctx, ctx.getChannel(), cumulation);
|
||||
// if (partialFrame != null) {
|
||||
// unfoldAndFireMessageReceived(ctx, null, partialFrame);
|
||||
// }
|
||||
} finally {
|
||||
ctx.sendUpstream(e);
|
||||
}
|
||||
}
|
||||
|
||||
private void process(ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer, int size) throws Exception {
|
||||
transportServiceAdapter.received(size + 4);
|
||||
|
||||
int markedReaderIndex = buffer.readerIndex();
|
||||
@ -240,7 +108,7 @@ public class MessageChannelHandler extends SimpleChannelUpstreamHandler {
|
||||
}
|
||||
|
||||
if (isRequest) {
|
||||
String action = handleRequest(channel, wrappedStream, requestId);
|
||||
String action = handleRequest(ctx.getChannel(), wrappedStream, requestId);
|
||||
if (buffer.readerIndex() != expectedIndexReader) {
|
||||
if (buffer.readerIndex() < expectedIndexReader) {
|
||||
logger.warn("Message not fully read (request) for [{}] and action [{}], resetting", requestId, action);
|
||||
|
@ -109,13 +109,15 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
|
||||
final Boolean reuseAddress;
|
||||
|
||||
final ByteSizeValue tcpSendBufferSize;
|
||||
|
||||
final ByteSizeValue tcpReceiveBufferSize;
|
||||
|
||||
final int connectionsPerNodeLow;
|
||||
final int connectionsPerNodeMed;
|
||||
final int connectionsPerNodeHigh;
|
||||
|
||||
final ByteSizeValue maxCumulationBufferCapacity;
|
||||
final int maxCompositeBufferComponents;
|
||||
|
||||
private final ThreadPool threadPool;
|
||||
|
||||
private volatile OpenChannelsHandler serverOpenChannels;
|
||||
@ -172,6 +174,9 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
|
||||
this.connectionsPerNodeMed = componentSettings.getAsInt("connections_per_node.med", settings.getAsInt("transport.connections_per_node.med", 6));
|
||||
this.connectionsPerNodeHigh = componentSettings.getAsInt("connections_per_node.high", settings.getAsInt("transport.connections_per_node.high", 1));
|
||||
|
||||
this.maxCumulationBufferCapacity = componentSettings.getAsBytesSize("max_cumulation_buffer_capacity", null);
|
||||
this.maxCompositeBufferComponents = componentSettings.getAsInt("max_composite_buffer_components", -1);
|
||||
|
||||
logger.debug("using worker_count[{}], port[{}], bind_host[{}], publish_host[{}], compress[{}], connect_timeout[{}], connections_per_node[{}/{}/{}]",
|
||||
workerCount, port, bindHost, publishHost, compress, connectTimeout, connectionsPerNodeLow, connectionsPerNodeMed, connectionsPerNodeHigh);
|
||||
}
|
||||
@ -207,6 +212,18 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
|
||||
@Override
|
||||
public ChannelPipeline getPipeline() throws Exception {
|
||||
ChannelPipeline pipeline = Channels.pipeline();
|
||||
SizeHeaderFrameDecoder sizeHeader = new SizeHeaderFrameDecoder();
|
||||
if (maxCumulationBufferCapacity != null) {
|
||||
if (maxCumulationBufferCapacity.bytes() > Integer.MAX_VALUE) {
|
||||
sizeHeader.setMaxCumulationBufferCapacity(Integer.MAX_VALUE);
|
||||
} else {
|
||||
sizeHeader.setMaxCumulationBufferCapacity((int) maxCumulationBufferCapacity.bytes());
|
||||
}
|
||||
}
|
||||
if (maxCompositeBufferComponents != -1) {
|
||||
sizeHeader.setMaxCumulationBufferComponents(maxCompositeBufferComponents);
|
||||
}
|
||||
pipeline.addLast("size", sizeHeader);
|
||||
pipeline.addLast("dispatcher", new MessageChannelHandler(NettyTransport.this, logger));
|
||||
return pipeline;
|
||||
}
|
||||
@ -219,10 +236,10 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
|
||||
if (tcpKeepAlive != null) {
|
||||
clientBootstrap.setOption("keepAlive", tcpKeepAlive);
|
||||
}
|
||||
if (tcpSendBufferSize != null) {
|
||||
if (tcpSendBufferSize != null && tcpSendBufferSize.bytes() > 0) {
|
||||
clientBootstrap.setOption("sendBufferSize", tcpSendBufferSize.bytes());
|
||||
}
|
||||
if (tcpReceiveBufferSize != null) {
|
||||
if (tcpReceiveBufferSize != null && tcpReceiveBufferSize.bytes() > 0) {
|
||||
clientBootstrap.setOption("receiveBufferSize", tcpReceiveBufferSize.bytes());
|
||||
}
|
||||
if (reuseAddress != null) {
|
||||
@ -250,6 +267,18 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
|
||||
public ChannelPipeline getPipeline() throws Exception {
|
||||
ChannelPipeline pipeline = Channels.pipeline();
|
||||
pipeline.addLast("openChannels", serverOpenChannels);
|
||||
SizeHeaderFrameDecoder sizeHeader = new SizeHeaderFrameDecoder();
|
||||
if (maxCumulationBufferCapacity != null) {
|
||||
if (maxCumulationBufferCapacity.bytes() > Integer.MAX_VALUE) {
|
||||
sizeHeader.setMaxCumulationBufferCapacity(Integer.MAX_VALUE);
|
||||
} else {
|
||||
sizeHeader.setMaxCumulationBufferCapacity((int) maxCumulationBufferCapacity.bytes());
|
||||
}
|
||||
}
|
||||
if (maxCompositeBufferComponents != -1) {
|
||||
sizeHeader.setMaxCumulationBufferComponents(maxCompositeBufferComponents);
|
||||
}
|
||||
pipeline.addLast("size", sizeHeader);
|
||||
pipeline.addLast("dispatcher", new MessageChannelHandler(NettyTransport.this, logger));
|
||||
return pipeline;
|
||||
}
|
||||
@ -261,10 +290,10 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
|
||||
if (tcpKeepAlive != null) {
|
||||
serverBootstrap.setOption("child.keepAlive", tcpKeepAlive);
|
||||
}
|
||||
if (tcpSendBufferSize != null) {
|
||||
if (tcpSendBufferSize != null && tcpSendBufferSize.bytes() > 0) {
|
||||
serverBootstrap.setOption("child.sendBufferSize", tcpSendBufferSize.bytes());
|
||||
}
|
||||
if (tcpReceiveBufferSize != null) {
|
||||
if (tcpReceiveBufferSize != null && tcpReceiveBufferSize.bytes() > 0) {
|
||||
serverBootstrap.setOption("child.receiveBufferSize", tcpReceiveBufferSize.bytes());
|
||||
}
|
||||
if (reuseAddress != null) {
|
||||
|
@ -0,0 +1,41 @@
|
||||
package org.elasticsearch.transport.netty;
|
||||
|
||||
import org.elasticsearch.common.unit.ByteSizeValue;
|
||||
import org.elasticsearch.monitor.jvm.JvmInfo;
|
||||
import org.jboss.netty.buffer.ChannelBuffer;
|
||||
import org.jboss.netty.channel.Channel;
|
||||
import org.jboss.netty.channel.ChannelHandlerContext;
|
||||
import org.jboss.netty.handler.codec.frame.FrameDecoder;
|
||||
import org.jboss.netty.handler.codec.frame.TooLongFrameException;
|
||||
|
||||
import java.io.StreamCorruptedException;
|
||||
|
||||
/**
|
||||
*/
|
||||
public class SizeHeaderFrameDecoder extends FrameDecoder {
|
||||
|
||||
private static final long NINETY_PER_HEAP_SIZE = (long) (JvmInfo.jvmInfo().mem().heapMax().bytes() * 0.9);
|
||||
|
||||
@Override
|
||||
protected Object decode(ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer) throws Exception {
|
||||
if (buffer.readableBytes() < 4) {
|
||||
return null;
|
||||
}
|
||||
|
||||
int dataLen = buffer.getInt(buffer.readerIndex());
|
||||
if (dataLen <= 0) {
|
||||
throw new StreamCorruptedException("invalid data length: " + dataLen);
|
||||
}
|
||||
// safety against too large frames being sent
|
||||
if (dataLen > NINETY_PER_HEAP_SIZE) {
|
||||
throw new TooLongFrameException(
|
||||
"transport content length received [" + new ByteSizeValue(dataLen) + "] exceeded [" + new ByteSizeValue(NINETY_PER_HEAP_SIZE) + "]");
|
||||
}
|
||||
|
||||
if (buffer.readableBytes() < dataLen + 4) {
|
||||
return null;
|
||||
}
|
||||
buffer.skipBytes(4);
|
||||
return buffer;
|
||||
}
|
||||
}
|
Loading…
x
Reference in New Issue
Block a user