ARTEMIS-3163 Experimental support for Netty IO_URING incubator

This commit is contained in:
AntonRoskvist 2024-10-11 20:02:25 +02:00
parent eabeae0391
commit a6c5ea78e5
12 changed files with 121 additions and 7 deletions

View File

@ -70,6 +70,7 @@
<Import-Package>
org.glassfish.json*;resolution:=optional,
de.dentrassi.crypto.pem;resolution:=optional,
io.netty.incubator.*;resolution:=optional,
io.netty.buffer;io.netty.*;version="[4.1,5)",
*
</Import-Package>

View File

@ -89,6 +89,15 @@
<groupId>io.netty</groupId>
<artifactId>netty-transport-classes-kqueue</artifactId>
</dependency>
<dependency>
<groupId>io.netty.incubator</groupId>
<artifactId>netty-incubator-transport-native-io_uring</artifactId>
<classifier>${netty-transport-native-io_uring-classifier}</classifier>
</dependency>
<dependency>
<groupId>io.netty.incubator</groupId>
<artifactId>netty-incubator-transport-classes-io_uring</artifactId>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-codec-http</artifactId>

View File

@ -353,4 +353,11 @@ public interface ActiveMQClientLogger {
@LogMessage(id = 214036, value = "Connection closure to {} has been detected: {} [code={}]", level = LogMessage.Level.INFO)
void connectionClosureDetected(String remoteAddress, String message, ActiveMQExceptionType type);
@LogMessage(id = 214037, value = "Unable to check IoUring availability ", level = LogMessage.Level.WARN)
void unableToCheckIoUringAvailability(Throwable e);
@LogMessage(id = 214038, value = "IoUring is not available, please add to the classpath or configure useIoUring=false to remove this warning", level = LogMessage.Level.WARN)
void unableToCheckIoUringAvailabilitynoClass();
}

View File

