ARTEMIS-1095 Netty's WriteBufferWaterMark configuration via TransportConstants

This commit is contained in:
Francesco Nigro 2017-04-06 11:33:29 +02:00
parent 2edc972c52
commit f53449b945
4 changed files with 45 additions and 2 deletions

View File

@ -58,6 +58,7 @@ import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelPromise;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.WriteBufferWaterMark;
import io.netty.channel.epoll.Epoll;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.epoll.EpollSocketChannel;
@ -211,6 +212,10 @@ public class NettyConnector extends AbstractConnector {
private int tcpReceiveBufferSize;
private final int writeBufferLowWaterMark;
private final int writeBufferHighWaterMark;
private long batchDelay;
private ConcurrentMap<Object, Connection> connections = new ConcurrentHashMap<>();
@ -336,7 +341,8 @@ public class NettyConnector extends AbstractConnector {
tcpNoDelay = ConfigurationHelper.getBooleanProperty(TransportConstants.TCP_NODELAY_PROPNAME, TransportConstants.DEFAULT_TCP_NODELAY, configuration);
tcpSendBufferSize = ConfigurationHelper.getIntProperty(TransportConstants.TCP_SENDBUFFER_SIZE_PROPNAME, TransportConstants.DEFAULT_TCP_SENDBUFFER_SIZE, configuration);
tcpReceiveBufferSize = ConfigurationHelper.getIntProperty(TransportConstants.TCP_RECEIVEBUFFER_SIZE_PROPNAME, TransportConstants.DEFAULT_TCP_RECEIVEBUFFER_SIZE, configuration);
this.writeBufferLowWaterMark = ConfigurationHelper.getIntProperty(TransportConstants.WRITE_BUFFER_LOW_WATER_MARK_PROPNAME, TransportConstants.DEFAULT_WRITE_BUFFER_LOW_WATER_MARK, configuration);
this.writeBufferHighWaterMark = ConfigurationHelper.getIntProperty(TransportConstants.WRITE_BUFFER_HIGH_WATER_MARK_PROPNAME, TransportConstants.DEFAULT_WRITE_BUFFER_HIGH_WATER_MARK, configuration);
batchDelay = ConfigurationHelper.getLongProperty(TransportConstants.BATCH_DELAY, TransportConstants.DEFAULT_BATCH_DELAY, configuration);
connectTimeoutMillis = ConfigurationHelper.getIntProperty(TransportConstants.NETTY_CONNECT_TIMEOUT, TransportConstants.DEFAULT_NETTY_CONNECT_TIMEOUT, configuration);
@ -423,6 +429,10 @@ public class NettyConnector extends AbstractConnector {
if (tcpSendBufferSize != -1) {
bootstrap.option(ChannelOption.SO_SNDBUF, tcpSendBufferSize);
}
final int writeBufferLowWaterMark = this.writeBufferLowWaterMark != -1 ? this.writeBufferLowWaterMark : WriteBufferWaterMark.DEFAULT.low();
final int writeBufferHighWaterMark = this.writeBufferHighWaterMark != -1 ? this.writeBufferHighWaterMark : WriteBufferWaterMark.DEFAULT.high();
final WriteBufferWaterMark writeBufferWaterMark = new WriteBufferWaterMark(writeBufferLowWaterMark, writeBufferHighWaterMark);
bootstrap.option(ChannelOption.WRITE_BUFFER_WATER_MARK, writeBufferWaterMark);
bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
bootstrap.option(ChannelOption.SO_REUSEADDR, true);
channelGroup = new DefaultChannelGroup("activemq-connector", GlobalEventExecutor.INSTANCE);

View File

@ -125,6 +125,10 @@ public class TransportConstants {
*/
public static final String NIO_REMOTING_THREADS_PROPNAME = "nioRemotingThreads";
public static final String WRITE_BUFFER_LOW_WATER_MARK_PROPNAME = "writeBufferLowWaterMark";
public static final String WRITE_BUFFER_HIGH_WATER_MARK_PROPNAME = "writeBufferHighWaterMark";
public static final String REMOTING_THREADS_PROPNAME = "RemotingThreads";
public static final String BATCH_DELAY = "batchDelay";
@ -183,6 +187,10 @@ public class TransportConstants {
public static final int DEFAULT_TCP_RECEIVEBUFFER_SIZE = 1024 * 1024;
public static final int DEFAULT_WRITE_BUFFER_LOW_WATER_MARK = 32 * 1024;
public static final int DEFAULT_WRITE_BUFFER_HIGH_WATER_MARK = 128 * 1024;
public static final boolean DEFAULT_HTTP_ENABLED = false;
public static final long DEFAULT_HTTP_CLIENT_IDLE_TIME = 500;
@ -253,6 +261,8 @@ public class TransportConstants {
allowableAcceptorKeys.add(TransportConstants.TCP_NODELAY_PROPNAME);
allowableAcceptorKeys.add(TransportConstants.TCP_SENDBUFFER_SIZE_PROPNAME);
allowableAcceptorKeys.add(TransportConstants.TCP_RECEIVEBUFFER_SIZE_PROPNAME);
allowableAcceptorKeys.add(TransportConstants.WRITE_BUFFER_HIGH_WATER_MARK_PROPNAME);
allowableAcceptorKeys.add(TransportConstants.WRITE_BUFFER_LOW_WATER_MARK_PROPNAME);
allowableAcceptorKeys.add(TransportConstants.NIO_REMOTING_THREADS_PROPNAME);
allowableAcceptorKeys.add(TransportConstants.REMOTING_THREADS_PROPNAME);
allowableAcceptorKeys.add(TransportConstants.BATCH_DELAY);
@ -303,6 +313,8 @@ public class TransportConstants {
allowableConnectorKeys.add(TransportConstants.TCP_NODELAY_PROPNAME);
allowableConnectorKeys.add(TransportConstants.TCP_SENDBUFFER_SIZE_PROPNAME);
allowableConnectorKeys.add(TransportConstants.TCP_RECEIVEBUFFER_SIZE_PROPNAME);
allowableConnectorKeys.add(TransportConstants.WRITE_BUFFER_HIGH_WATER_MARK_PROPNAME);
allowableConnectorKeys.add(TransportConstants.WRITE_BUFFER_LOW_WATER_MARK_PROPNAME);
allowableConnectorKeys.add(TransportConstants.NIO_REMOTING_THREADS_PROPNAME);
allowableConnectorKeys.add(TransportConstants.REMOTING_THREADS_PROPNAME);
allowableConnectorKeys.add(TransportConstants.BATCH_DELAY);

View File

@ -44,6 +44,7 @@ import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.ServerChannel;
import io.netty.channel.WriteBufferWaterMark;
import io.netty.channel.epoll.Epoll;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.epoll.EpollServerSocketChannel;
@ -161,6 +162,10 @@ public class NettyAcceptor extends AbstractAcceptor {
private final int tcpReceiveBufferSize;
private final int writeBufferLowWaterMark;
private final int writeBufferHighWaterMark;
private int remotingThreads;
private final ConcurrentMap<Object, NettyServerConnection> connections = new ConcurrentHashMap<>();
@ -260,7 +265,8 @@ public class NettyAcceptor extends AbstractAcceptor {
tcpNoDelay = ConfigurationHelper.getBooleanProperty(TransportConstants.TCP_NODELAY_PROPNAME, TransportConstants.DEFAULT_TCP_NODELAY, configuration);
tcpSendBufferSize = ConfigurationHelper.getIntProperty(TransportConstants.TCP_SENDBUFFER_SIZE_PROPNAME, TransportConstants.DEFAULT_TCP_SENDBUFFER_SIZE, configuration);
tcpReceiveBufferSize = ConfigurationHelper.getIntProperty(TransportConstants.TCP_RECEIVEBUFFER_SIZE_PROPNAME, TransportConstants.DEFAULT_TCP_RECEIVEBUFFER_SIZE, configuration);
this.writeBufferLowWaterMark = ConfigurationHelper.getIntProperty(TransportConstants.WRITE_BUFFER_LOW_WATER_MARK_PROPNAME, TransportConstants.DEFAULT_WRITE_BUFFER_LOW_WATER_MARK, configuration);
this.writeBufferHighWaterMark = ConfigurationHelper.getIntProperty(TransportConstants.WRITE_BUFFER_HIGH_WATER_MARK_PROPNAME, TransportConstants.DEFAULT_WRITE_BUFFER_HIGH_WATER_MARK, configuration);
this.scheduledThreadPool = scheduledThreadPool;
batchDelay = ConfigurationHelper.getLongProperty(TransportConstants.BATCH_DELAY, TransportConstants.DEFAULT_BATCH_DELAY, configuration);
@ -341,6 +347,10 @@ public class NettyAcceptor extends AbstractAcceptor {
if (tcpSendBufferSize != -1) {
bootstrap.childOption(ChannelOption.SO_SNDBUF, tcpSendBufferSize);
}
final int writeBufferLowWaterMark = this.writeBufferLowWaterMark != -1 ? this.writeBufferLowWaterMark : WriteBufferWaterMark.DEFAULT.low();
final int writeBufferHighWaterMark = this.writeBufferHighWaterMark != -1 ? this.writeBufferHighWaterMark : WriteBufferWaterMark.DEFAULT.high();
final WriteBufferWaterMark writeBufferWaterMark = new WriteBufferWaterMark(writeBufferLowWaterMark, writeBufferHighWaterMark);
bootstrap.childOption(ChannelOption.WRITE_BUFFER_WATER_MARK, writeBufferWaterMark);
if (backlog != -1) {
bootstrap.option(ChannelOption.SO_BACKLOG, backlog);
}

View File

@ -239,6 +239,17 @@ Netty for simple TCP:
- `tcpReceiveBufferSize`. This parameter determines the size of the
TCP receive buffer in bytes. The default value for this property is
`32768` bytes (32KiB).
- `writeBufferLowWaterMark`. This parameter determines the low water mark of
the Netty write buffer. Once the number of bytes queued in the write buffer exceeded
the high water mark and then dropped down below this value, Netty's channel
will start to be writable again. The default value for this property is
`32768` bytes (32KiB).
- `writeBufferHighWaterMark`. This parameter determines the high water mark of
the Netty write buffer. If the number of bytes queued in the write buffer exceeds
this value, Netty's channel will start to be not writable. The default value for
this property is `131072` bytes (128KiB).
- `batchDelay`. Before writing packets to the transport, Apache ActiveMQ Artemis can
be configured to batch up writes for a maximum of `batchDelay`