From f53449b945d2c7ddbd185063cd1ece051ed93990 Mon Sep 17 00:00:00 2001 From: Francesco Nigro Date: Thu, 6 Apr 2017 11:33:29 +0200 Subject: [PATCH] ARTEMIS-1095 Netty's WriteBufferWaterMark configuration via TransportConstants --- .../core/remoting/impl/netty/NettyConnector.java | 12 +++++++++++- .../core/remoting/impl/netty/TransportConstants.java | 12 ++++++++++++ .../core/remoting/impl/netty/NettyAcceptor.java | 12 +++++++++++- docs/user-manual/en/configuring-transports.md | 11 +++++++++++ 4 files changed, 45 insertions(+), 2 deletions(-) diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnector.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnector.java index f6935be827..d31bdb2ef0 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnector.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnector.java @@ -58,6 +58,7 @@ import io.netty.channel.ChannelPipeline; import io.netty.channel.ChannelPromise; import io.netty.channel.EventLoopGroup; import io.netty.channel.SimpleChannelInboundHandler; +import io.netty.channel.WriteBufferWaterMark; import io.netty.channel.epoll.Epoll; import io.netty.channel.epoll.EpollEventLoopGroup; import io.netty.channel.epoll.EpollSocketChannel; @@ -211,6 +212,10 @@ public class NettyConnector extends AbstractConnector { private int tcpReceiveBufferSize; + private final int writeBufferLowWaterMark; + + private final int writeBufferHighWaterMark; + private long batchDelay; private ConcurrentMap connections = new ConcurrentHashMap<>(); @@ -336,7 +341,8 @@ public class NettyConnector extends AbstractConnector { tcpNoDelay = ConfigurationHelper.getBooleanProperty(TransportConstants.TCP_NODELAY_PROPNAME, TransportConstants.DEFAULT_TCP_NODELAY, configuration); tcpSendBufferSize = ConfigurationHelper.getIntProperty(TransportConstants.TCP_SENDBUFFER_SIZE_PROPNAME, TransportConstants.DEFAULT_TCP_SENDBUFFER_SIZE, configuration); tcpReceiveBufferSize = ConfigurationHelper.getIntProperty(TransportConstants.TCP_RECEIVEBUFFER_SIZE_PROPNAME, TransportConstants.DEFAULT_TCP_RECEIVEBUFFER_SIZE, configuration); - + this.writeBufferLowWaterMark = ConfigurationHelper.getIntProperty(TransportConstants.WRITE_BUFFER_LOW_WATER_MARK_PROPNAME, TransportConstants.DEFAULT_WRITE_BUFFER_LOW_WATER_MARK, configuration); + this.writeBufferHighWaterMark = ConfigurationHelper.getIntProperty(TransportConstants.WRITE_BUFFER_HIGH_WATER_MARK_PROPNAME, TransportConstants.DEFAULT_WRITE_BUFFER_HIGH_WATER_MARK, configuration); batchDelay = ConfigurationHelper.getLongProperty(TransportConstants.BATCH_DELAY, TransportConstants.DEFAULT_BATCH_DELAY, configuration); connectTimeoutMillis = ConfigurationHelper.getIntProperty(TransportConstants.NETTY_CONNECT_TIMEOUT, TransportConstants.DEFAULT_NETTY_CONNECT_TIMEOUT, configuration); @@ -423,6 +429,10 @@ public class NettyConnector extends AbstractConnector { if (tcpSendBufferSize != -1) { bootstrap.option(ChannelOption.SO_SNDBUF, tcpSendBufferSize); } + final int writeBufferLowWaterMark = this.writeBufferLowWaterMark != -1 ? this.writeBufferLowWaterMark : WriteBufferWaterMark.DEFAULT.low(); + final int writeBufferHighWaterMark = this.writeBufferHighWaterMark != -1 ? this.writeBufferHighWaterMark : WriteBufferWaterMark.DEFAULT.high(); + final WriteBufferWaterMark writeBufferWaterMark = new WriteBufferWaterMark(writeBufferLowWaterMark, writeBufferHighWaterMark); + bootstrap.option(ChannelOption.WRITE_BUFFER_WATER_MARK, writeBufferWaterMark); bootstrap.option(ChannelOption.SO_KEEPALIVE, true); bootstrap.option(ChannelOption.SO_REUSEADDR, true); channelGroup = new DefaultChannelGroup("activemq-connector", GlobalEventExecutor.INSTANCE); diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/TransportConstants.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/TransportConstants.java index 4317a68751..69eaa94c1b 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/TransportConstants.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/TransportConstants.java @@ -125,6 +125,10 @@ public class TransportConstants { */ public static final String NIO_REMOTING_THREADS_PROPNAME = "nioRemotingThreads"; + public static final String WRITE_BUFFER_LOW_WATER_MARK_PROPNAME = "writeBufferLowWaterMark"; + + public static final String WRITE_BUFFER_HIGH_WATER_MARK_PROPNAME = "writeBufferHighWaterMark"; + public static final String REMOTING_THREADS_PROPNAME = "RemotingThreads"; public static final String BATCH_DELAY = "batchDelay"; @@ -183,6 +187,10 @@ public class TransportConstants { public static final int DEFAULT_TCP_RECEIVEBUFFER_SIZE = 1024 * 1024; + public static final int DEFAULT_WRITE_BUFFER_LOW_WATER_MARK = 32 * 1024; + + public static final int DEFAULT_WRITE_BUFFER_HIGH_WATER_MARK = 128 * 1024; + public static final boolean DEFAULT_HTTP_ENABLED = false; public static final long DEFAULT_HTTP_CLIENT_IDLE_TIME = 500; @@ -253,6 +261,8 @@ public class TransportConstants { allowableAcceptorKeys.add(TransportConstants.TCP_NODELAY_PROPNAME); allowableAcceptorKeys.add(TransportConstants.TCP_SENDBUFFER_SIZE_PROPNAME); allowableAcceptorKeys.add(TransportConstants.TCP_RECEIVEBUFFER_SIZE_PROPNAME); + allowableAcceptorKeys.add(TransportConstants.WRITE_BUFFER_HIGH_WATER_MARK_PROPNAME); + allowableAcceptorKeys.add(TransportConstants.WRITE_BUFFER_LOW_WATER_MARK_PROPNAME); allowableAcceptorKeys.add(TransportConstants.NIO_REMOTING_THREADS_PROPNAME); allowableAcceptorKeys.add(TransportConstants.REMOTING_THREADS_PROPNAME); allowableAcceptorKeys.add(TransportConstants.BATCH_DELAY); @@ -303,6 +313,8 @@ public class TransportConstants { allowableConnectorKeys.add(TransportConstants.TCP_NODELAY_PROPNAME); allowableConnectorKeys.add(TransportConstants.TCP_SENDBUFFER_SIZE_PROPNAME); allowableConnectorKeys.add(TransportConstants.TCP_RECEIVEBUFFER_SIZE_PROPNAME); + allowableConnectorKeys.add(TransportConstants.WRITE_BUFFER_HIGH_WATER_MARK_PROPNAME); + allowableConnectorKeys.add(TransportConstants.WRITE_BUFFER_LOW_WATER_MARK_PROPNAME); allowableConnectorKeys.add(TransportConstants.NIO_REMOTING_THREADS_PROPNAME); allowableConnectorKeys.add(TransportConstants.REMOTING_THREADS_PROPNAME); allowableConnectorKeys.add(TransportConstants.BATCH_DELAY); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyAcceptor.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyAcceptor.java index 7b17694b8e..4f248f538a 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyAcceptor.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyAcceptor.java @@ -44,6 +44,7 @@ import io.netty.channel.ChannelOption; import io.netty.channel.ChannelPipeline; import io.netty.channel.EventLoopGroup; import io.netty.channel.ServerChannel; +import io.netty.channel.WriteBufferWaterMark; import io.netty.channel.epoll.Epoll; import io.netty.channel.epoll.EpollEventLoopGroup; import io.netty.channel.epoll.EpollServerSocketChannel; @@ -161,6 +162,10 @@ public class NettyAcceptor extends AbstractAcceptor { private final int tcpReceiveBufferSize; + private final int writeBufferLowWaterMark; + + private final int writeBufferHighWaterMark; + private int remotingThreads; private final ConcurrentMap connections = new ConcurrentHashMap<>(); @@ -260,7 +265,8 @@ public class NettyAcceptor extends AbstractAcceptor { tcpNoDelay = ConfigurationHelper.getBooleanProperty(TransportConstants.TCP_NODELAY_PROPNAME, TransportConstants.DEFAULT_TCP_NODELAY, configuration); tcpSendBufferSize = ConfigurationHelper.getIntProperty(TransportConstants.TCP_SENDBUFFER_SIZE_PROPNAME, TransportConstants.DEFAULT_TCP_SENDBUFFER_SIZE, configuration); tcpReceiveBufferSize = ConfigurationHelper.getIntProperty(TransportConstants.TCP_RECEIVEBUFFER_SIZE_PROPNAME, TransportConstants.DEFAULT_TCP_RECEIVEBUFFER_SIZE, configuration); - + this.writeBufferLowWaterMark = ConfigurationHelper.getIntProperty(TransportConstants.WRITE_BUFFER_LOW_WATER_MARK_PROPNAME, TransportConstants.DEFAULT_WRITE_BUFFER_LOW_WATER_MARK, configuration); + this.writeBufferHighWaterMark = ConfigurationHelper.getIntProperty(TransportConstants.WRITE_BUFFER_HIGH_WATER_MARK_PROPNAME, TransportConstants.DEFAULT_WRITE_BUFFER_HIGH_WATER_MARK, configuration); this.scheduledThreadPool = scheduledThreadPool; batchDelay = ConfigurationHelper.getLongProperty(TransportConstants.BATCH_DELAY, TransportConstants.DEFAULT_BATCH_DELAY, configuration); @@ -341,6 +347,10 @@ public class NettyAcceptor extends AbstractAcceptor { if (tcpSendBufferSize != -1) { bootstrap.childOption(ChannelOption.SO_SNDBUF, tcpSendBufferSize); } + final int writeBufferLowWaterMark = this.writeBufferLowWaterMark != -1 ? this.writeBufferLowWaterMark : WriteBufferWaterMark.DEFAULT.low(); + final int writeBufferHighWaterMark = this.writeBufferHighWaterMark != -1 ? this.writeBufferHighWaterMark : WriteBufferWaterMark.DEFAULT.high(); + final WriteBufferWaterMark writeBufferWaterMark = new WriteBufferWaterMark(writeBufferLowWaterMark, writeBufferHighWaterMark); + bootstrap.childOption(ChannelOption.WRITE_BUFFER_WATER_MARK, writeBufferWaterMark); if (backlog != -1) { bootstrap.option(ChannelOption.SO_BACKLOG, backlog); } diff --git a/docs/user-manual/en/configuring-transports.md b/docs/user-manual/en/configuring-transports.md index eec4e0906c..d3adfefd09 100644 --- a/docs/user-manual/en/configuring-transports.md +++ b/docs/user-manual/en/configuring-transports.md @@ -239,6 +239,17 @@ Netty for simple TCP: - `tcpReceiveBufferSize`. This parameter determines the size of the TCP receive buffer in bytes. The default value for this property is `32768` bytes (32KiB). + +- `writeBufferLowWaterMark`. This parameter determines the low water mark of + the Netty write buffer. Once the number of bytes queued in the write buffer exceeded + the high water mark and then dropped down below this value, Netty's channel + will start to be writable again. The default value for this property is + `32768` bytes (32KiB). + +- `writeBufferHighWaterMark`. This parameter determines the high water mark of + the Netty write buffer. If the number of bytes queued in the write buffer exceeds + this value, Netty's channel will start to be not writable. The default value for + this property is `131072` bytes (128KiB). - `batchDelay`. Before writing packets to the transport, Apache ActiveMQ Artemis can be configured to batch up writes for a maximum of `batchDelay`