@ -19,6 +19,7 @@ package org.apache.activemq.artemis.core.remoting.impl.netty;
import io.netty.channel.epoll.Epoll;
import io.netty.channel.kqueue.KQueue;
import io.netty.incubator.channel.uring.IOUring;
import org.apache.activemq.artemis.core.client.ActiveMQClientLogger;
import org.apache.activemq.artemis.utils.Env;
import org.slf4j.Logger;
@ -45,6 +46,18 @@ public class CheckDependencies {
}
}
public static final boolean isIoUringAvailable() {
try {
return Env.isLinuxOs() && IOUring.isAvailable();
} catch (NoClassDefFoundError noClassDefFoundError) {
ActiveMQClientLogger.LOGGER.unableToCheckIoUringAvailabilitynoClass();
return false;
} catch (Throwable e) {
ActiveMQClientLogger.LOGGER.unableToCheckIoUringAvailability(e);
return false;
}
}
public static final boolean isKQueueAvailable() {
try {
return Env.isMacOs() && KQueue.isAvailable();

View File

@ -98,6 +98,8 @@ import io.netty.handler.proxy.ProxyHandler;
import io.netty.handler.proxy.Socks4ProxyHandler;
import io.netty.handler.proxy.Socks5ProxyHandler;
import io.netty.handler.ssl.SslHandler;
import io.netty.incubator.channel.uring.IOUringEventLoopGroup;
import io.netty.incubator.channel.uring.IOUringSocketChannel;
import io.netty.resolver.NoopAddressResolverGroup;
import io.netty.util.AttributeKey;
import io.netty.util.ResourceLeakDetector;
@ -137,6 +139,7 @@ public class NettyConnector extends AbstractConnector {
public static String NIO_CONNECTOR_TYPE = "NIO";
public static String EPOLL_CONNECTOR_TYPE = "EPOLL";
public static String KQUEUE_CONNECTOR_TYPE = "KQUEUE";
public static String IOURING_CONNECTOR_TYPE = "IO_URING";
private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
@ -295,6 +298,8 @@ public class NettyConnector extends AbstractConnector {
private boolean useKQueue;
private boolean useIoUring;
private int remotingThreads;
private boolean useGlobalWorkerPool;
@ -404,6 +409,7 @@ public class NettyConnector extends AbstractConnector {
useEpoll = ConfigurationHelper.getBooleanProperty(TransportConstants.USE_EPOLL_PROP_NAME, TransportConstants.DEFAULT_USE_EPOLL, configuration);
useKQueue = ConfigurationHelper.getBooleanProperty(TransportConstants.USE_KQUEUE_PROP_NAME, TransportConstants.DEFAULT_USE_KQUEUE, configuration);
useIoUring = ConfigurationHelper.getBooleanProperty(TransportConstants.USE_IOURING_PROP_NAME, TransportConstants.DEFAULT_USE_IOURING, configuration);
useServlet = ConfigurationHelper.getBooleanProperty(TransportConstants.USE_SERVLET_PROP_NAME, TransportConstants.DEFAULT_USE_SERVLET, configuration);
host = ConfigurationHelper.getStringProperty(TransportConstants.HOST_PROP_NAME, TransportConstants.DEFAULT_HOST, configuration);
@ -528,6 +534,8 @@ public class NettyConnector extends AbstractConnector {
return;
}
boolean defaultRemotingThreads = remotingThreads == -1;
if (remotingThreads == -1) {
// Default to number of cores * 3
remotingThreads = Runtime.getRuntime().availableProcessors() * 3;
@ -535,14 +543,30 @@ public class NettyConnector extends AbstractConnector {
String connectorType;
if (useEpoll && CheckDependencies.isEpollAvailable()) {
if (useIoUring && CheckDependencies.isIoUringAvailable()) {
//IO_URING should default to 1 remotingThread unless specified in config
remotingThreads = defaultRemotingThreads ? 1 : remotingThreads;
if (useGlobalWorkerPool) {
group = SharedEventLoopGroup.getInstance((threadFactory -> new IOUringEventLoopGroup(remotingThreads, threadFactory)));
} else {
group = new IOUringEventLoopGroup(remotingThreads);
}
connectorType = IOURING_CONNECTOR_TYPE;
channelClazz = IOUringSocketChannel.class;
logger.debug("Connector {} using native io_uring", this);
} else if (useEpoll && CheckDependencies.isEpollAvailable()) {
if (useGlobalWorkerPool) {
group = SharedEventLoopGroup.getInstance((threadFactory -> new EpollEventLoopGroup(remotingThreads, threadFactory)));
} else {
group = new EpollEventLoopGroup(remotingThreads);
}
connectorType = EPOLL_CONNECTOR_TYPE;
channelClazz = EpollSocketChannel.class;
logger.debug("Connector {} using native epoll", this);
} else if (useKQueue && CheckDependencies.isKQueueAvailable()) {
if (useGlobalWorkerPool) {
@ -550,19 +574,21 @@ public class NettyConnector extends AbstractConnector {
} else {
group = new KQueueEventLoopGroup(remotingThreads);
}
connectorType = KQUEUE_CONNECTOR_TYPE;
channelClazz = KQueueSocketChannel.class;
logger.debug("Connector {} using native kqueue", this);
} else {
if (useGlobalWorkerPool) {
channelClazz = NioSocketChannel.class;
group = SharedEventLoopGroup.getInstance((threadFactory -> new NioEventLoopGroup(remotingThreads, threadFactory)));
} else {
channelClazz = NioSocketChannel.class;
group = new NioEventLoopGroup(remotingThreads);
}
connectorType = NIO_CONNECTOR_TYPE;
channelClazz = NioSocketChannel.class;
logger.debug("Connector {} using nio", this);
}
// if we are a servlet wrap the socketChannelFactory

View File

@ -66,6 +66,8 @@ public class TransportConstants {
public static final String USE_EPOLL_PROP_NAME = "useEpoll";
public static final String USE_IOURING_PROP_NAME = "useIoUring";
public static final String USE_KQUEUE_PROP_NAME = "useKQueue";
@Deprecated
@ -213,6 +215,8 @@ public class TransportConstants {
public static final boolean DEFAULT_USE_KQUEUE = true;
public static final boolean DEFAULT_USE_IOURING = false;
public static final boolean DEFAULT_USE_INVM = false;
public static final boolean DEFAULT_USE_SERVLET = false;
@ -409,6 +413,7 @@ public class TransportConstants {
allowableAcceptorKeys.add(TransportConstants.USE_NIO_PROP_NAME);
allowableAcceptorKeys.add(TransportConstants.USE_EPOLL_PROP_NAME);
allowableAcceptorKeys.add(TransportConstants.USE_KQUEUE_PROP_NAME);
allowableAcceptorKeys.add(TransportConstants.USE_IOURING_PROP_NAME);
allowableAcceptorKeys.add(TransportConstants.USE_INVM_PROP_NAME);
//noinspection deprecation
allowableAcceptorKeys.add(TransportConstants.PROTOCOL_PROP_NAME);
@ -484,6 +489,7 @@ public class TransportConstants {
allowableConnectorKeys.add(TransportConstants.USE_NIO_GLOBAL_WORKER_POOL_PROP_NAME);
allowableConnectorKeys.add(TransportConstants.USE_EPOLL_PROP_NAME);
allowableConnectorKeys.add(TransportConstants.USE_KQUEUE_PROP_NAME);
allowableConnectorKeys.add(TransportConstants.USE_IOURING_PROP_NAME);
allowableConnectorKeys.add(TransportConstants.USE_GLOBAL_WORKER_POOL_PROP_NAME);
allowableConnectorKeys.add(TransportConstants.HOST_PROP_NAME);
allowableConnectorKeys.add(TransportConstants.PORT_PROP_NAME);

View File

@ -78,6 +78,7 @@
<Import-Package>
org.glassfish.json*;resolution:=optional,
de.dentrassi.crypto.pem;resolution:=optional,
io.netty.incubator.*;resolution:=optional,
io.netty.buffer;io.netty.*;version="[4.1,5)",
*
</Import-Package>

View File

@ -440,6 +440,19 @@
<classifier>${netty-transport-native-kqueue-classifier}</classifier>
<!-- License: Apache 2.0 -->
</dependency>
<dependency>
<groupId>io.netty.incubator</groupId>
<artifactId>netty-incubator-transport-classes-io_uring</artifactId>
<version>${netty.incubator.io_uring.version}</version>
<!-- License: Apache 2.0 -->
</dependency>
<dependency>
<groupId>io.netty.incubator</groupId>
<artifactId>netty-incubator-transport-native-io_uring</artifactId>
<version>${netty.incubator.io_uring.version}</version>
<classifier>${netty-transport-native-io_uring-classifier}</classifier>
<!-- License: Apache 2.0 -->
</dependency>
<dependency>
<groupId>org.apache.qpid</groupId>
<artifactId>proton-j</artifactId>

View File

@ -128,6 +128,7 @@
org.glassfish.json*;resolution:=optional,
org.postgresql*;resolution:=optional,
de.dentrassi.crypto.pem;resolution:=optional,
io.netty.incubator.*;resolution:=optional,
io.netty.buffer;io.netty.*;version="[4.1,5)",
java.net.http*;resolution:=optional,
com.sun.net.httpserver*;resolution:=optional,

View File

@ -64,6 +64,8 @@ import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslHandler;
import io.netty.incubator.channel.uring.IOUringEventLoopGroup;
import io.netty.incubator.channel.uring.IOUringServerSocketChannel;
import io.netty.util.ResourceLeakDetector;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
@ -112,6 +114,7 @@ public class NettyAcceptor extends AbstractAcceptor {
public static final String NIO_ACCEPTOR_TYPE = "NIO";
public static final String EPOLL_ACCEPTOR_TYPE = "EPOLL";
public static final String KQUEUE_ACCEPTOR_TYPE = "KQUEUE";
public static final String IOURING_ACCEPTOR_TYPE = "EXPERIMENTAL_IO_URING";
static {
// Disable default Netty leak detection if the Netty leak detection level system properties are not in use
@ -148,6 +151,8 @@ public class NettyAcceptor extends AbstractAcceptor {
private final boolean useKQueue;
private final boolean useIoUring;
private final ProtocolHandler protocolHandler;
private final String host;
@ -276,6 +281,7 @@ public class NettyAcceptor extends AbstractAcceptor {
useEpoll = ConfigurationHelper.getBooleanProperty(TransportConstants.USE_EPOLL_PROP_NAME, TransportConstants.DEFAULT_USE_EPOLL, configuration);
useKQueue = ConfigurationHelper.getBooleanProperty(TransportConstants.USE_KQUEUE_PROP_NAME, TransportConstants.DEFAULT_USE_KQUEUE, configuration);
useIoUring = ConfigurationHelper.getBooleanProperty(TransportConstants.USE_IOURING_PROP_NAME, TransportConstants.DEFAULT_USE_IOURING, configuration);
backlog = ConfigurationHelper.getIntProperty(TransportConstants.BACKLOG_PROP_NAME, -1, configuration);
useInvm = ConfigurationHelper.getBooleanProperty(TransportConstants.USE_INVM_PROP_NAME, TransportConstants.DEFAULT_USE_INVM, configuration);
@ -425,12 +431,23 @@ public class NettyAcceptor extends AbstractAcceptor {
eventLoopGroup = new DefaultEventLoopGroup();
} else {
boolean defaultRemotingThreads = remotingThreads == -1;
if (remotingThreads == -1) {
// Default to number of cores * 3
remotingThreads = Runtime.getRuntime().availableProcessors() * 3;
}
if (useEpoll && CheckDependencies.isEpollAvailable()) {
if (useIoUring && CheckDependencies.isIoUringAvailable()) {
//IO_URING should default to 1 remotingThread unless specified in config
remotingThreads = defaultRemotingThreads ? 1 : remotingThreads;
channelClazz = IOUringServerSocketChannel.class;
eventLoopGroup = new IOUringEventLoopGroup(remotingThreads, AccessController.doPrivileged((PrivilegedAction<ActiveMQThreadFactory>) () -> new ActiveMQThreadFactory("activemq-netty-threads", true, ClientSessionFactoryImpl.class.getClassLoader())));
acceptorType = IOURING_ACCEPTOR_TYPE;
logger.debug("Acceptor using native io_uring");
} else if (useEpoll && CheckDependencies.isEpollAvailable()) {
channelClazz = EpollServerSocketChannel.class;
eventLoopGroup = new EpollEventLoopGroup(remotingThreads, AccessController.doPrivileged((PrivilegedAction<ActiveMQThreadFactory>) () -> new ActiveMQThreadFactory("activemq-netty-threads", true, ClientSessionFactoryImpl.class.getClassLoader())));
acceptorType = EPOLL_ACCEPTOR_TYPE;
@ -446,6 +463,7 @@ public class NettyAcceptor extends AbstractAcceptor {
channelClazz = NioServerSocketChannel.class;
eventLoopGroup = new NioEventLoopGroup(remotingThreads, AccessController.doPrivileged((PrivilegedAction<ActiveMQThreadFactory>) () -> new ActiveMQThreadFactory("activemq-netty-threads", true, ClientSessionFactoryImpl.class.getClassLoader())));
acceptorType = NIO_ACCEPTOR_TYPE;
logger.debug("Acceptor using nio");
}
}

View File

@ -243,14 +243,14 @@ These Native transports add features specific to a particular platform, generate
Both Clients and Server can benefit from this.
Current Supported Platforms.
Currently supported platforms:
* Linux running 64bit JVM
* MacOS running 64bit JVM
Apache ActiveMQ Artemis will by default enable the corresponding native transport if a supported platform is detected.
Apache ActiveMQ Artemis will enable the corresponding native transport by default if a supported platform is detected.
If running on an unsupported platform or any issues loading native libs, Apache ActiveMQ Artemis will fallback onto Java NIO.
If running on an unsupported platform, or if any issues occur while loading the native libs, Apache ActiveMQ Artemis will fallback onto Java NIO.
==== Linux Native Transport
@ -263,6 +263,23 @@ enables the use of epoll if a supported linux platform is running a 64bit JVM is
Setting this to `false` will force the use of Java NIO instead of epoll.
Default is `true`
Additionally, Apache ActiveMQ Artemis offers `experimental` support for using IO_URING, @see https://en.wikipedia.org/wiki/Io_uring.
The following properties are specific to this native transport:
useIoUring::
enables the use of IO_URING if a supported linux platform running a 64bit JVM is detected.
Setting this to `false` will attempt the use of `epoll`, then finally falling back to using Java NIO.
Default is `false`
[WARNING]
====
[#io_uring-warning]
IO_URING support is `experimental` at this point. Using it _could_ introduce unwanted side effects or unpredicted behavior.
It's currently not recommended for production or any otherwise critical use.
====
==== MacOS Native Transport
On supported MacOS platforms KQueue is used, @see https://en.wikipedia.org/wiki/Kqueue.

View File

@ -121,6 +121,7 @@
<mockito.version>5.14.1</mockito.version>
<jctools.version>4.0.5</jctools.version>
<netty.version>4.1.114.Final</netty.version>
<netty.incubator.io_uring.version>0.0.25.Final</netty.incubator.io_uring.version>
<hdrhistogram.version>2.2.2</hdrhistogram.version>
<curator.version>5.7.0</curator.version>
<zookeeper.version>3.9.2</zookeeper.version>
@ -261,6 +262,7 @@
<netty-transport-native-epoll-classifier>linux-x86_64</netty-transport-native-epoll-classifier>
<netty-transport-native-kqueue-classifier>osx-x86_64</netty-transport-native-kqueue-classifier>
<netty-transport-native-io_uring-classifier>linux-x86_64</netty-transport-native-io_uring-classifier>
<fast-tests>false</fast-tests>